beancount.plugins
Example plugins for filtering transactions.
These are various examples of how to filter entries in various creative ways.
IMPORTANT: These are not meant to be complete features, rather just experiments in problem-solving using Beancount, work-in-progress that can be selectively installed via a --plugin option, or one-offs to answer questions on the mailing-list.
beancount.plugins.auto
A plugin of plugins which triggers are all the automatic and lax plugins.
In a sense, this is the inverse of "pedantic." This is useful when doing some types of quick and dirty tests. You can just import the "auto" plugin. Put that in a macro.
Also see: the 'pedantic' plugin.
beancount.plugins.auto_accounts
This module automatically inserts Open directives for accounts not opened (at the date of the first entry) and automatically removes open directives for unused accounts. This can be used as a convenience for doing demos, or when setting up your initial transactions, as an intermediate step.
beancount.plugins.auto_accounts.auto_insert_open(entries, unused_options_map)
Insert Open directives for accounts not opened.
Open directives are inserted at the date of the first entry. Open directives for unused accounts are removed.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/auto_accounts.py
def auto_insert_open(entries, unused_options_map):
"""Insert Open directives for accounts not opened.
Open directives are inserted at the date of the first entry. Open directives
for unused accounts are removed.
Args:
entries: A list of directives.
unused_options_map: A parser options dict.
Returns:
A list of entries, possibly with more Open entries than before, and a
list of errors.
"""
opened_accounts = {entry.account
for entry in entries
if isinstance(entry, data.Open)}
new_entries = []
accounts_first, _ = getters.get_accounts_use_map(entries)
for index, (account, date_first_used) in enumerate(sorted(accounts_first.items())):
if account not in opened_accounts:
meta = data.new_metadata('<auto_accounts>', index)
new_entries.append(data.Open(meta, date_first_used, account,
None, None))
if new_entries:
new_entries.extend(entries)
new_entries.sort(key=data.entry_sortkey)
else:
new_entries = entries
return new_entries, []
beancount.plugins.check_average_cost
A plugin that ensures cost basis is preserved in unbooked transactions.
This is intended to be used in accounts using the "NONE" booking method, to manually ensure that the sum total of the cost basis of reducing legs matches the average of what's in the account inventory. This is a partial first step toward implementing the "AVERAGE" booking method. In other words, this plugins provides assertions that will constrain you to approximate what the "AVERAGE" booking method will do, manually, and not to leak too much cost basis through unmatching bookings without checks. (Note the contrived context here: Ideally the "NONE" booking method would simply not exist.)
beancount.plugins.check_average_cost.MatchBasisError (tuple)
MatchBasisError(source, message, entry)
beancount.plugins.check_average_cost.MatchBasisError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/check_average_cost.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.check_average_cost.MatchBasisError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of MatchBasisError(source, message, entry)
beancount.plugins.check_average_cost.MatchBasisError.__replace__(/, self, **kwds)
special
Return a new MatchBasisError object replacing specified fields with new values
Source code in beancount/plugins/check_average_cost.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.plugins.check_average_cost.MatchBasisError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/check_average_cost.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.check_average_cost.validate_average_cost(entries, options_map, config_str=None)
Check that reducing legs on unbooked postings are near the average cost basis.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/check_average_cost.py
def validate_average_cost(entries, options_map, config_str=None):
"""Check that reducing legs on unbooked postings are near the average cost basis.
Args:
entries: A list of directives.
unused_options_map: An options map.
config_str: The configuration as a string version of a float.
Returns:
A list of new errors, if any were found.
"""
# Initialize tolerance bounds.
if config_str and config_str.strip():
# pylint: disable=eval-used
config_obj = eval(config_str, {}, {})
if not isinstance(config_obj, float):
raise RuntimeError("Invalid configuration for check_average_cost: "
"must be a float")
tolerance = config_obj
else:
tolerance = DEFAULT_TOLERANCE
min_tolerance = D(1 - tolerance)
max_tolerance = D(1 + tolerance)
errors = []
ocmap = getters.get_account_open_close(entries)
balances = collections.defaultdict(inventory.Inventory)
for entry in entries:
if isinstance(entry, Transaction):
for posting in entry.postings:
dopen = ocmap.get(posting.account, None)
# Only process accounts with a NONE booking value.
if dopen and dopen[0] and dopen[0].booking == Booking.NONE:
balance = balances[(posting.account,
posting.units.currency,
posting.cost.currency if posting.cost else None)]
if posting.units.number < ZERO:
average = balance.average().get_only_position()
if average is not None:
number = average.cost.number
min_valid = number * min_tolerance
max_valid = number * max_tolerance
if not (min_valid <= posting.cost.number <= max_valid):
errors.append(
MatchBasisError(
entry.meta,
("Cost basis on reducing posting is too far from "
"the average cost ({} vs. {})".format(
posting.cost.number, average.cost.number)),
entry))
balance.add_position(posting)
return entries, errors
beancount.plugins.check_closing
A plugin that automatically inserts a balance check on a tagged closing posting.
Some postings are known to the user to be "closing trades", which means that the resulting position of the instrument just after the trade should be zero. For instance, this is the case for most ordinary options trading, only one lot of a particular instrument is held, and eventually expires or gets sold off in its entirely. One would like to confirm that, and the way to do this in Beancount is to insert a balance check.
This plugin allows you to do that more simply, by inserting metadata. For example, this transaction:
2018-02-16 * "SOLD -14 QQQ 100 16 FEB 18 160 CALL @5.31"
Assets:US:Brokerage:Main:Options -1400 QQQ180216C160 {2.70 USD} @ 5.31 USD
closing: TRUE
Expenses:Financial:Commissions 17.45 USD
Expenses:Financial:Fees 0.42 USD
Assets:US:Brokerage:Main:Cash 7416.13 USD
Income:US:Brokerage:Main:PnL
Would expand into the following two directives:
2018-02-16 * "SOLD -14 QQQ 100 16 FEB 18 160 CALL @5.31"
Assets:US:Brokerage:Main:Options -1400 QQQ180216C160 {2.70 USD} @ 5.31 USD
Expenses:Financial:Commissions 17.45 USD
Expenses:Financial:Fees 0.42 USD
Assets:US:Brokerage:Main:Cash 7416.13 USD
Income:US:Brokerage:Main:PnL
2018-02-17 balance Assets:US:Brokerage:Main:Options 0 QQQ180216C160
Insert the closing line when you know you're closing the position.
beancount.plugins.check_closing.check_closing(entries, options_map)
Expand 'closing' metadata to a zero balance check.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/check_closing.py
def check_closing(entries, options_map):
"""Expand 'closing' metadata to a zero balance check.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
new_entries = []
for entry in entries:
if isinstance(entry, data.Transaction):
for i, posting in enumerate(entry.postings):
if posting.meta and posting.meta.get('closing', False):
# Remove the metadata.
meta = posting.meta.copy()
del meta['closing']
posting = posting._replace(meta=meta)
entry.postings[i] = posting
# Insert a balance.
date = entry.date + datetime.timedelta(days=1)
balance = data.Balance(data.new_metadata("<check_closing>", 0),
date, posting.account,
amount.Amount(ZERO, posting.units.currency),
None, None)
new_entries.append(balance)
new_entries.append(entry)
return new_entries, []
beancount.plugins.check_commodity
A plugin that verifies that all seen commodities have a Commodity directive.
This is useful if you're a bit pedantic and like to make sure that you're declared attributes for each of the commodities you use. It's useful if you use the portfolio export, for example.
You can provide a mapping of (account-regexp, currency-regexp) as options, to specify which commodities to ignore from this plugin selectively. Use this sparingly, as it is an out from the checks that this plugin provides. However, in an active options trading account, a ton of products get inserted and the number of commodity directives can be overwhelming and it's not productive to declare each of the options contracts - names with an embedded strike and expiration date, such as 'SPX_121622P3300' - individually.
Note that if a symbol has been ignored in at least one account, it will therefore be further in all Price directives and Metadata values.
beancount.plugins.check_commodity.CheckCommodityError (tuple)
CheckCommodityError(source, message, entry)
beancount.plugins.check_commodity.CheckCommodityError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/check_commodity.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.check_commodity.CheckCommodityError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of CheckCommodityError(source, message, entry)
beancount.plugins.check_commodity.CheckCommodityError.__replace__(/, self, **kwds)
special
Return a new CheckCommodityError object replacing specified fields with new values
Source code in beancount/plugins/check_commodity.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.plugins.check_commodity.CheckCommodityError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/check_commodity.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.check_commodity.ConfigError (tuple)
ConfigError(source, message, entry)
beancount.plugins.check_commodity.ConfigError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/check_commodity.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.check_commodity.ConfigError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of ConfigError(source, message, entry)
beancount.plugins.check_commodity.ConfigError.__replace__(/, self, **kwds)
special
Return a new ConfigError object replacing specified fields with new values
Source code in beancount/plugins/check_commodity.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.plugins.check_commodity.ConfigError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/check_commodity.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.check_commodity.get_commodity_map_ex(entries, metadata=False)
Find and extract commodities in the stream of directives.
Source code in beancount/plugins/check_commodity.py
def get_commodity_map_ex(entries, metadata=False):
"""Find and extract commodities in the stream of directives."""
# Find commodity names in metadata.
#
# TODO(dnicolodi) Unfortunately detecting commodities in metadata
# values may result in false positives: common used string are
# matched by the regular expression. Revisit this when commodities
# will be represented with their own type.
ignore = set(['filename', 'lineno', '__automatic__'])
regexp = re.compile(CURRENCY_RE)
def currencies_in_meta(entry):
if entry.meta is not None:
for key, value in entry.meta.items():
if isinstance(value, str) and key not in ignore:
if regexp.fullmatch(value):
yield value
commodities_map = {}
occurrences = set()
for entry in entries:
if isinstance(entry, data.Commodity):
commodities_map[entry.currency] = entry
elif isinstance(entry, data.Open):
if entry.currencies:
for currency in entry.currencies:
occurrences.add((entry.account, currency))
elif isinstance(entry, data.Transaction):
for posting in entry.postings:
# Main currency.
units = posting.units
occurrences.add((posting.account, units.currency))
# Currency in cost.
cost = posting.cost
if cost:
occurrences.add((posting.account, cost.currency))
# Currency in price.
price = posting.price
if price:
occurrences.add((posting.account, price.currency))
# Currency in posting metadata.
if metadata:
for currency in currencies_in_meta(posting):
occurrences.add((posting.account, currency))
elif isinstance(entry, data.Balance):
occurrences.add((entry.account, entry.amount.currency))
elif isinstance(entry, data.Price):
occurrences.add((PRICE_CONTEXT, entry.currency))
occurrences.add((PRICE_CONTEXT, entry.amount.currency))
# Entry metadata.
if metadata:
for currency in currencies_in_meta(entry):
occurrences.add((METADATA_CONTEXT, currency))
return occurrences, commodities_map
beancount.plugins.check_commodity.validate_commodity_directives(entries, options_map, config_str=None)
Find all commodities used and ensure they have a corresponding Commodity directive.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/check_commodity.py
def validate_commodity_directives(entries, options_map, config_str=None):
"""Find all commodities used and ensure they have a corresponding Commodity directive.
Args:
entries: A list of directives.
options_map: An options map.
config_str: The configuration as a string version of a float.
Returns:
A list of new errors, if any were found.
"""
errors = []
# pylint: disable=eval-used
if config_str:
config_obj = eval(config_str, {}, {})
if not isinstance(config_obj, dict):
errors.append(ConfigError(
data.new_metadata('<commodity_attr>', 0),
"Invalid configuration for check_commodity plugin; skipping.", None))
return entries, errors
else:
config_obj = {}
# Compile the regular expressions, producing an error if invalid.
ignore_map = {}
for key, value in config_obj.items():
kv = []
for pattern in key, value:
try:
kv.append(re.compile(pattern).match)
except re.error:
meta = data.new_metadata('<check_commodity>', 0)
errors.append(
CheckCommodityError(
meta, "Invalid regexp: '{}' for {}".format(value, key), None))
if len(kv) == 2:
ignore_map[kv[0]] = kv[1]
# Get all the occurrences of commodities and a mapping of the directives.
#
# TODO(blais): Establish a distinction at the parser level for commodities
# and strings, so that we can turn detection of them in metadata.
occurrences, commodity_map = get_commodity_map_ex(entries, metadata=False)
# Process all currencies with context.
issued = set()
ignored = set()
anonymous = set()
for context, currency in sorted(occurrences):
if context in ANONYMOUS:
anonymous.add((context, currency))
continue
commodity_entry = commodity_map.get(currency, None)
# Skip if the commodity was declared, or if an error for that commodity
# has already been issued.
if commodity_entry is not None or currency in issued:
continue
# If any of the ignore patterns matches, ignore and record ignored.
if any((context_re(context) and currency_re(currency))
for context_re, currency_re in ignore_map.items()):
ignored.add(currency)
continue
# Issue error.
meta = data.new_metadata('<check_commodity>', 0)
errors.append(
CheckCommodityError(
meta,
"Missing Commodity directive for '{}' in '{}'".format(
currency, context),
None))
# Process it only once.
issued.add(currency)
# Process all currencies out of context, automatically ignoring those which
# have already been issued with account context..
for context, currency in sorted(anonymous):
commodity_entry = commodity_map.get(currency, None)
# Skip if (a) the commodity was declared, any of the ignore patterns
# matches, or an error for that commodity has already been issued.
if (commodity_entry is not None or
currency in issued or
currency in ignored):
continue
# Issue error.
meta = data.new_metadata('<check_commodity>', 0)
errors.append(
CheckCommodityError(
meta,
"Missing Commodity directive for '{}' in '{}'".format(
currency, context),
None))
return entries, errors
beancount.plugins.check_drained
Insert a balance check for zero before balance sheets accounts are closed.
For balance sheet accounts with a Close directive (Assets, Liabilities & Equity), insert Balance directives just after its closing date, for all the commodities that have appeared in that account and that are declared as legal on it as well. This performs the equivalent of the following transformation:
2018-02-01 open Assets:Project:Cash USD,CAD
...
2020-02-01 close Assets:Project:Cash
!!! to 2018-02-01 open Assets:Project:Cash USD,CAD ...
2020-02-01 close Assets:Project:Cash
2020-02-02 balance Assets:Project:Cash 0 USD
2020-02-02 balance Assets:Project:Cash 0 CAD
beancount.plugins.check_drained.check_drained(entries, options_map)
Check that closed accounts are empty.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/check_drained.py
def check_drained(entries, options_map):
"""Check that closed accounts are empty.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
acctypes = options.get_account_types(options_map)
is_covered = functools.partial(
account_types.is_balance_sheet_account, account_types=acctypes
)
new_entries = []
currencies = collections.defaultdict(set)
balances = collections.defaultdict(set)
for entry in entries:
if isinstance(entry, data.Transaction):
# Accumulate all the currencies seen in each account over time.
for posting in entry.postings:
if is_covered(posting.account):
currencies[posting.account].add(posting.units.currency)
elif isinstance(entry, data.Open):
# Accumulate all the currencies declared in the account opening.
if is_covered(entry.account) and entry.currencies:
for currency in entry.currencies:
currencies[entry.account].add(currency)
elif isinstance(entry, data.Balance):
# Ignore balances where directives are present.
if is_covered(entry.account):
balances[entry.account].add((entry.date, entry.amount.currency))
if isinstance(entry, data.Close):
if is_covered(entry.account):
for currency in currencies[entry.account]:
# Skip balance insertion due to the presence of an explicit one.
if (entry.date, currency) in balances[entry.account]:
continue
# Insert a balance directive.
balance_entry = data.Balance(
# Note: We use the close directive's meta so that
# balance errors direct the user to the corresponding
# close directive.
entry.meta,
entry.date + ONE_DAY,
entry.account,
amount.Amount(ZERO, currency),
None,
None,
)
new_entries.append(balance_entry)
new_entries.append(entry)
return new_entries, []
beancount.plugins.close_tree
This plugin inserts close directives for all of an account's descendants when an account is closed. Unopened parent accounts can also be closed. Any explicitly specified close is left untouched.
For example, given this::
2017-11-10 open Assets:Brokerage:AAPL
2017-11-10 open Assets:Brokerage:ORNG
2018-11-10 close Assets:Brokerage ; this does not necessarily need to be opened
the plugin turns it into::
2017-11-10 open Assets:Brokerage:AAPL
2017-11-10 open Assets:Brokerage:ORNG
2018-11-10 close Assets:Brokerage:AAPL
2018-11-10 close Assets:Brokerage:ORNG
Invoke this plugin after any plugins that generate open
directives for account trees
that you want to auto close. An example is the auto_accounts
plugin that ships with
Beancount::
plugin "beancount.plugins.auto_accounts"
plugin "beancount.plugins.close_tree"
beancount.plugins.close_tree.close_tree(entries, unused_options_map)
Insert close entries for all subaccounts of a closed account.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/close_tree.py
def close_tree(entries, unused_options_map):
"""Insert close entries for all subaccounts of a closed account.
Args:
entries: A list of directives. We're interested only in the Open/Close instances.
unused_options_map: A parser options dict.
Returns:
A tuple of entries and errors.
"""
new_entries = []
errors = []
opens = set(entry.account for entry in entries if isinstance(entry, Open))
closes = set(entry.account for entry in entries if isinstance(entry, Close))
for entry in entries:
if isinstance(entry, Close):
subaccounts = [
account
for account in opens
if account.startswith(entry.account + ":") and account not in closes
]
for subacc in subaccounts:
meta = data.new_metadata("<beancount.plugins.close_tree>", 0)
close_entry = data.Close(meta, entry.date, subacc)
new_entries.append(close_entry)
# So we don't attempt to re-close a grandchild that a child closed
closes.add(subacc)
if entry.account in opens:
new_entries.append(entry)
else:
new_entries.append(entry)
return new_entries, errors
beancount.plugins.coherent_cost
This plugin validates that currencies held at cost aren't ever converted at price and vice-versa. This is usually the case, and using it will prevent users from making the mistake of selling a lot without specifying it via its cost basis.
beancount.plugins.coherent_cost.CoherentCostError (tuple)
CoherentCostError(source, message, entry)
beancount.plugins.coherent_cost.CoherentCostError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/coherent_cost.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.coherent_cost.CoherentCostError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of CoherentCostError(source, message, entry)
beancount.plugins.coherent_cost.CoherentCostError.__replace__(/, self, **kwds)
special
Return a new CoherentCostError object replacing specified fields with new values
Source code in beancount/plugins/coherent_cost.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.plugins.coherent_cost.CoherentCostError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/coherent_cost.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.coherent_cost.validate_coherent_cost(entries, unused_options_map)
Check that all currencies are either used at cost or not at all, but never both.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/coherent_cost.py
def validate_coherent_cost(entries, unused_options_map):
"""Check that all currencies are either used at cost or not at all, but never both.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
errors = []
with_cost = {}
without_cost = {}
for entry in data.filter_txns(entries):
for posting in entry.postings:
target_set = without_cost if posting.cost is None else with_cost
currency = posting.units.currency
target_set.setdefault(currency, entry)
for currency in set(with_cost) & set(without_cost):
errors.append(
CoherentCostError(
without_cost[currency].meta,
"Currency '{}' is used both with and without cost".format(currency),
with_cost[currency]))
# Note: We really ought to include both of the first transactions here.
return entries, errors
beancount.plugins.commodity_attr
A plugin that asserts that all Commodity directives have a particular attribute and that it is part of a set of enum values.
The configuration must be a mapping of attribute name to list of valid values, like this:
plugin "beancount.plugins.commodity_attr" "{
'sector': ['Technology', 'Financials', 'Energy'],
'name': None,
}"
The plugin issues an error if a Commodity directive is missing the attribute, or if the attribute value is not in the valid set. If you'd like to just ensure the attribute is set, set the list of valid values to None, as in the 'name' attribute in the example above.
beancount.plugins.commodity_attr.CommodityError (tuple)
CommodityError(source, message, entry)
beancount.plugins.commodity_attr.CommodityError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/commodity_attr.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.commodity_attr.CommodityError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of CommodityError(source, message, entry)
beancount.plugins.commodity_attr.CommodityError.__replace__(/, self, **kwds)
special
Return a new CommodityError object replacing specified fields with new values
Source code in beancount/plugins/commodity_attr.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.plugins.commodity_attr.CommodityError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/commodity_attr.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.commodity_attr.ConfigError (tuple)
ConfigError(source, message, entry)
beancount.plugins.commodity_attr.ConfigError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/commodity_attr.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.commodity_attr.ConfigError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of ConfigError(source, message, entry)
beancount.plugins.commodity_attr.ConfigError.__replace__(/, self, **kwds)
special
Return a new ConfigError object replacing specified fields with new values
Source code in beancount/plugins/commodity_attr.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.plugins.commodity_attr.ConfigError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/commodity_attr.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.commodity_attr.validate_commodity_attr(entries, unused_options_map, config_str)
Check that all Commodity directives have a valid attribute.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/commodity_attr.py
def validate_commodity_attr(entries, unused_options_map, config_str):
"""Check that all Commodity directives have a valid attribute.
Args:
entries: A list of directives.
unused_options_map: An options map.
config_str: A configuration string.
Returns:
A list of new errors, if any were found.
"""
errors = []
# pylint: disable=eval-used
config_obj = eval(config_str, {}, {})
if not isinstance(config_obj, dict):
errors.append(ConfigError(
data.new_metadata('<commodity_attr>', 0),
"Invalid configuration for commodity_attr plugin; skipping.", None))
return entries, errors
validmap = {attr: frozenset(values) if values is not None else None
for attr, values in config_obj.items()}
for entry in entries:
if not isinstance(entry, data.Commodity):
continue
for attr, values in validmap.items():
value = entry.meta.get(attr, None)
if value is None:
errors.append(CommodityError(
entry.meta,
"Missing attribute '{}' for Commodity directive {}".format(
attr, entry.currency), None))
continue
if values and value not in values:
errors.append(CommodityError(
entry.meta,
"Invalid value '{}' for attribute {}, Commodity".format(value, attr) +
" directive {}; valid options: {}".format(
entry.currency, ', '.join(values)), None))
return entries, errors
beancount.plugins.currency_accounts
An implementation of currency accounts.
This is an automatic implementation of the method described here: https://www.mathstat.dal.ca/~selinger/accounting/tutorial.html
You enable it just like this:
plugin "beancount.plugins.currency_accounts" "Equity:CurrencyAccounts"
Accounts will be automatically created under the given base account, with the currency name appended to it, e.g.,
Equity:CurrencyAccounts:CAD
Equity:CurrencyAccounts:USD
etc., where used. You can have a look at the account balances with a query like this:
bean-query $L "select account, sum(position), convert(sum(position), 'USD')
where date >= 2018-01-01 and account ~ 'CurrencyAccounts' "
The sum total of the converted amounts should be a number not too large:
bean-query $L "select convert(sum(position), 'USD')
where date >= 2018-01-01 and account ~ 'CurrencyAccounts'"
WARNING: This is a prototype. Note the FIXMEs in the code below, which indicate some potential problems.
beancount.plugins.currency_accounts.get_neutralizing_postings(curmap, base_account, new_accounts)
Process an entry.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/currency_accounts.py
def get_neutralizing_postings(curmap, base_account, new_accounts):
"""Process an entry.
Args:
curmap: A dict of currency to a list of Postings of this transaction.
base_account: A string, the root account name to insert.
new_accounts: A set, a mutable accumulator of new account names.
Returns:
A modified entry, with new postings inserted to rebalance currency trading
accounts.
"""
new_postings = []
for currency, postings in curmap.items():
# Compute the per-currency balance.
inv = inventory.Inventory()
for posting in postings:
inv.add_amount(convert.get_cost(posting))
if inv.is_empty():
new_postings.extend(postings)
continue
# Re-insert original postings and remove price conversions.
#
# Note: This may cause problems if the implicit_prices plugin is
# configured to run after this one, or if you need the price annotations
# for some scripting or serious work.
#
# FIXME: We need to handle these important cases (they're not frivolous,
# this is a prototype), probably by inserting some exceptions with
# collaborating code in the booking (e.g. insert some metadata that
# disables price conversions on those postings).
#
# FIXME(2): Ouch! Some of the residual seeps through here, where there
# are more than a single currency block. This needs fixing too. You can
# easily mitigate some of this to some extent, by excluding transactions
# which don't have any price conversion in them.
for pos in postings:
if pos.price is not None:
pos = pos._replace(price=None)
new_postings.append(pos)
# Insert the currency trading accounts postings.
amount = inv.get_only_position().units
acc = account.join(base_account, currency)
new_accounts.add(acc)
new_postings.append(
Posting(acc, -amount, None, None, None, None))
return new_postings
beancount.plugins.currency_accounts.group_postings_by_weight_currency(entry)
Return where this entry might require adjustment.
Source code in beancount/plugins/currency_accounts.py
def group_postings_by_weight_currency(entry: Transaction):
"""Return where this entry might require adjustment."""
curmap = collections.defaultdict(list)
has_price = False
for posting in entry.postings:
currency = posting.units.currency
if posting.cost:
currency = posting.cost.currency
if posting.price:
assert posting.price.currency == currency
elif posting.price:
has_price = True
currency = posting.price.currency
if posting.price:
has_price = True
curmap[currency].append(posting)
return curmap, has_price
beancount.plugins.currency_accounts.insert_currency_trading_postings(entries, options_map, config)
Insert currency trading postings.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/currency_accounts.py
def insert_currency_trading_postings(entries, options_map, config):
"""Insert currency trading postings.
Args:
entries: A list of directives.
unused_options_map: An options map.
config: The base account name for currency trading accounts.
Returns:
A list of new errors, if any were found.
"""
base_account = config.strip()
if not account.is_valid(base_account):
base_account = DEFAULT_BASE_ACCOUNT
errors = []
new_entries = []
new_accounts = set()
for entry in entries:
if isinstance(entry, Transaction):
curmap, has_price = group_postings_by_weight_currency(entry)
if has_price and len(curmap) > 1:
new_postings = get_neutralizing_postings(
curmap, base_account, new_accounts)
entry = entry._replace(postings=new_postings)
if META_PROCESSED:
entry.meta[META_PROCESSED] = True
new_entries.append(entry)
earliest_date = entries[0].date
open_entries = [
data.Open(data.new_metadata('<currency_accounts>', index),
earliest_date, acc, None, None)
for index, acc in enumerate(sorted(new_accounts))]
return open_entries + new_entries, errors
beancount.plugins.implicit_prices
This plugin synthesizes Price directives for all Postings with a price or directive or if it is an augmenting posting, has a cost directive.
beancount.plugins.implicit_prices.ImplicitPriceError (tuple)
ImplicitPriceError(source, message, entry)
beancount.plugins.implicit_prices.ImplicitPriceError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/implicit_prices.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.implicit_prices.ImplicitPriceError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of ImplicitPriceError(source, message, entry)
beancount.plugins.implicit_prices.ImplicitPriceError.__replace__(/, self, **kwds)
special
Return a new ImplicitPriceError object replacing specified fields with new values
Source code in beancount/plugins/implicit_prices.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.plugins.implicit_prices.ImplicitPriceError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/implicit_prices.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.implicit_prices.add_implicit_prices(entries, unused_options_map)
Insert implicitly defined prices from Transactions.
Explicit price entries are simply maintained in the output list. Prices from postings with costs or with prices from Transaction entries are synthesized as new Price entries in the list of entries output.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/implicit_prices.py
def add_implicit_prices(entries, unused_options_map):
"""Insert implicitly defined prices from Transactions.
Explicit price entries are simply maintained in the output list. Prices from
postings with costs or with prices from Transaction entries are synthesized
as new Price entries in the list of entries output.
Args:
entries: A list of directives. We're interested only in the Transaction instances.
unused_options_map: A parser options dict.
Returns:
A list of entries, possibly with more Price entries than before, and a
list of errors.
"""
new_entries = []
errors = []
# A dict of (date, currency, cost-currency) to price entry.
new_price_entry_map = {}
balances = collections.defaultdict(inventory.Inventory)
for entry in entries:
# Always replicate the existing entries.
new_entries.append(entry)
if isinstance(entry, Transaction):
# Inspect all the postings in the transaction.
for posting in entry.postings:
units = posting.units
cost = posting.cost
# Check if the position is matching against an existing
# position.
_, booking = balances[posting.account].add_position(posting)
# Add prices when they're explicitly specified on a posting. An
# explicitly specified price may occur in a conversion, e.g.
# Assets:Account 100 USD @ 1.10 CAD
# or, if a cost is also specified, as the current price of the
# underlying instrument, e.g.
# Assets:Account 100 HOOL {564.20} @ {581.97} USD
if posting.price is not None:
meta = data.new_metadata(entry.meta["filename"], entry.meta["lineno"])
meta[METADATA_FIELD] = "from_price"
price_entry = data.Price(meta, entry.date,
units.currency,
posting.price)
# Add costs, when we're not matching against an existing
# position. This happens when we're just specifying the cost,
# e.g.
# Assets:Account 100 HOOL {564.20}
elif (cost is not None and
booking != inventory.MatchResult.REDUCED):
# TODO(blais): What happens here if the account has no
# booking strategy? Do we end up inserting a price for the
# reducing leg? Check.
meta = data.new_metadata(entry.meta["filename"], entry.meta["lineno"])
meta[METADATA_FIELD] = "from_cost"
price_entry = data.Price(meta, entry.date,
units.currency,
amount.Amount(cost.number, cost.currency))
else:
price_entry = None
if price_entry is not None:
key = (price_entry.date,
price_entry.currency,
price_entry.amount.number, # Ideally should be removed.
price_entry.amount.currency)
try:
new_price_entry_map[key]
## Do not fail for now. We still have many valid use
## cases of duplicate prices on the same date, for
## example, stock splits, or trades on two dates with
## two separate reported prices. We need to figure out a
## more elegant solution for this in the long term.
## Keeping both for now. We should ideally not use the
## number in the de-dup key above.
#
# dup_entry = new_price_entry_map[key]
# if price_entry.amount.number == dup_entry.amount.number:
# # Skip duplicates.
# continue
# else:
# errors.append(
# ImplicitPriceError(
# entry.meta,
# "Duplicate prices for {} on {}".format(entry,
# dup_entry),
# entry))
except KeyError:
new_price_entry_map[key] = price_entry
new_entries.append(price_entry)
return new_entries, errors
beancount.plugins.leafonly
A plugin that issues errors when amounts are posted to non-leaf accounts, that is, accounts with child accounts.
This is an extra constraint that you may want to apply optionally. If you install this plugin, it will issue errors for all accounts that have postings to non-leaf accounts. Some users may want to disallow this and enforce that only leaf accounts may have postings on them.
beancount.plugins.leafonly.LeafOnlyError (tuple)
LeafOnlyError(source, message, entry)
beancount.plugins.leafonly.LeafOnlyError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/leafonly.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.leafonly.LeafOnlyError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of LeafOnlyError(source, message, entry)
beancount.plugins.leafonly.LeafOnlyError.__replace__(/, self, **kwds)
special
Return a new LeafOnlyError object replacing specified fields with new values
Source code in beancount/plugins/leafonly.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.plugins.leafonly.LeafOnlyError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/leafonly.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.leafonly.validate_leaf_only(entries, unused_options_map)
Check for non-leaf accounts that have postings on them.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/leafonly.py
def validate_leaf_only(entries, unused_options_map):
"""Check for non-leaf accounts that have postings on them.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
real_root = realization.realize(entries, compute_balance=False)
default_meta = data.new_metadata('<leafonly>', 0)
open_close_map = None # Lazily computed.
errors = []
for real_account in realization.iter_children(real_root):
if len(real_account) > 0 and real_account.txn_postings:
if open_close_map is None:
open_close_map = getters.get_account_open_close(entries)
try:
open_entry = open_close_map[real_account.account][0]
except KeyError:
open_entry = None
errors.append(LeafOnlyError(
open_entry.meta if open_entry else default_meta,
"Non-leaf account '{}' has postings on it".format(real_account.account),
open_entry))
return entries, errors
beancount.plugins.noduplicates
This plugin validates that there are no duplicate transactions.
beancount.plugins.noduplicates.validate_no_duplicates(entries, unused_options_map)
Check that the entries are unique, by computing hashes.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/noduplicates.py
def validate_no_duplicates(entries, unused_options_map):
"""Check that the entries are unique, by computing hashes.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
unused_hashes, errors = compare.hash_entries(entries, exclude_meta=True)
return entries, errors
beancount.plugins.nounused
This plugin validates that there are no unused accounts.
beancount.plugins.nounused.UnusedAccountError (tuple)
UnusedAccountError(source, message, entry)
beancount.plugins.nounused.UnusedAccountError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/nounused.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.nounused.UnusedAccountError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of UnusedAccountError(source, message, entry)
beancount.plugins.nounused.UnusedAccountError.__replace__(/, self, **kwds)
special
Return a new UnusedAccountError object replacing specified fields with new values
Source code in beancount/plugins/nounused.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.plugins.nounused.UnusedAccountError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/nounused.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.nounused.validate_unused_accounts(entries, unused_options_map)
Check that all accounts declared open are actually used.
We check that all of the accounts that are open are at least referred to by another directive. These are probably unused, so issue a warning (we like to be pedantic). Note that an account that is open and then closed is considered used--this is a valid use case that may occur in reality. If you have a use case for an account to be open but never used, you can quiet that warning by initializing the account with a balance asserts or a pad directive, or even use a note will be sufficient.
(This is probably a good candidate for optional inclusion as a "pedantic" plugin.)
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/nounused.py
def validate_unused_accounts(entries, unused_options_map):
"""Check that all accounts declared open are actually used.
We check that all of the accounts that are open are at least referred to by
another directive. These are probably unused, so issue a warning (we like to
be pedantic). Note that an account that is open and then closed is
considered used--this is a valid use case that may occur in reality. If you
have a use case for an account to be open but never used, you can quiet that
warning by initializing the account with a balance asserts or a pad
directive, or even use a note will be sufficient.
(This is probably a good candidate for optional inclusion as a "pedantic"
plugin.)
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
# Find all the accounts referenced by entries which are not Open, and the
# open directives for error reporting below.
open_map = {}
referenced_accounts = set()
for entry in entries:
if isinstance(entry, data.Open):
open_map[entry.account] = entry
continue
referenced_accounts.update(getters.get_entry_accounts(entry))
# Create a list of suitable errors, with the location of the Open directives
# corresponding to the unused accounts.
errors = [UnusedAccountError(open_entry.meta,
"Unused account '{}'".format(account),
open_entry)
for account, open_entry in open_map.items()
if account not in referenced_accounts]
return entries, errors
beancount.plugins.onecommodity
A plugin that issues errors when more than one commodity is used in an account.
For investments or trading accounts, it can make it easier to filter the action around a single stock by using the name of the stock as the leaf of the account name.
Notes:
-
The plugin will automatically skip accounts that have explicitly declared commodities in their Open directive.
-
You can also set the metadata "onecommodity: FALSE" on an account's Open directive to skip the checks for that account.
-
If provided, the configuration should be a regular expression restricting the set of accounts to check.
beancount.plugins.onecommodity.OneCommodityError (tuple)
OneCommodityError(source, message, entry)
beancount.plugins.onecommodity.OneCommodityError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/onecommodity.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.onecommodity.OneCommodityError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of OneCommodityError(source, message, entry)
beancount.plugins.onecommodity.OneCommodityError.__replace__(/, self, **kwds)
special
Return a new OneCommodityError object replacing specified fields with new values
Source code in beancount/plugins/onecommodity.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.plugins.onecommodity.OneCommodityError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/onecommodity.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.onecommodity.validate_one_commodity(entries, unused_options_map, config=None)
Check that each account has units in only a single commodity.
This is an extra constraint that you may want to apply optionally, despite Beancount's ability to support inventories and aggregations with more than one commodity. I believe this also matches GnuCash's model, where each account has a single commodity attached to it.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/onecommodity.py
def validate_one_commodity(entries, unused_options_map, config=None):
"""Check that each account has units in only a single commodity.
This is an extra constraint that you may want to apply optionally, despite
Beancount's ability to support inventories and aggregations with more than
one commodity. I believe this also matches GnuCash's model, where each
account has a single commodity attached to it.
Args:
entries: A list of directives.
unused_options_map: An options map.
config: The plugin configuration string, a regular expression to match
against the subset of accounts to check.
Returns:
A list of new errors, if any were found.
"""
accounts_re = re.compile(config) if config else None
# Mappings of account name to lists of currencies for each units and cost.
units_map = collections.defaultdict(set)
cost_map = collections.defaultdict(set)
# Mappings to use just for getting a relevant source.
units_source_map = {}
cost_source_map = {}
# Gather the set of accounts to skip from the Open directives.
skip_accounts = set()
for entry in entries:
if not isinstance(entry, data.Open):
continue
if (not entry.meta.get("onecommodity", True) or
(accounts_re and not accounts_re.match(entry.account)) or
(entry.currencies and len(entry.currencies) > 1)):
skip_accounts.add(entry.account)
# Accumulate all the commodities used.
for entry in entries:
if isinstance(entry, data.Transaction):
for posting in entry.postings:
if posting.account in skip_accounts:
continue
units = posting.units
units_map[posting.account].add(units.currency)
if len(units_map[posting.account]) > 1:
units_source_map[posting.account] = entry
cost = posting.cost
if cost:
cost_map[posting.account].add(cost.currency)
if len(cost_map[posting.account]) > 1:
cost_source_map[posting.account] = entry
elif isinstance(entry, data.Balance):
if entry.account in skip_accounts:
continue
units_map[entry.account].add(entry.amount.currency)
if len(units_map[entry.account]) > 1:
units_source_map[entry.account] = entry
elif isinstance(entry, data.Open):
if entry.currencies and len(entry.currencies) > 1:
skip_accounts.add(entry.account)
# Check units.
errors = []
for account, currencies in units_map.items():
if account in skip_accounts:
continue
if len(currencies) > 1:
errors.append(OneCommodityError(
units_source_map[account].meta,
"More than one currency in account '{}': {}".format(
account, ','.join(currencies)),
None))
# Check costs.
for account, currencies in cost_map.items():
if account in skip_accounts:
continue
if len(currencies) > 1:
errors.append(OneCommodityError(
cost_source_map[account].meta,
"More than one cost currency in account '{}': {}".format(
account, ','.join(currencies)),
None))
return entries, errors
beancount.plugins.pedantic
A plugin of plugins which triggers all the pedantic plugins.
beancount.plugins.sellgains
A plugin that cross-checks declared gains against prices on lot sales.
When you sell stock, the gains can be automatically implied by the corresponding cash amounts. For example, in the following transaction the 2nd and 3rd postings should match the value of the stock sold:
1999-07-31 * "Sell"
Assets:US:BRS:Company:ESPP -81 ADSK {26.3125 USD}
Assets:US:BRS:Company:Cash 2141.36 USD
Expenses:Financial:Fees 0.08 USD
Income:US:Company:ESPP:PnL -10.125 USD
The cost basis is checked against: 2141.36 + 008 + -10.125. That is, the balance checks computes
-81 x 26.3125 = -2131.3125 + 2141.36 + 0.08 + -10.125
and checks that the residual is below a small tolerance.
But... usually the income leg isn't given to you in statements. Beancount can automatically infer it using the balance, which is convenient, like this:
1999-07-31 * "Sell"
Assets:US:BRS:Company:ESPP -81 ADSK {26.3125 USD}
Assets:US:BRS:Company:Cash 2141.36 USD
Expenses:Financial:Fees 0.08 USD
Income:US:Company:ESPP:PnL
Additionally, most often you have the sales prices given to you on your transaction confirmation statement, so you can enter this:
1999-07-31 * "Sell"
Assets:US:BRS:Company:ESPP -81 ADSK {26.3125 USD} @ 26.4375 USD
Assets:US:BRS:Company:Cash 2141.36 USD
Expenses:Financial:Fees 0.08 USD
Income:US:Company:ESPP:PnL
So in theory, if the price is given (26.4375 USD), we could verify that the proceeds from the sale at the given price match non-Income postings. That is, verify that
-81 x 26.4375 = -2141.4375 + 2141.36 + 0.08 +
is below a small tolerance value. So this plugin does this.
In general terms, it does the following: For transactions with postings that have a cost and a price, it verifies that the sum of the positions on all postings to non-income accounts is below tolerance.
This provides yet another level of verification and allows you to elide the income amounts, knowing that the price is there to provide an extra level of error-checking in case you enter a typo.
beancount.plugins.sellgains.SellGainsError (tuple)
SellGainsError(source, message, entry)
beancount.plugins.sellgains.SellGainsError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/sellgains.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.sellgains.SellGainsError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of SellGainsError(source, message, entry)
beancount.plugins.sellgains.SellGainsError.__replace__(/, self, **kwds)
special
Return a new SellGainsError object replacing specified fields with new values
Source code in beancount/plugins/sellgains.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.plugins.sellgains.SellGainsError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/sellgains.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.sellgains.validate_sell_gains(entries, options_map)
Check the sum of asset account totals for lots sold with a price on them.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/sellgains.py
def validate_sell_gains(entries, options_map):
"""Check the sum of asset account totals for lots sold with a price on them.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
errors = []
acc_types = options.get_account_types(options_map)
proceed_types = set([acc_types.assets,
acc_types.liabilities,
acc_types.equity,
acc_types.expenses])
for entry in entries:
if not isinstance(entry, data.Transaction):
continue
# Find transactions whose lots at cost all have a price.
postings_at_cost = [posting
for posting in entry.postings
if posting.cost is not None]
if not postings_at_cost or not all(posting.price is not None
for posting in postings_at_cost):
continue
# Accumulate the total expected proceeds and the sum of the asset and
# expenses legs.
total_price = inventory.Inventory()
total_proceeds = inventory.Inventory()
for posting in entry.postings:
# If the posting is held at cost, add the priced value to the balance.
if posting.cost is not None:
assert posting.price is not None
price = posting.price
total_price.add_amount(amount.mul(price, -posting.units.number))
else:
# Otherwise, use the weight and ignore postings to Income accounts.
atype = account_types.get_account_type(posting.account)
if atype in proceed_types:
total_proceeds.add_amount(convert.get_weight(posting))
# Compare inventories, currency by currency.
dict_price = {pos.units.currency: pos.units.number
for pos in total_price}
dict_proceeds = {pos.units.currency: pos.units.number
for pos in total_proceeds}
tolerances = interpolate.infer_tolerances(entry.postings, options_map)
invalid = False
for currency, price_number in dict_price.items():
# Accept a looser than usual tolerance because rounding occurs
# differently. Also, it would be difficult for the user to satisfy
# two sets of constraints manually.
tolerance = tolerances.get(currency) * EXTRA_TOLERANCE_MULTIPLIER
proceeds_number = dict_proceeds.pop(currency, ZERO)
diff = abs(price_number - proceeds_number)
if diff > tolerance:
invalid = True
break
if invalid or dict_proceeds:
errors.append(
SellGainsError(
entry.meta,
"Invalid price vs. proceeds/gains: {} vs. {}; difference: {}".format(
total_price, total_proceeds, (total_price + -total_proceeds)),
entry))
return entries, errors
beancount.plugins.unique_prices
This module adds validation that there is a single price defined per date and base/quote currencies. If multiple conflicting price values are declared, an error is generated. Note that multiple price entries with the same number do not generate an error.
This is meant to be turned on if you want to use a very strict mode for entering prices, and may not be realistic usage. For example, if you have (1) a transaction with an implicitly generated price during the day (from its cost) and (2) a separate explicit price directive that declares a different price for the day's closing price, this would generate an error. I'm not certain this will be useful in the long run, so placing it in a plugin.
beancount.plugins.unique_prices.UniquePricesError (tuple)
UniquePricesError(source, message, entry)
beancount.plugins.unique_prices.UniquePricesError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/plugins/unique_prices.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.plugins.unique_prices.UniquePricesError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of UniquePricesError(source, message, entry)
beancount.plugins.unique_prices.UniquePricesError.__replace__(/, self, **kwds)
special
Return a new UniquePricesError object replacing specified fields with new values
Source code in beancount/plugins/unique_prices.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.plugins.unique_prices.UniquePricesError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/plugins/unique_prices.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.plugins.unique_prices.validate_unique_prices(entries, unused_options_map)
Check that there is only a single price per day for a particular base/quote.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/plugins/unique_prices.py
def validate_unique_prices(entries, unused_options_map):
"""Check that there is only a single price per day for a particular base/quote.
Args:
entries: A list of directives. We're interested only in the Transaction instances.
unused_options_map: A parser options dict.
Returns:
The list of input entries, and a list of new UniquePricesError instances generated.
"""
new_entries = []
errors = []
prices = collections.defaultdict(list)
for entry in entries:
if not isinstance(entry, data.Price):
continue
key = (entry.date, entry.currency, entry.amount.currency)
prices[key].append(entry)
errors = []
for price_entries in prices.values():
if len(price_entries) > 1:
number_map = {price_entry.amount.number: price_entry
for price_entry in price_entries}
if len(number_map) > 1:
# Note: This should be a list of entries for better error
# reporting. (Later.)
error_entry = next(iter(number_map.values()))
errors.append(
UniquePricesError(error_entry.meta,
"Disagreeing price entries",
price_entries))
return entries, errors