beancount.plugins
Example plugins for filtering transactions.
These are various examples of how to filter entries in various creative ways.
IMPORTANT: These are not meant to be complete features, rather just experiments in problem-solving using Beancount, work-in-progress that can be selectively installed via a --plugin option, or one-offs to answer questions on the mailing-list.
beancount.plugins.auto
A plugin of plugins which triggers are all the automatic and lax plugins.
In a sense, this is the inverse of "pedantic." This is useful when doing some types of quick and dirty tests. You can just import the "auto" plugin. Put that in a macro.
Also see: the 'pedantic' plugin.
beancount.plugins.auto_accounts
This module automatically inserts Open directives for accounts not opened (at the date of the first entry) and automatically removes open directives for unused accounts. This can be used as a convenience for doing demos, or when setting up your initial transactions, as an intermediate step.
beancount.plugins.auto_accounts.auto_insert_open(entries, unused_options_map)
Insert implicitly defined prices from Transactions.
Explicit price entries are simply maintained in the output list. Prices from postings with costs or with prices from Transaction entries are synthesized as new Price entries in the list of entries output.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/auto_accounts.py
def auto_insert_open(entries, unused_options_map):
"""Insert implicitly defined prices from Transactions.
Explicit price entries are simply maintained in the output list. Prices from
postings with costs or with prices from Transaction entries are synthesized
as new Price entries in the list of entries output.
Args:
entries: A list of directives. We're interested only in the Transaction instances.
unused_options_map: A parser options dict.
Returns:
A list of entries, possibly with more Price entries than before, and a
list of errors.
"""
opened_accounts = {entry.account
for entry in entries
if isinstance(entry, data.Open)}
new_entries = []
accounts_first, _ = getters.get_accounts_use_map(entries)
for index, (account, date_first_used) in enumerate(sorted(accounts_first.items())):
if account not in opened_accounts:
meta = data.new_metadata('<auto_accounts>', index)
new_entries.append(data.Open(meta, date_first_used, account,
None, None))
if new_entries:
new_entries.extend(entries)
new_entries.sort(key=data.entry_sortkey)
else:
new_entries = entries
return new_entries, []
beancount.plugins.book_conversions
A plugin that automatically converts postings at price to postings held at cost, applying an automatic booking algorithm in assigning the cost bases and matching lots.
This plugin restricts itself to applying these transformations within a particular account, which you provide. For each of those accounts, it also requires a corresponding Income account to book the profit/loss of reducing lots (i.e., sales):
plugin "beancount.plugins.book_conversions" "Assets:Bitcoin,Income:Bitcoin"
Then, simply input the transactions with price conversion. We use "Bitcoins" in this example, converting Bitcoin purchases that were carried out as currency into maintaining these with cost basis, for tax reporting purposes:
2015-09-04 * "Buy some bitcoins"
Assets:Bank -1000.00 USD
Assets:Bitcoin 4.333507 BTC @ 230.76 USD
2015-09-05 * "Buy some more bitcoins at a different price"
Assets:Bank -1000.00 USD
Assets:Bitcoin 4.345747 BTC @ 230.11 USD
2015-09-20 * "Use (sell) some bitcoins"
Assets:Bitcoin -6.000000 BTC @ 230.50 USD
Expenses:Something
The result is that cost bases are inserted on augmenting lots:
2015-09-04 * "Buy some bitcoins"
Assets:Bitcoin 4.333507 BTC {230.76 USD} @ 230.76 USD
Assets:Bank -1000.00 USD
2015-09-05 * "Buy some more bitcoins at a different price"
Assets:Bitcoin 4.345747 BTC {230.11 USD} @ 230.11 USD
Assets:Bank -1000.00 USD
While on reducing lots, matching FIFO lots are automatically found and the corresponding cost basis added:
2015-09-20 * "Use (sell) some bitcoins"
Assets:Bitcoin -4.333507 BTC {230.76 USD} @ 230.50 USD
Assets:Bitcoin -1.666493 BTC {230.11 USD} @ 230.50 USD
Income:Bitcoin 0.47677955 USD
Expenses:Something 1383.00000000 USD
Note that multiple lots were required to fulfill the sale quantity here. As in this example, this may result in multiple lots being created for a single one.
Finally, Beancount will eventually support booking methods built-in, but this is a quick method that shows how to hack your own booking method via transformations of the postings that run in a plugin.
Implementation notes:
-
This code uses the FIFO method only for now. However, it would be very easy to customize it to provide other booking methods, e.g. LIFO, or otherwise. This will be added eventually, and I'm hoping to reuse the same inventory abstractions that will be used to implement the fallback booking methods from the booking proposal review (http://furius.ca/beancount/doc/proposal-booking).
-
Instead of keeping a list of (Position, Transaction) pairs for the pending FIFO lots, we really ought to use a beancount.core.inventory.Inventory instance. However, the class does not contain sufficient data to carry out FIFO booking at the moment. A newer implementation, living in the "booking" branch, does, and will be used in the future.
-
This code assumes that a positive number of units is an augmenting lot and a reducing one has a negative number of units, though we never call them that way on purpose (to eventually allow this code to handle short positions). This is not strictly true; however, we would need an Inventory in order to figrue this out. This will be done in the future and is not difficult to do.
IMPORTANT:
This plugin was developed before the booking methods (FIFO, LIFO, and others) were fully implemented in Beancount. It was built to answer a question on the mailing-list about FIFO booking. You probably don't need to use them anymore. Always prefer to use the native syntax instead of this.
beancount.plugins.book_conversions.BookConversionError (tuple)
BookConversionError(source, message, entry)
beancount.plugins.book_conversions.BookConversionError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/book_conversions.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.book_conversions.BookConversionError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of BookConversionError(source, message, entry)
beancount.plugins.book_conversions.BookConversionError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/book_conversions.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.book_conversions.ConfigError (tuple)
ConfigError(source, message, entry)
beancount.plugins.book_conversions.ConfigError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/book_conversions.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.book_conversions.ConfigError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of ConfigError(source, message, entry)
beancount.plugins.book_conversions.ConfigError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/book_conversions.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.book_conversions.augment_inventory(pending_lots, posting, entry, eindex)
Add the lots from the given posting to the running inventory.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/book_conversions.py
def augment_inventory(pending_lots, posting, entry, eindex):
"""Add the lots from the given posting to the running inventory.
Args:
pending_lots: A list of pending ([number], Posting, Transaction) to be matched.
The number is modified in-place, destructively.
posting: The posting whose position is to be added.
entry: The parent transaction.
eindex: The index of the parent transaction housing this posting.
Returns:
A new posting with cost basis inserted to be added to a transformed transaction.
"""
number = posting.units.number
new_posting = posting._replace(
units=copy.copy(posting.units),
cost=position.Cost(posting.price.number,
posting.price.currency,
entry.date,
None))
pending_lots.append(([number], new_posting, eindex))
return new_posting
beancount.plugins.book_conversions.book_price_conversions(entries, assets_account, income_account)
Rewrite transactions to insert cost basis according to a booking method.
See module docstring for full details.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/book_conversions.py
def book_price_conversions(entries, assets_account, income_account):
"""Rewrite transactions to insert cost basis according to a booking method.
See module docstring for full details.
Args:
entries: A list of entry instances.
assets_account: An account string, the name of the account to process.
income_account: An account string, the name of the account to use for booking
realized profit/loss.
Returns:
A tuple of
entries: A list of new, modified entries.
errors: A list of errors generated by this plugin.
matches: A list of (number, augmenting-posting, reducing-postings) for all
matched lots.
"""
# Pairs of (Position, Transaction) instances used to match augmenting
# entries with reducing ones.
pending_lots = []
# A list of pairs of matching (augmenting, reducing) postings.
all_matches = []
new_entries = []
errors = []
for eindex, entry in enumerate(entries):
# Figure out if this transaction has postings in Bitcoins without a cost.
# The purpose of this plugin is to fixup those.
if isinstance(entry, data.Transaction) and any(is_matching(posting, assets_account)
for posting in entry.postings):
# Segregate the reducing lots, augmenting lots and other lots.
augmenting, reducing, other = [], [], []
for pindex, posting in enumerate(entry.postings):
if is_matching(posting, assets_account):
out = augmenting if posting.units.number >= ZERO else reducing
else:
out = other
out.append(posting)
# We will create a replacement list of postings with costs filled
# in, possibly more than the original list, to account for the
# different lots.
new_postings = []
# Convert all the augmenting postings to cost basis.
for posting in augmenting:
new_postings.append(augment_inventory(pending_lots, posting, entry, eindex))
# Then process reducing postings.
if reducing:
# Process all the reducing postings, booking them to matching lots.
pnl = inventory.Inventory()
for posting in reducing:
rpostings, matches, posting_pnl, new_errors = (
reduce_inventory(pending_lots, posting, eindex))
new_postings.extend(rpostings)
all_matches.extend(matches)
errors.extend(new_errors)
pnl.add_amount(amount.Amount(posting_pnl, posting.price.currency))
# If some reducing lots were seen in this transaction, insert an
# Income leg to absorb the P/L. We need to do this for each currency
# which incurred P/L.
if not pnl.is_empty():
for pos in pnl:
meta = data.new_metadata('<book_conversions>', 0)
new_postings.append(
data.Posting(income_account,
-pos.units, None,
None, None, meta))
# Third, add back all the other unrelated legs in.
for posting in other:
new_postings.append(posting)
# Create a replacement entry.
entry = entry._replace(postings=new_postings)
new_entries.append(entry)
# Add matching metadata to all matching postings.
mod_matches = link_entries_with_metadata(new_entries, all_matches)
# Resolve the indexes to their possibly modified Transaction instances.
matches = [(data.TxnPosting(new_entries[aug_index], aug_posting),
data.TxnPosting(new_entries[red_index], red_posting))
for (aug_index, aug_posting), (red_index, red_posting) in mod_matches]
return new_entries, errors, matches
beancount.plugins.book_conversions.book_price_conversions_plugin(entries, options_map, config)
Plugin that rewrites transactions to insert cost basis according to a booking method.
See book_price_conversions() for details.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/book_conversions.py
def book_price_conversions_plugin(entries, options_map, config):
"""Plugin that rewrites transactions to insert cost basis according to a booking method.
See book_price_conversions() for details.
Args:
entries: A list of entry instances.
options_map: A dict of options parsed from the file.
config: A string, in "<ACCOUNT1>,<ACCOUNT2>" format.
Returns:
A tuple of
entries: A list of new, modified entries.
errors: A list of errors generated by this plugin.
"""
# The expected configuration is two account names, separated by whitespace.
errors = []
try:
assets_account, income_account = re.split(r'[,; \t]', config)
if not account.is_valid(assets_account) or not account.is_valid(income_account):
raise ValueError("Invalid account string")
except ValueError as exc:
errors.append(
ConfigError(
None,
('Invalid configuration: "{}": {}, skipping booking').format(config, exc),
None))
return entries, errors
new_entries, errors, _ = book_price_conversions(entries, assets_account, income_account)
return new_entries, errors
beancount.plugins.book_conversions.extract_trades(entries)
Find all the matching trades from the metadata attached to postings.
This inspects all the postings and pairs them up using the special metadata field that was added by this plugin when booking matching lots, and returns pairs of those postings.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/book_conversions.py
def extract_trades(entries):
"""Find all the matching trades from the metadata attached to postings.
This inspects all the postings and pairs them up using the special metadata
field that was added by this plugin when booking matching lots, and returns
pairs of those postings.
Args:
entries: The list of directives to extract from.
Returns:
A list of (number, augmenting-posting, reducing-posting).
"""
trade_map = collections.defaultdict(list)
for index, entry in enumerate(entries):
if not isinstance(entry, data.Transaction):
continue
for posting in entry.postings:
links_str = posting.meta.get(META, None)
if links_str:
links = links_str.split(',')
for link in links:
trade_map[link].append((index, entry, posting))
# Sort matches according to the index of the first entry, drop the index
# used for doing this, and convert the objects to tuples..
trades = [(data.TxnPosting(augmenting[1], augmenting[2]),
data.TxnPosting(reducing[1], reducing[2]))
for augmenting, reducing in sorted(trade_map.values())]
# Sanity check.
for matches in trades:
assert len(matches) == 2
return trades
beancount.plugins.book_conversions.is_matching(posting, account)
"Identify if the given posting is one to be booked.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/book_conversions.py
def is_matching(posting, account):
""""Identify if the given posting is one to be booked.
Args:
posting: An instance of a Posting.
account: The account name configured.
Returns:
A boolean, true if this posting is one that we should be adding a cost to.
"""
return (posting.account == account and
posting.cost is None and
posting.price is not None)
beancount.plugins.book_conversions.link_entries_with_metadata(entries, all_matches)
Modify the entries in-place to add matching links to postings.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/book_conversions.py
def link_entries_with_metadata(entries, all_matches):
"""Modify the entries in-place to add matching links to postings.
Args:
entries: The list of entries to modify.
all_matches: A list of pairs of (augmenting-posting, reducing-posting).
Returns:
A list of pairs of (index, Posting) for the new (augmenting, reducing)
annotated postings.
"""
# Allocate trade names and compute a map of posting to trade names.
link_map = collections.defaultdict(list)
for (aug_index, aug_posting), (red_index, red_posting) in all_matches:
link = 'trade-{}'.format(str(uuid.uuid4()).split('-')[-1])
link_map[id(aug_posting)].append(link)
link_map[id(red_posting)].append(link)
# Modify the postings.
postings_repl_map = {}
for entry in entries:
if isinstance(entry, data.Transaction):
for index, posting in enumerate(entry.postings):
links = link_map.pop(id(posting), None)
if links:
new_posting = posting._replace(meta=posting.meta.copy())
new_posting.meta[META] = ','.join(links)
entry.postings[index] = new_posting
postings_repl_map[id(posting)] = new_posting
# Just a sanity check.
assert not link_map, "Internal error: not all matches found."
# Return a list of the modified postings (mapping the old matches to the
# newly created ones).
return [((aug_index, postings_repl_map[id(aug_posting)]),
(red_index, postings_repl_map[id(red_posting)]))
for (aug_index, aug_posting), (red_index, red_posting) in all_matches]
beancount.plugins.book_conversions.main()
Extract trades from metadata-annotated postings and report on them.
Source code in beancount/plugins/book_conversions.py
def main():
"""Extract trades from metadata-annotated postings and report on them.
"""
logging.basicConfig(level=logging.INFO, format='%(levelname)-8s: %(message)s')
parser = version.ArgumentParser(description=__doc__.strip())
parser.add_argument('filename', help='Beancount input filename')
oparser = parser.add_argument_group('Outputs')
oparser.add_argument('-o', '--output', action='store',
help="Filename to output results to (default goes to stdout)")
oparser.add_argument('-f', '--format', default='text',
choices=['text', 'csv'],
help="Output format to render to (text, csv)")
args = parser.parse_args()
# Load the input file.
entries, errors, options_map = loader.load_file(args.filename)
# Get the list of trades.
trades = extract_trades(entries)
# Produce a table of all the trades.
columns = ('units currency cost_currency '
'buy_date buy_price sell_date sell_price pnl').split()
header = ['Units', 'Currency', 'Cost Currency',
'Buy Date', 'Buy Price', 'Sell Date', 'Sell Price',
'P/L']
body = []
for aug, red in trades:
units = -red.posting.units.number
buy_price = aug.posting.price.number
sell_price = red.posting.price.number
pnl = (units * (sell_price - buy_price)).quantize(buy_price)
body.append([
-red.posting.units.number,
red.posting.units.currency,
red.posting.price.currency,
aug.txn.date.isoformat(), buy_price,
red.txn.date.isoformat(), sell_price,
pnl
])
trades_table = table.Table(columns, header, body)
# Render the table as text or CSV.
outfile = open(args.output, 'w') if args.output else sys.stdout
table.render_table(trades_table, outfile, args.format)
beancount.plugins.book_conversions.reduce_inventory(pending_lots, posting, eindex)
Match a reducing posting against a list of lots (using FIFO order).
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/book_conversions.py
def reduce_inventory(pending_lots, posting, eindex):
"""Match a reducing posting against a list of lots (using FIFO order).
Args:
pending_lots: A list of pending ([number], Posting, Transaction) to be matched.
The number is modified in-place, destructively.
posting: The posting whose position is to be added.
eindex: The index of the parent transaction housing this posting.
Returns:
A tuple of
postings: A list of new Posting instances corresponding to the given
posting, that were booked to the current list of lots.
matches: A list of pairs of (augmenting-posting, reducing-posting).
pnl: A Decimal, the P/L incurred in reducing these lots.
errors: A list of new errors generated in reducing these lots.
"""
new_postings = []
matches = []
pnl = ZERO
errors = []
match_number = -posting.units.number
match_currency = posting.units.currency
cost_currency = posting.price.currency
while match_number != ZERO:
# Find the first lot with matching currency.
for fnumber, fposting, findex in pending_lots:
funits = fposting.units
fcost = fposting.cost
if (funits.currency == match_currency and
fcost and fcost.currency == cost_currency):
assert fnumber[0] > ZERO, "Internal error, zero lot"
break
else:
errors.append(
BookConversionError(posting.meta,
"Could not match position {}".format(posting), None))
break
# Reduce the pending lots.
number = min(match_number, fnumber[0])
cost = fcost
match_number -= number
fnumber[0] -= number
if fnumber[0] == ZERO:
pending_lots.pop(0)
# Add a corresponding posting.
rposting = posting._replace(
units=amount.Amount(-number, posting.units.currency),
cost=copy.copy(cost))
new_postings.append(rposting)
# Update the P/L.
pnl += number * (posting.price.number - cost.number)
# Add to the list of matches.
matches.append(((findex, fposting),
(eindex, rposting)))
return new_postings, matches, pnl, errors
beancount.plugins.check_average_cost
A plugin that ensures cost basis is preserved in unbooked transactions.
This is intended to be used in accounts using the "NONE" booking method, to manually ensure that the sum total of the cost basis of reducing legs matches the average of what's in the account inventory. This is a partial first step toward implementing the "AVERAGE" booking method. In other words, this plugins provides assertions that will constrain you to approximate what the "AVERAGE" booking method will do, manually, and not to leak too much cost basis through unmatching bookings without checks. (Note the contrived context here: Ideally the "NONE" booking method would simply not exist.)
beancount.plugins.check_average_cost.MatchBasisError (tuple)
MatchBasisError(source, message, entry)
beancount.plugins.check_average_cost.MatchBasisError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/check_average_cost.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.check_average_cost.MatchBasisError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of MatchBasisError(source, message, entry)
beancount.plugins.check_average_cost.MatchBasisError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/check_average_cost.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.check_average_cost.validate_average_cost(entries, options_map, config_str=None)
Check that reducing legs on unbooked postings are near the average cost basis.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/check_average_cost.py
def validate_average_cost(entries, options_map, config_str=None):
"""Check that reducing legs on unbooked postings are near the average cost basis.
Args:
entries: A list of directives.
unused_options_map: An options map.
config_str: The configuration as a string version of a float.
Returns:
A list of new errors, if any were found.
"""
# Initialize tolerance bounds.
if config_str and config_str.strip():
# pylint: disable=eval-used
config_obj = eval(config_str, {}, {})
if not isinstance(config_obj, float):
raise RuntimeError("Invalid configuration for check_average_cost: "
"must be a float")
tolerance = config_obj
else:
tolerance = DEFAULT_TOLERANCE
min_tolerance = D(1 - tolerance)
max_tolerance = D(1 + tolerance)
errors = []
ocmap = getters.get_account_open_close(entries)
balances = collections.defaultdict(inventory.Inventory)
for entry in entries:
if isinstance(entry, Transaction):
for posting in entry.postings:
dopen = ocmap.get(posting.account, None)
# Only process accounts with a NONE booking value.
if dopen and dopen[0] and dopen[0].booking == Booking.NONE:
balance = balances[(posting.account,
posting.units.currency,
posting.cost.currency if posting.cost else None)]
if posting.units.number < ZERO:
average = balance.average().get_only_position()
if average is not None:
number = average.cost.number
min_valid = number * min_tolerance
max_valid = number * max_tolerance
if not (min_valid <= posting.cost.number <= max_valid):
errors.append(
MatchBasisError(
entry.meta,
("Cost basis on reducing posting is too far from "
"the average cost ({} vs. {})".format(
posting.cost.number, average.cost.number)),
entry))
balance.add_position(posting)
return entries, errors
beancount.plugins.check_closing
A plugin that automatically inserts a balance check on a tagged closing posting.
Some postings are known to the user to be "closing trades", which means that the resulting position of the instrument just after the trade should be zero. For instance, this is the case for most ordinary options trading, only one lot of a particular instrument is held, and eventually expires or gets sold off in its entirely. One would like to confirm that, and the way to do this in Beancount is to insert a balance check.
This plugin allows you to do that more simply, by inserting metadata. For example, this transaction:
2018-02-16 * "SOLD -14 QQQ 100 16 FEB 18 160 CALL @5.31"
Assets:US:Brokerage:Main:Options -1400 QQQ180216C160 {2.70 USD} @ 5.31 USD
closing: TRUE
Expenses:Financial:Commissions 17.45 USD
Expenses:Financial:Fees 0.42 USD
Assets:US:Brokerage:Main:Cash 7416.13 USD
Income:US:Brokerage:Main:PnL
Would expand into the following two directives:
2018-02-16 * "SOLD -14 QQQ 100 16 FEB 18 160 CALL @5.31"
Assets:US:Brokerage:Main:Options -1400 QQQ180216C160 {2.70 USD} @ 5.31 USD
Expenses:Financial:Commissions 17.45 USD
Expenses:Financial:Fees 0.42 USD
Assets:US:Brokerage:Main:Cash 7416.13 USD
Income:US:Brokerage:Main:PnL
2018-02-17 balance Assets:US:Brokerage:Main:Options 0 QQQ180216C160
Insert the closing line when you know you're closing the position.
beancount.plugins.check_closing.check_closing(entries, options_map)
Expand 'closing' metadata to a zero balance check.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/check_closing.py
def check_closing(entries, options_map):
"""Expand 'closing' metadata to a zero balance check.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
new_entries = []
for entry in entries:
if isinstance(entry, data.Transaction):
for posting in entry.postings:
if posting.meta and posting.meta.get('closing', False):
# Remove the metadata.
meta = posting.meta.copy()
del meta['closing']
entry = entry._replace(meta=meta)
# Insert a balance.
date = entry.date + datetime.timedelta(days=1)
balance = data.Balance(data.new_metadata("<check_closing>", 0),
date, posting.account,
amount.Amount(ZERO, posting.units.currency),
None, None)
new_entries.append(balance)
new_entries.append(entry)
return new_entries, []
beancount.plugins.check_commodity
A plugin that verifies that all seen commodities have a Commodity directive.
This is useful if you're a bit pedantic and like to make sure that you're declared attributes for each of the commodities you use. It's useful if you use the portfolio export, for example.
beancount.plugins.check_commodity.CheckCommodityError (tuple)
CheckCommodityError(source, message, entry)
beancount.plugins.check_commodity.CheckCommodityError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/check_commodity.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.check_commodity.CheckCommodityError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of CheckCommodityError(source, message, entry)
beancount.plugins.check_commodity.CheckCommodityError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/check_commodity.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.check_commodity.validate_commodity_directives(entries, options_map)
Find all commodities used and ensure they have a corresponding Commodity directive.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/check_commodity.py
def validate_commodity_directives(entries, options_map):
"""Find all commodities used and ensure they have a corresponding Commodity directive.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
commodities_used = options_map['commodities']
errors = []
meta = data.new_metadata('<check_commodity>', 0)
commodity_map = getters.get_commodity_map(entries, create_missing=False)
for currency in commodities_used:
commodity_entry = commodity_map.get(currency, None)
if commodity_entry is None:
errors.append(
CheckCommodityError(
meta,
"Missing Commodity directive for '{}'".format(currency),
None))
return entries, errors
beancount.plugins.coherent_cost
This plugin validates that currencies held at cost aren't ever converted at price and vice-versa. This is usually the case, and using it will prevent users from making the mistake of selling a lot without specifying it via its cost basis.
beancount.plugins.coherent_cost.CoherentCostError (tuple)
CoherentCostError(source, message, entry)
beancount.plugins.coherent_cost.CoherentCostError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/coherent_cost.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.coherent_cost.CoherentCostError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of CoherentCostError(source, message, entry)
beancount.plugins.coherent_cost.CoherentCostError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/coherent_cost.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.coherent_cost.validate_coherent_cost(entries, unused_options_map)
Check that all currencies are either used at cost or not at all, but never both.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/coherent_cost.py
def validate_coherent_cost(entries, unused_options_map):
"""Check that all currencies are either used at cost or not at all, but never both.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
errors = []
with_cost = {}
without_cost = {}
for entry in data.filter_txns(entries):
for posting in entry.postings:
target_set = without_cost if posting.cost is None else with_cost
currency = posting.units.currency
target_set.setdefault(currency, entry)
for currency in set(with_cost) & set(without_cost):
errors.append(
CoherentCostError(
without_cost[currency].meta,
"Currency '{}' is used both with and without cost".format(currency),
with_cost[currency]))
# Note: We really ought to include both of the first transactions here.
return entries, errors
beancount.plugins.commodity_attr
A plugin that asserts that all Commodity directives have a particular attribute and that it is part of a set of enum values.
The configuration must be a mapping of attribute name to list of valid values, like this:
plugin "beancount.plugins.commodity_attr" "{
'sector': ['Technology', 'Financials', 'Energy'],
'name': None,
}"
The plugin issues an error if a Commodity directive is missing the attribute, or if the attribute value is not in the valid set. If you'd like to just ensure the attribute is set, set the list of valid values to None, as in the 'name' attribute in the example above.
beancount.plugins.commodity_attr.CommodityError (tuple)
CommodityError(source, message, entry)
beancount.plugins.commodity_attr.CommodityError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/commodity_attr.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.commodity_attr.CommodityError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of CommodityError(source, message, entry)
beancount.plugins.commodity_attr.CommodityError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/commodity_attr.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.commodity_attr.ConfigError (tuple)
ConfigError(source, message, entry)
beancount.plugins.commodity_attr.ConfigError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/commodity_attr.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.commodity_attr.ConfigError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of ConfigError(source, message, entry)
beancount.plugins.commodity_attr.ConfigError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/commodity_attr.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.commodity_attr.validate_commodity_attr(entries, unused_options_map, config_str)
Check that all Commodity directives have a valid attribute.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/commodity_attr.py
def validate_commodity_attr(entries, unused_options_map, config_str):
"""Check that all Commodity directives have a valid attribute.
Args:
entries: A list of directives.
unused_options_map: An options map.
config_str: A configuration string.
Returns:
A list of new errors, if any were found.
"""
errors = []
# pylint: disable=eval-used
config_obj = eval(config_str, {}, {})
if not isinstance(config_obj, dict):
errors.append(ConfigError(
data.new_metadata('<commodity_attr>', 0),
"Invalid configuration for commodity_attr plugin; skipping.", None))
return entries, errors
validmap = {attr: frozenset(values) if values is not None else None
for attr, values in config_obj.items()}
for entry in entries:
if not isinstance(entry, data.Commodity):
continue
for attr, values in validmap.items():
value = entry.meta.get(attr, None)
if value is None:
errors.append(CommodityError(
entry.meta,
"Missing attribute '{}' for Commodity directive {}".format(
attr, entry.currency), None))
continue
if values and value not in values:
errors.append(CommodityError(
entry.meta,
"Invalid attribute '{}' for Commodity".format(value) +
" directive {}; valid options: {}".format(
entry.currency, ', '.join(values)), None))
return entries, errors
beancount.plugins.currency_accounts
An implementation of currency accounts.
This is an automatic implementation of the method described here: https://www.mathstat.dal.ca/~selinger/accounting/tutorial.html
You enable it just like this:
plugin "beancount.plugins.currency_accounts" "Equity:CurrencyAccounts"
Accounts will be automatically created under the given base account, with the currency name appended to it, e.g.,
Equity:CurrencyAccounts:CAD
Equity:CurrencyAccounts:USD
etc., where used. You can have a look at the account balances with a query like this:
bean-query $L "select account, sum(position), convert(sum(position), 'USD')
where date >= 2018-01-01 and account ~ 'CurrencyAccounts' "
The sum total of the converted amounts should be a number not too large:
bean-query $L "select convert(sum(position), 'USD')
where date >= 2018-01-01 and account ~ 'CurrencyAccounts'"
WARNING: This is a prototype. Note the FIXMEs in the code below, which indicate some potential problems.
beancount.plugins.currency_accounts.get_neutralizing_postings(curmap, base_account, new_accounts)
Process an entry.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/currency_accounts.py
def get_neutralizing_postings(curmap, base_account, new_accounts):
"""Process an entry.
Args:
curmap: A dict of currency to a list of Postings of this transaction.
base_account: A string, the root account name to insert.
new_accounts: A set, a mutable accumulator of new account names.
Returns:
A modified entry, with new postings inserted to rebalance currency trading
accounts.
"""
new_postings = []
for currency, postings in curmap.items():
# Compute the per-currency balance.
inv = inventory.Inventory()
for posting in postings:
inv.add_amount(convert.get_cost(posting))
if inv.is_empty():
new_postings.extend(postings)
continue
# Re-insert original postings and remove price conversions.
#
# Note: This may cause problems if the implicit_prices plugin is
# configured to run after this one, or if you need the price annotations
# for some scripting or serious work.
#
# FIXME: We need to handle these important cases (they're not frivolous,
# this is a prototype), probably by inserting some exceptions with
# collaborating code in the booking (e.g. insert some metadata that
# disables price conversions on those postings).
#
# FIXME(2): Ouch! Some of the residual seeps through here, where there
# are more than a single currency block. This needs fixing too. You can
# easily mitigate some of this to some extent, by excluding transactions
# which don't have any price conversion in them.
for pos in postings:
if pos.price is not None:
pos = pos._replace(price=None)
new_postings.append(pos)
# Insert the currency trading accounts postings.
amount = inv.get_only_position().units
acc = account.join(base_account, currency)
new_accounts.add(acc)
new_postings.append(
Posting(acc, -amount, None, None, None, None))
return new_postings
beancount.plugins.currency_accounts.group_postings_by_weight_currency(entry)
Return where this entry might require adjustment.
Source code in beancount/plugins/currency_accounts.py
def group_postings_by_weight_currency(entry: Transaction):
"""Return where this entry might require adjustment."""
curmap = collections.defaultdict(list)
has_price = False
for posting in entry.postings:
currency = posting.units.currency
if posting.cost:
currency = posting.cost.currency
if posting.price:
assert posting.price.currency == currency
elif posting.price:
has_price = True
currency = posting.price.currency
if posting.price:
has_price = True
curmap[currency].append(posting)
return curmap, has_price
beancount.plugins.currency_accounts.insert_currency_trading_postings(entries, options_map, config)
Insert currency trading postings.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/currency_accounts.py
def insert_currency_trading_postings(entries, options_map, config):
"""Insert currency trading postings.
Args:
entries: A list of directives.
unused_options_map: An options map.
config: The base account name for currency trading accounts.
Returns:
A list of new errors, if any were found.
"""
base_account = config.strip()
if not account.is_valid(base_account):
base_account = DEFAULT_BASE_ACCOUNT
errors = []
new_entries = []
new_accounts = set()
for entry in entries:
if isinstance(entry, Transaction):
curmap, has_price = group_postings_by_weight_currency(entry)
if has_price and len(curmap) > 1:
new_postings = get_neutralizing_postings(
curmap, base_account, new_accounts)
entry = entry._replace(postings=new_postings)
if META_PROCESSED:
entry.meta[META_PROCESSED] = True
new_entries.append(entry)
earliest_date = entries[0].date
open_entries = [
data.Open(data.new_metadata('<currency_accounts>', index),
earliest_date, acc, None, None)
for index, acc in enumerate(sorted(new_accounts))]
return open_entries + new_entries, errors
beancount.plugins.divert_expenses
For tagged transactions, convert expenses to a single account.
This plugin allows you to select a tag and it automatically converts all the Expenses postings to use a single account. For example, with this input:
plugin "divert_expenses" "['kid', 'Expenses:Child']"
2018-01-28 * "CVS" "Formula" #kid
Liabilities:CreditCard -10.27 USD
Expenses:Food:Grocery 10.27 USD
It will output:
2018-01-28 * "CVS" "Formula" #kid Liabilities:CreditCard -10.27 USD Expenses:Child 10.27 USD
You can limit the diversion to one posting only, like this:
2018-05-05 * "CVS/PHARMACY" "" #kai
Liabilities:CreditCard -66.38 USD
Expenses:Pharmacy 21.00 USD ;; Vitamins for Kai
Expenses:Pharmacy 45.38 USD
divert: FALSE
See unit test for details.
See this thread for context: https://docs.google.com/drawings/d/18fTrrGlmz0jFbfcGGHTffbdRwbmST8r9_3O26Dd1Xww/edit?usp=sharing
beancount.plugins.divert_expenses.divert_expenses(entries, options_map, config_str)
Divert expenses.
Explicit price entries are simply maintained in the output list. Prices from postings with costs or with prices from Transaction entries are synthesized as new Price entries in the list of entries output.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/divert_expenses.py
def divert_expenses(entries, options_map, config_str):
"""Divert expenses.
Explicit price entries are simply maintained in the output list. Prices from
postings with costs or with prices from Transaction entries are synthesized
as new Price entries in the list of entries output.
Args:
entries: A list of directives. We're interested only in the Transaction instances.
options_map: A parser options dict.
config_str: A configuration string, which is intended to be a list of two strings,
a tag, and an account to replace expenses with.
Returns:
A modified list of entries.
"""
# pylint: disable=eval-used
config_obj = eval(config_str, {}, {})
if not isinstance(config_obj, dict):
raise RuntimeError("Invalid plugin configuration: should be a single dict.")
tag = config_obj['tag']
replacement_account = config_obj['account']
acctypes = options.get_account_types(options_map)
new_entries = []
errors = []
for entry in entries:
if isinstance(entry, Transaction) and tag in entry.tags:
entry = replace_diverted_accounts(entry, replacement_account, acctypes)
new_entries.append(entry)
return new_entries, errors
beancount.plugins.divert_expenses.replace_diverted_accounts(entry, replacement_account, acctypes)
Replace the Expenses accounts from the entry.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/divert_expenses.py
def replace_diverted_accounts(entry, replacement_account, acctypes):
"""Replace the Expenses accounts from the entry.
Args:
entry: A Transaction directive.
replacement_account: A string, the account to use for replacement.
acctypes: An AccountTypes instance.
Returns:
A possibly entry directive.
"""
new_postings = []
for posting in entry.postings:
divert = posting.meta.get('divert', None) if posting.meta else None
if (divert is True or (
divert is None and
account_types.is_account_type(acctypes.expenses, posting.account))):
posting = posting._replace(account=replacement_account,
meta={'diverted_account': posting.account})
new_postings.append(posting)
return entry._replace(postings=new_postings)
beancount.plugins.exclude_tag
Exclude #virtual tags.
This is used to demonstrate excluding a set of transactions from a particular tag. In this example module, the tag name is fixed, but if we integrated this we could provide a way to choose which tags to exclude. This is simply just another mechanism for selecting a subset of transactions.
See discussion here for details: https://groups.google.com/d/msg/ledger-cli/N8Slh2t45K0/aAz0i3Be4LYJ
beancount.plugins.exclude_tag.exclude_tag(entries, options_map)
Select all transactions that do not have a #virtual tag.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/exclude_tag.py
def exclude_tag(entries, options_map):
"""Select all transactions that do not have a #virtual tag.
Args:
entries: A list of entry instances.
options_map: A dict of options parsed from the file.
Returns:
A tuple of entries and errors.
"""
filtered_entries = [entry
for entry in entries
if (not isinstance(entry, data.Transaction) or
not entry.tags or
EXCLUDED_TAG not in entry.tags)]
return (filtered_entries, [])
beancount.plugins.fill_account
Insert an posting with a default account when there is only a single posting.
This is convenient to use in files which have mostly expenses, such as during a trip. Set the name of the default account to fill in as an option.
beancount.plugins.fill_account.FillAccountError (tuple)
FillAccountError(source, message, entry)
beancount.plugins.fill_account.FillAccountError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/fill_account.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.fill_account.FillAccountError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of FillAccountError(source, message, entry)
beancount.plugins.fill_account.FillAccountError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/fill_account.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.fill_account.fill_account(entries, unused_options_map, insert_account)
Insert an posting with a default account when there is only a single posting.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/fill_account.py
def fill_account(entries, unused_options_map, insert_account):
"""Insert an posting with a default account when there is only a single posting.
Args:
entries: A list of directives.
unused_options_map: A parser options dict.
insert_account: A string, the name of the account.
Returns:
A list of entries, possibly with more Price entries than before, and a
list of errors.
"""
if not account.is_valid(insert_account):
return entries, [
FillAccountError(data.new_metadata('<fill_account>', 0),
"Invalid account name: '{}'".format(insert_account),
None)]
new_entries = []
for entry in entries:
if isinstance(entry, data.Transaction) and len(entry.postings) == 1:
inv = inventory.Inventory()
for posting in entry.postings:
if posting.cost is None:
inv.add_amount(posting.units)
else:
inv.add_amount(convert.get_cost(posting))
inv.reduce(convert.get_units)
new_postings = list(entry.postings)
for pos in inv:
new_postings.append(data.Posting(insert_account, -pos.units,
None, None, None, None))
entry = entry._replace(postings=new_postings)
new_entries.append(entry)
return new_entries, []
beancount.plugins.fix_payees
Rename payees based on a set of rules.
This can be used to clean up dirty imported payee names.
This plugin accepts a list of rules in this format:
plugin "beancount.plugins.fix_payees" "[
(PAYEE, MATCH1, MATCH2, ...),
]"
Each of the "MATCH" clauses is a string, in the format:
"A:<regexp>" : Match the account name.
"D:<regexp>" : Match the payee or the narration.
The plugin matches the Transactions in the file and if there is a case-insensitive match against the regular expression (we use re.search()), replaces the payee name by "PAYEE". If multiple rules match, only the first rule is used.
For example:
plugin "beancount.plugins.fix_payees" "[
("T-Mobile USA",
"A:Expenses:Communications:Phone",
"D:t-mobile"),
("Con Edison",
"A:Expenses:Home:Electricity",
"D:con ?ed"),
("Birreria @ Eataly",
"D:EATALY BIRRERIA"),
]"
beancount.plugins.fix_payees.FixPayeesError (tuple)
FixPayeesError(source, message, entry)
beancount.plugins.fix_payees.FixPayeesError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/fix_payees.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.fix_payees.FixPayeesError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of FixPayeesError(source, message, entry)
beancount.plugins.fix_payees.FixPayeesError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/fix_payees.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.fix_payees.fix_payees(entries, options_map, config)
Rename payees based on a set of rules. See module docstring for details.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/fix_payees.py
def fix_payees(entries, options_map, config):
"""Rename payees based on a set of rules. See module docstring for details.
Args:
entries: a list of entry instances
options_map: a dict of options parsed from the file
config: A configuration string, which is intended to be a list of
(PAYEE, MATCH, ...) rules. See module docstring for details.
Returns:
A tuple of entries and errors.
"""
errors = []
if config.strip():
try:
expr = ast.literal_eval(config)
except (SyntaxError, ValueError):
meta = data.new_metadata(options_map['filename'], 0)
errors.append(FixPayeesError(meta,
"Syntax error in config: {}".format(config),
None))
return entries, errors
else:
return entries, errors
# Pre-compile the regular expressions for performance.
rules = []
for rule in ast.literal_eval(config):
clauses = iter(rule)
new_payee = next(clauses)
regexps = []
for clause in clauses:
match = re.match('([AD]):(.*)', clause)
if not match:
meta = data.new_metadata(options_map['filename'], 0)
errors.append(FixPayeesError(meta,
"Invalid clause: {}".format(clause),
None))
continue
command, regexp = match.groups()
regexps.append((command, re.compile(regexp, re.I).search))
new_rule = [new_payee] + regexps
rules.append(tuple(new_rule))
# Run the rules over the transaction objects.
new_entries = []
replaced_entries = {rule[0]: [] for rule in rules}
for entry in entries:
if isinstance(entry, data.Transaction):
for rule in rules:
clauses = iter(rule)
new_payee = next(clauses)
# Attempt to match all the clauses.
for clause in clauses:
command, func = clause
if command == 'D':
if not ((entry.payee is not None and func(entry.payee)) or
(entry.narration is not None and func(entry.narration))):
break
elif command == 'A':
if not any(func(posting.account) for posting in entry.postings):
break
else:
# Make the replacement.
entry = entry._replace(payee=new_payee)
replaced_entries[new_payee].append(entry)
new_entries.append(entry)
if _DEBUG:
# Print debugging info.
for payee, repl_entries in sorted(replaced_entries.items(),
key=lambda x: len(x[1]),
reverse=True):
print('{:60}: {}'.format(payee, len(repl_entries)))
return new_entries, errors
beancount.plugins.forecast
An example of adding a forecasting feature to Beancount via a plugin.
This entry filter plugin uses existing syntax to define and automatically inserted transactions in the future based on a convention. It serves mostly as an example of how you can experiment by creating and installing a local filter, and not so much as a serious forecasting feature (though the experiment is a good way to get something more general kickstarted eventually, I think the concept would generalize nicely and should eventually be added as a common feature of Beancount).
A user can create a transaction like this:
2014-03-08 # "Electricity bill [MONTHLY]"
Expenses:Electricity 50.10 USD
Assets:Checking -50.10 USD
and new transactions will be created monthly for the following year. Note the use of the '#' flag and the word 'MONTHLY' which defines the periodicity.
The number of recurrences can optionally be specified either by providing an end date or by specifying the number of times that the transaction will be repeated. For example:
2014-03-08 # "Electricity bill [MONTHLY UNTIL 2019-12-31]"
Expenses:Electricity 50.10 USD
Assets:Checking -50.10 USD
2014-03-08 # "Electricity bill [MONTHLY REPEAT 10 TIMES]"
Expenses:Electricity 50.10 USD
Assets:Checking -50.10 USD
Transactions can also be repeated at yearly intervals, e.g.:
2014-03-08 # "Electricity bill [YEARLY REPEAT 10 TIMES]"
Expenses:Electricity 50.10 USD
Assets:Checking -50.10 USD
Other examples:
2014-03-08 # "Electricity bill [WEEKLY SKIP 1 TIME REPEAT 10 TIMES]"
Expenses:Electricity 50.10 USD
Assets:Checking -50.10 USD
2014-03-08 # "Electricity bill [DAILY SKIP 3 TIMES REPEAT 1 TIME]"
Expenses:Electricity 50.10 USD
Assets:Checking -50.10 USD
beancount.plugins.forecast.forecast_plugin(entries, options_map)
An example filter that piggybacks on top of the Beancount input syntax to insert forecast entries automatically. This functions accepts the return value of beancount.loader.load_file() and must return the same type of output.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/forecast.py
def forecast_plugin(entries, options_map):
"""An example filter that piggybacks on top of the Beancount input syntax to
insert forecast entries automatically. This functions accepts the return
value of beancount.loader.load_file() and must return the same type of output.
Args:
entries: a list of entry instances
options_map: a dict of options parsed from the file
Returns:
A tuple of entries and errors.
"""
# Find the last entry's date.
date_today = entries[-1].date
# Filter out forecast entries from the list of valid entries.
forecast_entries = []
filtered_entries = []
for entry in entries:
outlist = (forecast_entries
if (isinstance(entry, data.Transaction) and entry.flag == '#')
else filtered_entries)
outlist.append(entry)
# Generate forecast entries up to the end of the current year.
new_entries = []
for entry in forecast_entries:
# Parse the periodicity.
match = re.search(r'(^.*)\[(MONTHLY|YEARLY|WEEKLY|DAILY)'
r'(\s+SKIP\s+([1-9][0-9]*)\s+TIME.?)'
r'?(\s+REPEAT\s+([1-9][0-9]*)\s+TIME.?)'
r'?(\s+UNTIL\s+([0-9\-]+))?\]', entry.narration)
if not match:
new_entries.append(entry)
continue
forecast_narration = match.group(1).strip()
forecast_interval = (
rrule.YEARLY if match.group(2).strip() == 'YEARLY'
else rrule.WEEKLY if match.group(2).strip() == 'WEEKLY'
else rrule.DAILY if match.group(2).strip() == 'DAILY'
else rrule.MONTHLY)
forecast_periodicity = {'dtstart': entry.date}
if match.group(6): # e.g., [MONTHLY REPEAT 3 TIMES]:
forecast_periodicity['count'] = int(match.group(6))
elif match.group(8): # e.g., [MONTHLY UNTIL 2020-01-01]:
forecast_periodicity['until'] = datetime.datetime.strptime(
match.group(8), '%Y-%m-%d').date()
else:
# e.g., [MONTHLY]
forecast_periodicity['until'] = datetime.date(
datetime.date.today().year, 12, 31)
if match.group(4):
# SKIP
forecast_periodicity['interval'] = int(match.group(4)) + 1
# Generate a new entry for each forecast date.
forecast_dates = [dt.date() for dt in rrule.rrule(forecast_interval,
**forecast_periodicity)]
for forecast_date in forecast_dates:
forecast_entry = entry._replace(date=forecast_date,
narration=forecast_narration)
new_entries.append(forecast_entry)
# Make sure the new entries inserted are sorted.
new_entries.sort(key=data.entry_sortkey)
return (filtered_entries + new_entries, [])
beancount.plugins.implicit_prices
This plugin synthesizes Price directives for all Postings with a price or directive or if it is an augmenting posting, has a cost directive.
beancount.plugins.implicit_prices.ImplicitPriceError (tuple)
ImplicitPriceError(source, message, entry)
beancount.plugins.implicit_prices.ImplicitPriceError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/implicit_prices.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.implicit_prices.ImplicitPriceError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of ImplicitPriceError(source, message, entry)
beancount.plugins.implicit_prices.ImplicitPriceError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/implicit_prices.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.implicit_prices.add_implicit_prices(entries, unused_options_map)
Insert implicitly defined prices from Transactions.
Explicit price entries are simply maintained in the output list. Prices from postings with costs or with prices from Transaction entries are synthesized as new Price entries in the list of entries output.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/implicit_prices.py
def add_implicit_prices(entries, unused_options_map):
"""Insert implicitly defined prices from Transactions.
Explicit price entries are simply maintained in the output list. Prices from
postings with costs or with prices from Transaction entries are synthesized
as new Price entries in the list of entries output.
Args:
entries: A list of directives. We're interested only in the Transaction instances.
unused_options_map: A parser options dict.
Returns:
A list of entries, possibly with more Price entries than before, and a
list of errors.
"""
new_entries = []
errors = []
# A dict of (date, currency, cost-currency) to price entry.
new_price_entry_map = {}
balances = collections.defaultdict(inventory.Inventory)
for entry in entries:
# Always replicate the existing entries.
new_entries.append(entry)
if isinstance(entry, Transaction):
# Inspect all the postings in the transaction.
for posting in entry.postings:
units = posting.units
cost = posting.cost
# Check if the position is matching against an existing
# position.
_, booking = balances[posting.account].add_position(posting)
# Add prices when they're explicitly specified on a posting. An
# explicitly specified price may occur in a conversion, e.g.
# Assets:Account 100 USD @ 1.10 CAD
# or, if a cost is also specified, as the current price of the
# underlying instrument, e.g.
# Assets:Account 100 HOOL {564.20} @ {581.97} USD
if posting.price is not None:
meta = data.new_metadata(entry.meta["filename"], entry.meta["lineno"])
price_entry = data.Price(meta, entry.date,
units.currency,
posting.price)
# Add costs, when we're not matching against an existing
# position. This happens when we're just specifying the cost,
# e.g.
# Assets:Account 100 HOOL {564.20}
elif (cost is not None and
booking != inventory.Booking.REDUCED):
meta = data.new_metadata(entry.meta["filename"], entry.meta["lineno"])
price_entry = data.Price(meta, entry.date,
units.currency,
amount.Amount(cost.number,
cost.currency))
else:
price_entry = None
if price_entry is not None:
key = (price_entry.date,
price_entry.currency,
price_entry.amount.number, # Ideally should be removed.
price_entry.amount.currency)
try:
new_price_entry_map[key]
## Do not fail for now. We still have many valid use
## cases of duplicate prices on the same date, for
## example, stock splits, or trades on two dates with
## two separate reported prices. We need to figure out a
## more elegant solution for this in the long term.
## Keeping both for now. We should ideally not use the
## number in the de-dup key above.
#
# dup_entry = new_price_entry_map[key]
# if price_entry.amount.number == dup_entry.amount.number:
# # Skip duplicates.
# continue
# else:
# errors.append(
# ImplicitPriceError(
# entry.meta,
# "Duplicate prices for {} on {}".format(entry,
# dup_entry),
# entry))
except KeyError:
new_price_entry_map[key] = price_entry
new_entries.append(price_entry)
return new_entries, errors
beancount.plugins.ira_contribs
Automatically adding IRA contributions postings.
This plugin looks for increasing postings on specified accounts ('+' sign for Assets and Expenses accounts, '-' sign for the others), or postings with a particular flag on them and when it finds some, inserts a pair of postings on that transaction of the corresponding amounts in a different currency. The currency is intended to be an imaginary currency used to track the number of dollars contributed to a retirement account over time.
For example, a possible configuration could be:
plugin "beancount.plugins.ira_contribs" "{
'currency': 'IRAUSD',
'flag': 'M',
'accounts': {
'Income:US:Acme:Match401k': (
'Assets:US:Federal:Match401k',
'Expenses:Taxes:TY{year}:US:Federal:Match401k'),
('C', 'Assets:US:Fidelity:PreTax401k:Cash'): (
'Assets:US:Federal:PreTax401k',
'Expenses:Taxes:TY{year}:US:Federal:PreTax401k'),
}
}"
Note: In this example, the configuration that triggers on the "Income:US:Acme:Match401k" account does not require a flag for those accounts; the configuration for the "Assets:US:Fidelity:PreTax401k:Cash" account requires postings to have a "C" flag to trigger an insertion.
Given a transaction like the following, which would be typical for a salary entry where the employer is automatically diverting some of the pre-tax money to a retirement account (in this example, at Fidelity):
2013-02-15 * "ACME INC PAYROLL"
Income:US:Acme:Salary ...
...
Assets:US:BofA:Checking ...
Assets:US:Fidelity:PreTax401k:Cash 620.50 USD
...
A posting with account 'Assets:US:Fidelity:PreTax401k:Cash', which is configured to match, would be found. The configuration above instructs the plugin to automatically insert new postings like this:
2013-02-15 * "ACME INC PAYROLL"
...
Assets:US:Fidelity:PreTax401k:Cash 620.50 USD
M Assets:US:Federal:PreTax401k -620.50 IRAUSD
M Expenses:Taxes:TY2013:US:Federal:PreTax401k 620.50 IRAUSD
...
Notice that the "{year}" string in the configuration's account names is automatically replaced by the current year in the account name. This is useful if you maintain separate tax accounts per year.
Furthermore, as in the configuration example above, you may have multiple matching entries to trigger multiple insertions. For example, the employer may also match the employee's retirement contribution by depositing some money in the retirement account:
2013-02-15 * "BUYMF - MATCH" "Employer match, invested in SaveEasy 2030 fund"
Assets:US:Fidelity:Match401k:SE2030 34.793 SE2030 {17.834 USD}
Income:US:Acme:Match401k -620.50 USD
In this example the funds get reported as invested immediately (an intermediate deposit into a cash account does not take place). The plugin configuration would match against the 'Income:US:Acme:Match401k' account and since it increases its value (the normal balance of an Income account is negative), postings would be inserted like this:
2013-02-15 * "BUYMF - MATCH" "Employer match, invested in SaveEasy 2030 fund"
Assets:US:Fidelity:Match401k:SE2030 34.793 SE2030 {17.834 USD}
Income:US:Acme:Match401k -620.50 USD
M Assets:US:Federal:Match401k -620.50 IRAUSD
M Expenses:Taxes:TY2013:US:Federal:Match401k 620.50 IRAUSD
Note that the special dict keys 'currency' and 'flag' are used to specify which currency to use for the inserted postings, and if set, which flag to mark these postings with.
beancount.plugins.ira_contribs.add_ira_contribs(entries, options_map, config_str)
Add legs for 401k employer match contributions.
See module docstring for an example configuration.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/ira_contribs.py
def add_ira_contribs(entries, options_map, config_str):
"""Add legs for 401k employer match contributions.
See module docstring for an example configuration.
Args:
entries: a list of entry instances
options_map: a dict of options parsed from the file
config_str: A configuration string, which is intended to be a Python dict
mapping match-accounts to a pair of (negative-account, position-account)
account names.
Returns:
A tuple of entries and errors.
"""
# Parse and extract configuration values.
# FIXME: Use ast.literal_eval() here; you need to convert this code and the getters.
# FIXME: Also, don't raise a RuntimeError, return an error object; review
# this for all the plugins.
# FIXME: This too is temporary.
# pylint: disable=eval-used
config_obj = eval(config_str, {}, {})
if not isinstance(config_obj, dict):
raise RuntimeError("Invalid plugin configuration: should be a single dict.")
# Currency of the inserted postings.
currency = config_obj.pop('currency', 'UNKNOWN')
# Flag to attach to the inserted postings.
insert_flag = config_obj.pop('flag', None)
# A dict of account names that trigger the insertion of postings to pairs of
# inserted accounts when triggered.
accounts = config_obj.pop('accounts', {})
# Convert the key in the accounts configuration for matching.
account_transforms = {}
for key, config in accounts.items():
if isinstance(key, str):
flag = None
account = key
else:
assert isinstance(key, tuple)
flag, account = key
account_transforms[account] = (flag, config)
new_entries = []
for entry in entries:
if isinstance(entry, data.Transaction):
orig_entry = entry
for posting in entry.postings:
if (posting.units is not MISSING and
(posting.account in account_transforms) and
(account_types.get_account_sign(posting.account) *
posting.units.number > 0)):
# Get the new account legs to insert.
required_flag, (neg_account,
pos_account) = account_transforms[posting.account]
assert posting.cost is None
# Check required flag if present.
if (required_flag is None or
(required_flag and required_flag == posting.flag)):
# Insert income/expense entries for 401k.
entry = add_postings(
entry,
amount.Amount(abs(posting.units.number), currency),
neg_account.format(year=entry.date.year),
pos_account.format(year=entry.date.year),
insert_flag)
if DEBUG and orig_entry is not entry:
printer.print_entry(orig_entry)
printer.print_entry(entry)
new_entries.append(entry)
return new_entries, []
beancount.plugins.ira_contribs.add_postings(entry, amount_, neg_account, pos_account, flag)
Insert positive and negative postings of a position in an entry.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/ira_contribs.py
def add_postings(entry, amount_, neg_account, pos_account, flag):
"""Insert positive and negative postings of a position in an entry.
Args:
entry: A Transaction instance.
amount_: An Amount instance to create the position, with positive number.
neg_account: An account for the posting with the negative amount.
pos_account: An account for the posting with the positive amount.
flag: A string, that is to be set as flag for the new postings.
Returns:
A new, modified entry.
"""
return entry._replace(postings=entry.postings + [
data.Posting(neg_account, -amount_, None, None, flag, None),
data.Posting(pos_account, amount_, None, None, flag, None),
])
beancount.plugins.leafonly
A plugin that issues errors when amounts are posted to non-leaf accounts, that is, accounts with child accounts.
This is an extra constraint that you may want to apply optionally. If you install this plugin, it will issue errors for all accounts that have postings to non-leaf accounts. Some users may want to disallow this and enforce that only leaf accounts may have postings on them.
beancount.plugins.leafonly.LeafOnlyError (tuple)
LeafOnlyError(source, message, entry)
beancount.plugins.leafonly.LeafOnlyError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/leafonly.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.leafonly.LeafOnlyError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of LeafOnlyError(source, message, entry)
beancount.plugins.leafonly.LeafOnlyError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/leafonly.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.leafonly.validate_leaf_only(entries, unused_options_map)
Check for non-leaf accounts that have postings on them.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/leafonly.py
def validate_leaf_only(entries, unused_options_map):
"""Check for non-leaf accounts that have postings on them.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
real_root = realization.realize(entries, compute_balance=False)
default_meta = data.new_metadata('<leafonly>', 0)
open_close_map = None # Lazily computed.
errors = []
for real_account in realization.iter_children(real_root):
if len(real_account) > 0 and real_account.txn_postings:
if open_close_map is None:
open_close_map = getters.get_account_open_close(entries)
try:
open_entry = open_close_map[real_account.account][0]
except KeyError:
open_entry = None
errors.append(LeafOnlyError(
open_entry.meta if open_entry else default_meta,
"Non-leaf account '{}' has postings on it".format(real_account.account),
open_entry))
return entries, errors
beancount.plugins.mark_unverified
Add metadata to Postings which occur after their last Balance directives.
Some people use Balance directives as a way to indicate that all postings before them are verified. They want to compute balances in each account as of the date of that last Balance directives. One way to do that is to use this plugin to mark the postings which occur after and to then filter them out using a WHERE clause on that metadata:
SELECT account, sum(position) WHERE NOT meta("unverified")
Note that doing such a filtering may result in a list of balances which may not add to zero.
Also, postings for accounts without a single Balance directive on them will not be marked as unverified as all (otherwise all the postings would be marked, this would make no sense).
beancount.plugins.mark_unverified.mark_unverified(entries, options_map)
Add metadata to postings after the last Balance entry. See module doc.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/mark_unverified.py
def mark_unverified(entries, options_map):
"""Add metadata to postings after the last Balance entry. See module doc.
Args:
entries: A list of data directives.
options_map: A dict of options, that confirms to beancount.parser.options.
Returns:
A list of entries, which includes the new unrealized capital gains entries
at the end, and a list of errors. The new list of entries is still sorted.
"""
# The last Balance directive seen for each account.
last_balances = {}
for entry in entries:
if isinstance(entry, data.Balance):
last_balances[entry.account] = entry
new_entries = []
for entry in entries:
if isinstance(entry, data.Transaction):
postings = entry.postings
new_postings = postings
for index, posting in enumerate(postings):
balance = last_balances.get(posting.account, None)
if balance and balance.date <= entry.date:
if new_postings is postings:
new_postings = postings.copy()
new_meta = posting.meta.copy()
new_meta['unverified'] = True
new_postings[index] = posting._replace(meta=new_meta)
if new_postings is not postings:
entry = entry._replace(postings=new_postings)
new_entries.append(entry)
return new_entries, []
beancount.plugins.merge_meta
Merge the metadata from a second file into the current set of entries.
This is useful if you like to keep more sensitive private data, such as account numbers or passwords, in a second, possibly encrypted file. This can be used to generate a will, for instance, for your loved ones to be able to figure where all your assets are in case you pass away. You can store all the super secret stuff in a more closely guarded, hidden away separate file.
The metadata from
- Open directives: Account name must match.
- Close directives: Account name must match.
- Commodity directives: Currency must match.
are copied over. Metadata from the external file conflicting with that present in the main file overwrites it (external data wins).
WARNING! If you include an encrypted file and the main file is not encrypted, the contents extraction from the encrypted file may appear in the cache.
beancount.plugins.merge_meta.merge_meta(entries, options_map, config)
Load a secondary file and merge its metadata in our given set of entries.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/merge_meta.py
def merge_meta(entries, options_map, config):
"""Load a secondary file and merge its metadata in our given set of entries.
Args:
entries: A list of directives. We're interested only in the Transaction instances.
unused_options_map: A parser options dict.
config: The plugin configuration string.
Returns:
A list of entries, with more metadata attached to them.
"""
external_filename = config
new_entries = list(entries)
ext_entries, ext_errors, ext_options_map = loader.load_file(external_filename)
# Map Open and Close directives.
oc_map = getters.get_account_open_close(entries)
ext_oc_map = getters.get_account_open_close(ext_entries)
for account in set(oc_map.keys()) & set(ext_oc_map.keys()):
open_entry, close_entry = oc_map[account]
ext_open_entry, ext_close_entry = ext_oc_map[account]
if open_entry and ext_open_entry:
open_entry.meta.update(ext_open_entry.meta)
if close_entry and ext_close_entry:
close_entry.meta.update(ext_close_entry.meta)
# Map Commodity directives.
comm_map = getters.get_commodity_map(entries, False)
ext_comm_map = getters.get_commodity_map(ext_entries, False)
for currency in set(comm_map) & set(ext_comm_map):
comm_entry = comm_map[currency]
ext_comm_entry = ext_comm_map[currency]
if comm_entry and ext_comm_entry:
comm_entry.meta.update(ext_comm_entry.meta)
# Note: We cannot include the external file in the list of inputs so that a
# change of it triggers a cache rebuild because side-effects on options_map
# aren't cascaded through. This is something that should be defined better
# in the plugin interface and perhaps improved upon.
return new_entries, ext_errors
beancount.plugins.noduplicates
This plugin validates that there are no duplicate transactions.
beancount.plugins.noduplicates.validate_no_duplicates(entries, unused_options_map)
Check that the entries are unique, by computing hashes.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/noduplicates.py
def validate_no_duplicates(entries, unused_options_map):
"""Check that the entries are unique, by computing hashes.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
unused_hashes, errors = compare.hash_entries(entries, exclude_meta=True)
return entries, errors
beancount.plugins.nounused
This plugin validates that there are no unused accounts.
beancount.plugins.nounused.UnusedAccountError (tuple)
UnusedAccountError(source, message, entry)
beancount.plugins.nounused.UnusedAccountError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/nounused.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.nounused.UnusedAccountError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of UnusedAccountError(source, message, entry)
beancount.plugins.nounused.UnusedAccountError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/nounused.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.nounused.validate_unused_accounts(entries, unused_options_map)
Check that all accounts declared open are actually used.
We check that all of the accounts that are open are at least referred to by another directive. These are probably unused, so issue a warning (we like to be pedantic). Note that an account that is open and then closed is considered used--this is a valid use case that may occur in reality. If you have a use case for an account to be open but never used, you can quiet that warning by initializing the account with a balance asserts or a pad directive, or even use a note will be sufficient.
(This is probably a good candidate for optional inclusion as a "pedantic" plugin.)
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/nounused.py
def validate_unused_accounts(entries, unused_options_map):
"""Check that all accounts declared open are actually used.
We check that all of the accounts that are open are at least referred to by
another directive. These are probably unused, so issue a warning (we like to
be pedantic). Note that an account that is open and then closed is
considered used--this is a valid use case that may occur in reality. If you
have a use case for an account to be open but never used, you can quiet that
warning by initializing the account with a balance asserts or a pad
directive, or even use a note will be sufficient.
(This is probably a good candidate for optional inclusion as a "pedantic"
plugin.)
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
# Find all the accounts referenced by entries which are not Open, and the
# open directives for error reporting below.
open_map = {}
referenced_accounts = set()
for entry in entries:
if isinstance(entry, data.Open):
open_map[entry.account] = entry
continue
referenced_accounts.update(getters.get_entry_accounts(entry))
# Create a list of suitable errors, with the location of the Open directives
# corresponding to the unused accounts.
errors = [UnusedAccountError(open_entry.meta,
"Unused account '{}'".format(account),
open_entry)
for account, open_entry in open_map.items()
if account not in referenced_accounts]
return entries, errors
beancount.plugins.onecommodity
A plugin that issues errors when more than one commodity is used in an account.
For investments or trading accounts, it can make it easier to filter the action around a single stock by using the name of the stock as the leaf of the account name.
Notes:
-
The plugin will automatically skip accounts that have explicitly declared commodities in their Open directive.
-
You can also set the metadata "onecommodity: FALSE" on an account's Open directive to skip the checks for that account.
-
If provided, the configuration should be a regular expression restricting the set of accounts to check.
beancount.plugins.onecommodity.OneCommodityError (tuple)
OneCommodityError(source, message, entry)
beancount.plugins.onecommodity.OneCommodityError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/onecommodity.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.onecommodity.OneCommodityError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of OneCommodityError(source, message, entry)
beancount.plugins.onecommodity.OneCommodityError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/onecommodity.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.onecommodity.validate_one_commodity(entries, unused_options_map, config=None)
Check that each account has units in only a single commodity.
This is an extra constraint that you may want to apply optionally, despite Beancount's ability to support inventories and aggregations with more than one commodity. I believe this also matches GnuCash's model, where each account has a single commodity attached to it.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/onecommodity.py
def validate_one_commodity(entries, unused_options_map, config=None):
"""Check that each account has units in only a single commodity.
This is an extra constraint that you may want to apply optionally, despite
Beancount's ability to support inventories and aggregations with more than
one commodity. I believe this also matches GnuCash's model, where each
account has a single commodity attached to it.
Args:
entries: A list of directives.
unused_options_map: An options map.
config: The plugin configuration string, a regular expression to match
against the subset of accounts to check.
Returns:
A list of new errors, if any were found.
"""
accounts_re = re.compile(config) if config else None
# Mappings of account name to lists of currencies for each units and cost.
units_map = collections.defaultdict(set)
cost_map = collections.defaultdict(set)
# Mappings to use just for getting a relevant source.
units_source_map = {}
cost_source_map = {}
# Gather the set of accounts to skip from the Open directives.
skip_accounts = set()
for entry in entries:
if not isinstance(entry, data.Open):
continue
if (not entry.meta.get("onecommodity", True) or
(accounts_re and not accounts_re.match(entry.account)) or
(entry.currencies and len(entry.currencies) > 1)):
skip_accounts.add(entry.account)
# Accumulate all the commodities used.
for entry in entries:
if isinstance(entry, data.Transaction):
for posting in entry.postings:
if posting.account in skip_accounts:
continue
units = posting.units
units_map[posting.account].add(units.currency)
if len(units_map[posting.account]) > 1:
units_source_map[posting.account] = entry
cost = posting.cost
if cost:
cost_map[posting.account].add(cost.currency)
if len(cost_map[posting.account]) > 1:
units_source_map[posting.account] = entry
elif isinstance(entry, data.Balance):
if entry.account in skip_accounts:
continue
units_map[entry.account].add(entry.amount.currency)
if len(units_map[entry.account]) > 1:
units_source_map[entry.account] = entry
elif isinstance(entry, data.Open):
if entry.currencies and len(entry.currencies) > 1:
skip_accounts.add(entry.account)
# Check units.
errors = []
for account, currencies in units_map.items():
if account in skip_accounts:
continue
if len(currencies) > 1:
errors.append(OneCommodityError(
units_source_map[account].meta,
"More than one currency in account '{}': {}".format(
account, ','.join(currencies)),
None))
# Check costs.
for account, currencies in cost_map.items():
if account in skip_accounts:
continue
if len(currencies) > 1:
errors.append(OneCommodityError(
cost_source_map[account].meta,
"More than one cost currency in account '{}': {}".format(
account, ','.join(currencies)),
None))
return entries, errors
beancount.plugins.pedantic
A plugin of plugins which triggers are all the pedantic plugins.
In a sense, this is the inverse of "pedantic." This is useful when doing some types of quick and dirty tests.
beancount.plugins.sellgains
A plugin that cross-checks declared gains against prices on lot sales.
When you sell stock, the gains can be automatically implied by the corresponding cash amounts. For example, in the following transaction the 2nd and 3rd postings should match the value of the stock sold:
1999-07-31 * "Sell"
Assets:US:BRS:Company:ESPP -81 ADSK {26.3125 USD}
Assets:US:BRS:Company:Cash 2141.36 USD
Expenses:Financial:Fees 0.08 USD
Income:US:Company:ESPP:PnL -10.125 USD
The cost basis is checked against: 2141.36 + 008 + -10.125. That is, the balance checks computes
-81 x 26.3125 = -2131.3125 + 2141.36 + 0.08 + -10.125
and checks that the residual is below a small tolerance.
But... usually the income leg isn't given to you in statements. Beancount can automatically infer it using the balance, which is convenient, like this:
1999-07-31 * "Sell"
Assets:US:BRS:Company:ESPP -81 ADSK {26.3125 USD}
Assets:US:BRS:Company:Cash 2141.36 USD
Expenses:Financial:Fees 0.08 USD
Income:US:Company:ESPP:PnL
Additionally, most often you have the sales prices given to you on your transaction confirmation statement, so you can enter this:
1999-07-31 * "Sell"
Assets:US:BRS:Company:ESPP -81 ADSK {26.3125 USD} @ 26.4375 USD
Assets:US:BRS:Company:Cash 2141.36 USD
Expenses:Financial:Fees 0.08 USD
Income:US:Company:ESPP:PnL
So in theory, if the price is given (26.4375 USD), we could verify that the proceeds from the sale at the given price match non-Income postings. That is, verify that
-81 x 26.4375 = -2141.4375 + 2141.36 + 0.08 +
is below a small tolerance value. So this plugin does this.
In general terms, it does the following: For transactions with postings that have a cost and a price, it verifies that the sum of the positions on all postings to non-income accounts is below tolerance.
This provides yet another level of verification and allows you to elide the income amounts, knowing that the price is there to provide an extra level of error-checking in case you enter a typo.
beancount.plugins.sellgains.SellGainsError (tuple)
SellGainsError(source, message, entry)
beancount.plugins.sellgains.SellGainsError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/sellgains.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.sellgains.SellGainsError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of SellGainsError(source, message, entry)
beancount.plugins.sellgains.SellGainsError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/sellgains.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.sellgains.validate_sell_gains(entries, options_map)
Check the sum of asset account totals for lots sold with a price on them.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/sellgains.py
def validate_sell_gains(entries, options_map):
"""Check the sum of asset account totals for lots sold with a price on them.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
errors = []
acc_types = options.get_account_types(options_map)
proceed_types = set([acc_types.assets,
acc_types.liabilities,
acc_types.equity,
acc_types.expenses])
for entry in entries:
if not isinstance(entry, data.Transaction):
continue
# Find transactions whose lots at cost all have a price.
postings_at_cost = [posting
for posting in entry.postings
if posting.cost is not None]
if not postings_at_cost or not all(posting.price is not None
for posting in postings_at_cost):
continue
# Accumulate the total expected proceeds and the sum of the asset and
# expenses legs.
total_price = inventory.Inventory()
total_proceeds = inventory.Inventory()
for posting in entry.postings:
# If the posting is held at cost, add the priced value to the balance.
if posting.cost is not None:
assert posting.price is not None
price = posting.price
total_price.add_amount(amount.mul(price, -posting.units.number))
else:
# Otherwise, use the weight and ignore postings to Income accounts.
atype = account_types.get_account_type(posting.account)
if atype in proceed_types:
total_proceeds.add_amount(convert.get_weight(posting))
# Compare inventories, currency by currency.
dict_price = {pos.units.currency: pos.units.number
for pos in total_price}
dict_proceeds = {pos.units.currency: pos.units.number
for pos in total_proceeds}
tolerances = interpolate.infer_tolerances(entry.postings, options_map)
invalid = False
for currency, price_number in dict_price.items():
# Accept a looser than usual tolerance because rounding occurs
# differently. Also, it would be difficult for the user to satisfy
# two sets of constraints manually.
tolerance = tolerances.get(currency) * EXTRA_TOLERANCE_MULTIPLIER
proceeds_number = dict_proceeds.pop(currency, ZERO)
diff = abs(price_number - proceeds_number)
if diff > tolerance:
invalid = True
break
if invalid or dict_proceeds:
errors.append(
SellGainsError(
entry.meta,
"Invalid price vs. proceeds/gains: {} vs. {}".format(
total_price, total_proceeds),
entry))
return entries, errors
beancount.plugins.split_expenses
Split expenses of a Beancount ledger between multiple people.
This plugin is given a list of names. It assumes that any Expenses account whose components do not include any of the given names are to be split between the members. It goes through all the transactions and converts all such postings into multiple postings, one for each member.
For example, given the names 'Martin' and 'Caroline', the following transaction:
2015-02-01 * "Aqua Viva Tulum - two nights"
Income:Caroline:CreditCard -269.00 USD
Expenses:Accommodation
Will be converted to this:
2015-02-01 * "Aqua Viva Tulum - two nights"
Income:Caroline:CreditCard -269.00 USD
Expenses:Accommodation:Martin 134.50 USD
Expenses:Accommodation:Caroline 134.50 USD
After these transformations, all account names should include the name of a member. You can generate reports for a particular person by filtering postings to accounts with a component by their name.
beancount.plugins.split_expenses.get_participants(filename, options_map)
Get the list of participants from the plugin configuration in the input file.
Parameters: |
|
---|
Returns: |
|
---|
Exceptions: |
|
---|
Source code in beancount/plugins/split_expenses.py
def get_participants(filename, options_map):
"""Get the list of participants from the plugin configuration in the input file.
Args:
options_map: The options map, as produced by the parser.
Returns:
A list of strings, the names of participants as they should appear in the
account names.
Raises:
KeyError: If the configuration does not contain configuration for the list
of participants.
"""
plugin_options = dict(options_map["plugin"])
try:
return plugin_options["beancount.plugins.split_expenses"].split()
except KeyError:
raise KeyError("Could not find the split_expenses plugin configuration.")
beancount.plugins.split_expenses.main()
Generate final reports for a shared expenses on a trip or project.
For each of many participants, generate a detailed list of expenses, contributions, a categorized summary of expenses, and a final balance. Also produce a global list of final balances so that participants can reconcile between each other.
Source code in beancount/plugins/split_expenses.py
def main():
"""Generate final reports for a shared expenses on a trip or project.
For each of many participants, generate a detailed list of expenses,
contributions, a categorized summary of expenses, and a final balance. Also
produce a global list of final balances so that participants can reconcile
between each other.
"""
logging.basicConfig(level=logging.INFO, format='%(levelname)-8s: %(message)s')
parser = version.ArgumentParser(description=__doc__.strip())
parser.add_argument('filename', help='Beancount input filename')
parser.add_argument('-c', '--currency', action='store',
help="Convert all the amounts to a single common currency")
oparser = parser.add_argument_group('Outputs')
oparser.add_argument('-o', '--output-text', '--text', action='store',
help="Render results to text boxes")
oparser.add_argument('--output-csv', '--csv', action='store',
help="Render results to CSV files")
oparser.add_argument('--output-stdout', '--stdout', action='store_true',
help="Render results to stdout")
args = parser.parse_args()
# Ensure the directories exist.
for directory in [args.output_text, args.output_csv]:
if directory and not path.exists(directory):
os.makedirs(directory, exist_ok=True)
# Load the input file and get the list of participants.
entries, errors, options_map = loader.load_file(args.filename)
participants = get_participants(args.filename, options_map)
for participant in participants:
print("Participant: {}".format(participant))
save_query("balances", participant, entries, options_map, r"""
SELECT
PARENT(account) AS account,
CONV[SUM(position)] AS amount
WHERE account ~ ':\b{}'
GROUP BY 1
ORDER BY 2 DESC
""", participant, boxed=False, args=args)
save_query("expenses", participant, entries, options_map, r"""
SELECT
date, flag, description,
PARENT(account) AS account,
JOINSTR(links) AS links,
CONV[position] AS amount,
CONV[balance] AS balance
WHERE account ~ 'Expenses.*\b{}'
""", participant, args=args)
save_query("income", participant, entries, options_map, r"""
SELECT
date, flag, description,
account,
JOINSTR(links) AS links,
CONV[position] AS amount,
CONV[balance] AS balance
WHERE account ~ 'Income.*\b{}'
""", participant, args=args)
save_query("final", None, entries, options_map, r"""
SELECT
GREP('\b({})\b', account) AS participant,
CONV[SUM(position)] AS balance
GROUP BY 1
ORDER BY 2
""", '|'.join(participants), args=args)
# FIXME: Make this output to CSV files and upload to a spreadsheet.
# FIXME: Add a fixed with option. This requires changing adding this to the
# the renderer to be able to have elastic space and line splitting..
beancount.plugins.split_expenses.save_query(title, participant, entries, options_map, sql_query, *format_args, *, boxed=True, spaced=False, args=None)
Save the multiple files for this query.
Parameters: |
|
---|
Source code in beancount/plugins/split_expenses.py
def save_query(title, participant, entries, options_map, sql_query, *format_args,
boxed=True, spaced=False, args=None):
"""Save the multiple files for this query.
Args:
title: A string, the title of this particular report to render.
participant: A string, the name of the participant under consideration.
entries: A list of directives (as per the loader).
options_map: A dict of options (as per the loader).
sql_query: A string with the SQL query, possibly with some placeholders left for
*format_args to replace.
*format_args: A tuple of arguments to be formatted into the SQL query string.
This is provided as a convenience.
boxed: A boolean, true if we should render the results in a fancy-looking ASCII box.
spaced: If true, leave an empty line between each of the rows. This is useful if the
results have a lot of rows that render over multiple lines.
args: A dummy object with the following attributes:
output_text: An optional directory name, to produce a text rendering of
the report.
output_csv: An optional directory name, to produce a CSV rendering of
the report.
output_stdout: A boolean, if true, also render the output to stdout.
currency: An optional currency (a string). If you use this, you should
wrap query targets to be converted with the pseudo-function
"CONV[...]" and it will get replaced to CONVERT(..., CURRENCY)
automatically.
"""
# Replace CONV() to convert the currencies or not; if so, replace to
# CONVERT(..., currency).
replacement = (r'\1'
if args.currency is None else
r'CONVERT(\1, "{}")'.format(args.currency))
sql_query = re.sub(r'CONV\[(.*?)\]', replacement, sql_query)
# Run the query.
rtypes, rrows = query.run_query(entries, options_map,
sql_query, *format_args,
numberify=True)
# The base of all filenames.
filebase = title.replace(' ', '_')
fmtopts = dict(boxed=boxed,
spaced=spaced)
# Output the text files.
if args.output_text:
basedir = (path.join(args.output_text, participant)
if participant
else args.output_text)
os.makedirs(basedir, exist_ok=True)
filename = path.join(basedir, filebase + '.txt')
with open(filename, 'w') as file:
query_render.render_text(rtypes, rrows, options_map['dcontext'],
file, **fmtopts)
# Output the CSV files.
if args.output_csv:
basedir = (path.join(args.output_csv, participant)
if participant
else args.output_csv)
os.makedirs(basedir, exist_ok=True)
filename = path.join(basedir, filebase + '.csv')
with open(filename, 'w') as file:
query_render.render_csv(rtypes, rrows, options_map['dcontext'],
file, expand=False)
if args.output_stdout:
# Write out the query to stdout.
query_render.render_text(rtypes, rrows, options_map['dcontext'],
sys.stdout, **fmtopts)
beancount.plugins.split_expenses.split_expenses(entries, options_map, config)
Split postings according to expenses (see module docstring for details).
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/split_expenses.py
def split_expenses(entries, options_map, config):
"""Split postings according to expenses (see module docstring for details).
Args:
entries: A list of directives. We're interested only in the Transaction instances.
unused_options_map: A parser options dict.
config: The plugin configuration string.
Returns:
A list of entries, with potentially more accounts and potentially more
postings with smaller amounts.
"""
# Validate and sanitize configuration.
if isinstance(config, str):
members = config.split()
elif isinstance(config, (tuple, list)):
members = config
else:
raise RuntimeError("Invalid plugin configuration: configuration for split_expenses "
"should be a string or a sequence.")
acctypes = options.get_account_types(options_map)
def is_expense_account(account):
return account_types.get_account_type(account) == acctypes.expenses
# A predicate to quickly identify if an account contains the name of a
# member.
is_individual_account = re.compile('|'.join(map(re.escape, members))).search
# Existing and previously unseen accounts.
new_accounts = set()
# Filter the entries and transform transactions.
new_entries = []
for entry in entries:
if isinstance(entry, data.Transaction):
new_postings = []
for posting in entry.postings:
if (is_expense_account(posting.account) and
not is_individual_account(posting.account)):
# Split this posting into multiple postings.
split_units = amount.Amount(posting.units.number / len(members),
posting.units.currency)
for member in members:
# Mark the account as new if never seen before.
subaccount = account.join(posting.account, member)
new_accounts.add(subaccount)
# Ensure the modified postings are marked as
# automatically calculated, so that the resulting
# calculated amounts aren't used to affect inferred
# tolerances.
meta = posting.meta.copy() if posting.meta else {}
meta[interpolate.AUTOMATIC_META] = True
# Add a new posting for each member, to a new account
# with the name of this member.
new_postings.append(
posting._replace(meta=meta,
account=subaccount,
units=split_units,
cost=posting.cost))
else:
new_postings.append(posting)
# Modify the entry in-place, replace its postings.
entry = entry._replace(postings=new_postings)
new_entries.append(entry)
# Create Open directives for new subaccounts if necessary.
oc_map = getters.get_account_open_close(entries)
open_date = entries[0].date
meta = data.new_metadata('<split_expenses>', 0)
open_entries = []
for new_account in new_accounts:
if new_account not in oc_map:
entry = data.Open(meta, open_date, new_account, None, None)
open_entries.append(entry)
return open_entries + new_entries, []
beancount.plugins.tag_pending
An example of tracking unpaid payables or receivables.
A user with lots of invoices to track may want to produce a report of pending or incomplete payables or receivables. Beancount does not by default offer such a dedicated feature, but it is easy to build one by using existing link attributes on transactions. This is an example on how to implement that with a plugin.
For example, assuming the user enters linked transactions like this:
2013-03-28 * "Bill for datacenter electricity" ^invoice-27a30ab61191
Expenses:Electricity 450.82 USD
Liabilities:AccountsPayable
2013-04-15 * "Paying electricity company" ^invoice-27a30ab61191
Assets:Checking -450.82 USD
Liabilities:AccountsPayable
Transactions are grouped by link ("invoice-27a30ab61191") and then the intersection of their common accounts is automatically calculated ("Liabilities:AccountsPayable"). We then add up the balance of all the postings for this account in this link group and check if the sum is zero. If there is a residual amount in this balance, we mark the associated entries as incomplete by inserting a #PENDING tag on them. The user can then use that tag to navigate to the corresponding view in the web interface, or just find the entries and produce a listing of them.
beancount.plugins.tag_pending.tag_pending_plugin(entries, options_map)
A plugin that finds and tags pending transactions.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/tag_pending.py
def tag_pending_plugin(entries, options_map):
"""A plugin that finds and tags pending transactions.
Args:
entries: A list of entry instances.
options_map: A dict of options parsed from the file.
Returns:
A tuple of entries and errors.
"""
return (tag_pending_transactions(entries, 'PENDING'), [])
beancount.plugins.tag_pending.tag_pending_transactions(entries, tag_name='PENDING')
Filter out incomplete linked transactions to a transfer account.
Given a list of entries, group the entries by their link and compute the balance of the intersection of their common accounts. If the balance does not sum to zero, insert a 'tag_name' tag in the entries.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/tag_pending.py
def tag_pending_transactions(entries, tag_name='PENDING'):
"""Filter out incomplete linked transactions to a transfer account.
Given a list of entries, group the entries by their link and compute the
balance of the intersection of their common accounts. If the balance does
not sum to zero, insert a 'tag_name' tag in the entries.
Args:
entries: A list of directives/transactions to process.
tag_name: A string, the name of the tag to be inserted if a linked group
of entries is found not to match
Returns:
A modified set of entries, possibly tagged as pending.
"""
link_groups = basicops.group_entries_by_link(entries)
pending_entry_ids = set()
for link, link_entries in link_groups.items():
assert link_entries
if len(link_entries) == 1:
# If a single entry is present, it is assumed incomplete.
pending_entry_ids.add(id(link_entries[0]))
else:
# Compute the sum total balance of the common accounts.
common_accounts = basicops.get_common_accounts(link_entries)
common_balance = inventory.Inventory()
for entry in link_entries:
for posting in entry.postings:
if posting.account in common_accounts:
common_balance.add_position(posting)
# Mark entries as pending if a residual balance is found.
if not common_balance.is_empty():
for entry in link_entries:
pending_entry_ids.add(id(entry))
# Insert tags if marked.
return [(entry._replace(tags=(entry.tags or set()) | set((tag_name,)))
if id(entry) in pending_entry_ids
else entry)
for entry in entries]
beancount.plugins.unique_prices
This module adds validation that there is a single price defined per date and base/quote currencies. If multiple conflicting price values are declared, an error is generated. Note that multiple price entries with the same number do not generate an error.
This is meant to be turned on if you want to use a very strict mode for entering prices, and may not be realistic usage. For example, if you have (1) a transaction with an implicitly generated price during the day (from its cost) and (2) a separate explicit price directive that declares a different price for the day's closing price, this would generate an error. I'm not certain this will be useful in the long run, so placing it in a plugin.
beancount.plugins.unique_prices.UniquePricesError (tuple)
UniquePricesError(source, message, entry)
beancount.plugins.unique_prices.UniquePricesError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/unique_prices.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.unique_prices.UniquePricesError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of UniquePricesError(source, message, entry)
beancount.plugins.unique_prices.UniquePricesError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/unique_prices.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.unique_prices.validate_unique_prices(entries, unused_options_map)
Check that there is only a single price per day for a particular base/quote.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/unique_prices.py
def validate_unique_prices(entries, unused_options_map):
"""Check that there is only a single price per day for a particular base/quote.
Args:
entries: A list of directives. We're interested only in the Transaction instances.
unused_options_map: A parser options dict.
Returns:
The list of input entries, and a list of new UniquePricesError instances generated.
"""
new_entries = []
errors = []
prices = collections.defaultdict(list)
for entry in entries:
if not isinstance(entry, data.Price):
continue
key = (entry.date, entry.currency, entry.amount.currency)
prices[key].append(entry)
errors = []
for price_entries in prices.values():
if len(price_entries) > 1:
number_map = {price_entry.amount.number: price_entry
for price_entry in price_entries}
if len(number_map) > 1:
# Note: This should be a list of entries for better error
# reporting. (Later.)
error_entry = next(iter(number_map.values()))
errors.append(
UniquePricesError(error_entry.meta,
"Disagreeing price entries",
price_entries))
return new_entries, errors
beancount.plugins.unrealized
Compute unrealized gains.
The configuration for this plugin is a single string, the name of the subaccount to add to post the unrealized gains to, like this:
plugin "beancount.plugins.unrealized" "Unrealized"
If you don't specify a name for the subaccount (the configuration value is optional), by default it inserts the unrealized gains in the same account that is being adjusted.
beancount.plugins.unrealized.UnrealizedError (tuple)
UnrealizedError(source, message, entry)
beancount.plugins.unrealized.UnrealizedError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/unrealized.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.unrealized.UnrealizedError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of UnrealizedError(source, message, entry)
beancount.plugins.unrealized.UnrealizedError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/unrealized.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.unrealized.add_unrealized_gains(entries, options_map, subaccount=None)
Insert entries for unrealized capital gains.
This function inserts entries that represent unrealized gains, at the end of the available history. It returns a new list of entries, with the new gains inserted. It replaces the account type with an entry in an income account. Optionally, it can book the gain in a subaccount of the original and income accounts.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/unrealized.py
def add_unrealized_gains(entries, options_map, subaccount=None):
"""Insert entries for unrealized capital gains.
This function inserts entries that represent unrealized gains, at the end of
the available history. It returns a new list of entries, with the new gains
inserted. It replaces the account type with an entry in an income account.
Optionally, it can book the gain in a subaccount of the original and income
accounts.
Args:
entries: A list of data directives.
options_map: A dict of options, that confirms to beancount.parser.options.
subaccount: A string, and optional the name of a subaccount to create
under an account to book the unrealized gain. If this is left to its
default value, the gain is booked directly in the same account.
Returns:
A list of entries, which includes the new unrealized capital gains entries
at the end, and a list of errors. The new list of entries is still sorted.
"""
errors = []
meta = data.new_metadata('<unrealized_gains>', 0)
account_types = options.get_account_types(options_map)
# Assert the subaccount name is in valid format.
if subaccount:
validation_account = account.join(account_types.assets, subaccount)
if not account.is_valid(validation_account):
errors.append(
UnrealizedError(meta,
"Invalid subaccount name: '{}'".format(subaccount),
None))
return entries, errors
if not entries:
return (entries, errors)
# Group positions by (account, cost, cost_currency).
price_map = prices.build_price_map(entries)
holdings_list = holdings.get_final_holdings(entries, price_map=price_map)
# Group positions by (account, cost, cost_currency).
holdings_list = holdings.aggregate_holdings_by(
holdings_list, lambda h: (h.account, h.currency, h.cost_currency))
# Get the latest prices from the entries.
price_map = prices.build_price_map(entries)
# Create transactions to account for each position.
new_entries = []
latest_date = entries[-1].date
for index, holding in enumerate(holdings_list):
if (holding.currency == holding.cost_currency or
holding.cost_currency is None):
continue
# Note: since we're only considering positions held at cost, the
# transaction that created the position *must* have created at least one
# price point for that commodity, so we never expect for a price not to
# be available, which is reasonable.
if holding.price_number is None:
# An entry without a price might indicate that this is a holding
# resulting from leaked cost basis. {0ed05c502e63, b/16}
if holding.number:
errors.append(
UnrealizedError(meta,
"A valid price for {h.currency}/{h.cost_currency} "
"could not be found".format(h=holding), None))
continue
# Compute the PnL; if there is no profit or loss, we create a
# corresponding entry anyway.
pnl = holding.market_value - holding.book_value
if holding.number == ZERO:
# If the number of units sum to zero, the holdings should have been
# zero.
errors.append(
UnrealizedError(
meta,
"Number of units of {} in {} in holdings sum to zero "
"for account {} and should not".format(
holding.currency, holding.cost_currency, holding.account),
None))
continue
# Compute the name of the accounts and add the requested subaccount name
# if requested.
asset_account = holding.account
income_account = account.join(account_types.income,
account.sans_root(holding.account))
if subaccount:
asset_account = account.join(asset_account, subaccount)
income_account = account.join(income_account, subaccount)
# Create a new transaction to account for this difference in gain.
gain_loss_str = "gain" if pnl > ZERO else "loss"
narration = ("Unrealized {} for {h.number} units of {h.currency} "
"(price: {h.price_number:.4f} {h.cost_currency} as of {h.price_date}, "
"average cost: {h.cost_number:.4f} {h.cost_currency})").format(
gain_loss_str, h=holding)
entry = data.Transaction(data.new_metadata(meta["filename"], lineno=1000 + index),
latest_date, flags.FLAG_UNREALIZED,
None, narration, EMPTY_SET, EMPTY_SET, [])
# Book this as income, converting the account name to be the same, but as income.
# Note: this is a rather convenient but arbitrary choice--maybe it would be best to
# let the user decide to what account to book it, but I don't a nice way to let the
# user specify this.
#
# Note: we never set a price because we don't want these to end up in Conversions.
entry.postings.extend([
data.Posting(
asset_account,
amount.Amount(pnl, holding.cost_currency),
None,
None,
None,
None),
data.Posting(
income_account,
amount.Amount(-pnl, holding.cost_currency),
None,
None,
None,
None)
])
new_entries.append(entry)
# Ensure that the accounts we're going to use to book the postings exist, by
# creating open entries for those that we generated that weren't already
# existing accounts.
new_accounts = {posting.account
for entry in new_entries
for posting in entry.postings}
open_entries = getters.get_account_open_close(entries)
new_open_entries = []
for account_ in sorted(new_accounts):
if account_ not in open_entries:
meta = data.new_metadata(meta["filename"], index)
open_entry = data.Open(meta, latest_date, account_, None, None)
new_open_entries.append(open_entry)
return (entries + new_open_entries + new_entries, errors)
beancount.plugins.unrealized.get_unrealized_entries(entries)
Return entries automatically created for unrealized gains.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/unrealized.py
def get_unrealized_entries(entries):
"""Return entries automatically created for unrealized gains.
Args:
entries: A list of directives.
Returns:
A list of directives, all of which are in the original list.
"""
return [entry
for entry in entries
if (isinstance(entry, data.Transaction) and
entry.flag == flags.FLAG_UNREALIZED)]