Coverage for hledger_lots/fifo.py: 92%

72 statements  

« prev     ^ index     » next       coverage.py v7.2.3, created at 2023-05-04 22:41 -0300

1import copy 

2import subprocess 

3from datetime import datetime 

4from typing import List 

5 

6from . import checks 

7from .lib import AdjustedTxn, CostMethodError, adjust_commodity, get_avg_fifo, get_xirr 

8 

9 

10def check_sell(sell: AdjustedTxn, previous_buys: List[AdjustedTxn], check: bool): 

11 if not check: 

12 return 

13 

14 diff_zero = [ 

15 previous_buy for previous_buy in previous_buys if previous_buy.qtty != 0 

16 ] 

17 if len(diff_zero) == 0: 

18 return 

19 

20 previous_buy = diff_zero[0] 

21 if sell.price != previous_buy.price or sell.base_cur != previous_buy.base_cur: 

22 raise CostMethodError(sell, previous_buy.price, previous_buy.base_cur) 

23 

24 

25def get_lots(txns: List[AdjustedTxn], check: bool) -> List[AdjustedTxn]: 

26 local_txns = copy.deepcopy(txns) 

27 checks.check_base_currency(txns) 

28 

29 buys = [txn for txn in local_txns if txn.qtty >= 0] 

30 sells = [txn for txn in local_txns if txn.qtty < 0] 

31 

32 buys_lot: List[AdjustedTxn] = buys if len(sells) == 0 else [] 

33 for sell in sells: 

34 previous_buys = [txn for txn in buys if txn.date <= sell.date] 

35 checks.check_short_sell_past(previous_buys, sell) 

36 later_buys = [txn for txn in buys if txn.date > sell.date] 

37 sell_qtty = abs(sell.qtty) 

38 

39 i = 0 

40 while i < len(previous_buys) and sell_qtty > 0: 

41 previous_buy = previous_buys[i] 

42 check_sell(sell, previous_buys, check) 

43 if sell_qtty >= previous_buy.qtty: 

44 sell_qtty -= previous_buy.qtty 

45 previous_buys[i].qtty = 0 

46 else: 

47 previous_buys[i].qtty -= sell_qtty 

48 sell_qtty = 0 

49 

50 i += 1 

51 

52 buys_lot = [*previous_buys, *later_buys] 

53 

54 return buys_lot 

55 

56 

57def get_sell_lots( 

58 lots: List[AdjustedTxn], sell_date: str, sell_qtty: float, check: bool 

59): 

60 checks.check_short_sell_current(lots, sell_qtty) 

61 buy_lots = get_lots(lots, check) 

62 previous_buys = [lot for lot in buy_lots.copy() if lot.date <= sell_date] 

63 

64 fifo_lots: List[AdjustedTxn] = [] 

65 sell_qtty_curr = sell_qtty 

66 

67 i = 0 

68 while sell_qtty_curr > 0 and i < len(lots): 

69 buy = previous_buys[i] 

70 if buy.qtty == 0: 

71 pass 

72 elif sell_qtty_curr > buy.qtty: 

73 fifo_lots.append( 

74 AdjustedTxn(buy.date, buy.price, buy.base_cur, buy.qtty, buy.acct) 

75 ) 

76 sell_qtty_curr -= buy.qtty 

77 else: 

78 fifo_lots.append( 

79 AdjustedTxn(buy.date, buy.price, buy.base_cur, sell_qtty_curr, buy.acct) 

80 ) 

81 sell_qtty_curr = 0 

82 i += 1 

83 

84 return fifo_lots 

85 

86 

87def txn2hl( 

88 txns: List[AdjustedTxn], 

89 date: str, 

90 cur: str, 

91 cash_account: str, 

92 revenue_account: str, 

93 value: float, 

94): 

95 adj_comm = adjust_commodity(cur) 

96 base_curr = txns[0].base_cur 

97 avg_cost = get_avg_fifo(txns) 

98 sum_qtty = sum(txn.qtty for txn in txns) 

99 price = value / sum_qtty 

100 dt = datetime.strptime(date, "%Y-%m-%d").date() 

101 xirr = get_xirr(price, dt, txns) or 0 * 100 

102 

103 txn_hl = f""" 

104{date} Sold {cur} ; cost_method:fifo 

105 ; commodity:{cur}, qtty:{sum_qtty:,.2f}, price:{price:,.2f} 

106 ; avg_cost:{avg_cost:,.4f}, xirr:{xirr:.2f}% annual percent rate 30/360US 

107 {cash_account} {value:.2f} {base_curr} 

108""" 

109 

110 for txn in txns: 

111 txn_hl += f" {txn.acct} {txn.qtty * -1} {adj_comm} @ {txn.price} {base_curr} ; buy_date:{txn.date}, base_cur:{txn.base_cur}\n" 

112 

113 txn_hl += f" {revenue_account} " 

114 comm = ["hledger", "-f-", "print", "--explicit"] 

115 txn_proc = subprocess.run( 

116 comm, 

117 input=txn_hl.encode(), 

118 capture_output=True, 

119 ) 

120 txn_print: str = txn_proc.stdout.decode("utf8") 

121 return txn_print