beancount.ops
Operations on the entries defined in the core modules.
This package contains various functions which operate on lists of entries.
beancount.ops.balance
Automatic padding of gaps between entries.
beancount.ops.balance.BalanceError (tuple)
BalanceError(source, message, entry)
beancount.ops.balance.BalanceError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/ops/balance.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.ops.balance.BalanceError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of BalanceError(source, message, entry)
beancount.ops.balance.BalanceError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/ops/balance.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.ops.balance.check(entries, options_map)
Process the balance assertion directives.
For each Balance directive, check that their expected balance corresponds to the actual balance computed at that time and replace failing ones by new ones with a flag that indicates failure.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/balance.py
def check(entries, options_map):
"""Process the balance assertion directives.
For each Balance directive, check that their expected balance corresponds to
the actual balance computed at that time and replace failing ones by new
ones with a flag that indicates failure.
Args:
entries: A list of directives.
options_map: A dict of options, parsed from the input file.
Returns:
A pair of a list of directives and a list of balance check errors.
"""
new_entries = []
check_errors = []
# This is similar to realization, but performed in a different order, and
# where we only accumulate inventories for accounts that have balance
# assertions in them (this saves on time). Here we process the entries one
# by one along with the balance checks. We use a temporary realization in
# order to hold the incremental tree of balances, so that we can easily get
# the amounts of an account's subaccounts for making checks on parent
# accounts.
real_root = realization.RealAccount('')
# Figure out the set of accounts for which we need to compute a running
# inventory balance.
asserted_accounts = {entry.account
for entry in entries
if isinstance(entry, Balance)}
# Add all children accounts of an asserted account to be calculated as well,
# and pre-create these accounts, and only those (we're just being tight to
# make sure).
asserted_match_list = [account.parent_matcher(account_)
for account_ in asserted_accounts]
for account_ in getters.get_accounts(entries):
if (account_ in asserted_accounts or
any(match(account_) for match in asserted_match_list)):
realization.get_or_create(real_root, account_)
# Get the Open directives for each account.
open_close_map = getters.get_account_open_close(entries)
for entry in entries:
if isinstance(entry, Transaction):
# For each of the postings' accounts, update the balance inventory.
for posting in entry.postings:
real_account = realization.get(real_root, posting.account)
# The account will have been created only if we're meant to track it.
if real_account is not None:
# Note: Always allow negative lots for the purpose of balancing.
# This error should show up somewhere else than here.
real_account.balance.add_position(posting)
elif isinstance(entry, Balance):
# Check that the currency of the balance check is one of the allowed
# currencies for that account.
expected_amount = entry.amount
try:
open, _ = open_close_map[entry.account]
except KeyError:
check_errors.append(
BalanceError(entry.meta,
"Account '{}' does not exist: ".format(entry.account),
entry))
continue
if (expected_amount is not None and
open and open.currencies and
expected_amount.currency not in open.currencies):
check_errors.append(
BalanceError(entry.meta,
"Invalid currency '{}' for Balance directive: ".format(
expected_amount.currency),
entry))
# Sum up the current balances for this account and its
# sub-accounts. We want to support checks for parent accounts
# for the total sum of their subaccounts.
#
# FIXME: Improve the performance further by computing the balance
# for the desired currency only. This won't allow us to cache in
# this way but may be faster, if we're not asserting all the
# currencies. Furthermore, we could probably avoid recomputing the
# balance if a subtree of positions hasn't been invalidated by a new
# position added to the realization. Do this.
real_account = realization.get(real_root, entry.account)
assert real_account is not None, "Missing {}".format(entry.account)
subtree_balance = realization.compute_balance(real_account, leaf_only=False)
# Get only the amount in the desired currency.
balance_amount = subtree_balance.get_currency_units(expected_amount.currency)
# Check if the amount is within bounds of the expected amount.
diff_amount = amount.sub(balance_amount, expected_amount)
# Use the specified tolerance or automatically infer it.
tolerance = get_balance_tolerance(entry, options_map)
if abs(diff_amount.number) > tolerance:
check_errors.append(
BalanceError(entry.meta,
("Balance failed for '{}': "
"expected {} != accumulated {} ({} {})").format(
entry.account, expected_amount, balance_amount,
abs(diff_amount.number),
('too much'
if diff_amount.number > 0
else 'too little')),
entry))
# Substitute the entry by a failing entry, with the diff_amount
# field set on it. I'm not entirely sure that this is the best
# of ideas, maybe leaving the original check intact and insert a
# new error entry might be more functional or easier to
# understand.
entry = entry._replace(
meta=entry.meta.copy(),
diff_amount=diff_amount)
new_entries.append(entry)
return new_entries, check_errors
beancount.ops.balance.get_balance_tolerance(balance_entry, options_map)
Get the tolerance amount for a single entry.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/balance.py
def get_balance_tolerance(balance_entry, options_map):
"""Get the tolerance amount for a single entry.
Args:
balance_entry: An instance of data.Balance
options_map: An options dict, as per the parser.
Returns:
A Decimal, the amount of tolerance implied by the directive.
"""
if balance_entry.tolerance is not None:
# Use the balance-specific tolerance override if it is provided.
tolerance = balance_entry.tolerance
else:
expo = balance_entry.amount.number.as_tuple().exponent
if expo < 0:
# Be generous and always allow twice the multiplier on Balance and
# Pad because the user creates these and the rounding of those
# balances may often be further off than those used within a single
# transaction.
tolerance = options_map["inferred_tolerance_multiplier"] * 2
tolerance = ONE.scaleb(expo) * tolerance
else:
tolerance = ZERO
return tolerance
beancount.ops.basicops
Basic filtering and aggregation operations on lists of entries.
This module contains some common basic operations on entries that are complex enough not to belong in core/data.py.
beancount.ops.basicops.filter_link(link, entries)
Yield all the entries which have the given link.
Parameters: |
|
---|
Yields: Every entry in 'entries' that links to 'link.
Source code in beancount/ops/basicops.py
def filter_link(link, entries):
"""Yield all the entries which have the given link.
Args:
link: A string, the link we are interested in.
Yields:
Every entry in 'entries' that links to 'link.
"""
for entry in entries:
# pylint: disable=bad-continuation
if (isinstance(entry, data.Transaction) and
entry.links and link in entry.links):
yield entry
beancount.ops.basicops.filter_tag(tag, entries)
Yield all the entries which have the given tag.
Parameters: |
|
---|
Yields: Every entry in 'entries' that tags to 'tag.
Source code in beancount/ops/basicops.py
def filter_tag(tag, entries):
"""Yield all the entries which have the given tag.
Args:
tag: A string, the tag we are interested in.
Yields:
Every entry in 'entries' that tags to 'tag.
"""
for entry in entries:
# pylint: disable=bad-continuation
if (isinstance(entry, data.Transaction) and
entry.tags and
tag in entry.tags):
yield entry
beancount.ops.basicops.get_common_accounts(entries)
Compute the intersection of the accounts on the given entries.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/basicops.py
def get_common_accounts(entries):
"""Compute the intersection of the accounts on the given entries.
Args:
entries: A list of Transaction entries to process.
Returns:
A set of strings, the names of the common accounts from these
entries.
"""
assert all(isinstance(entry, data.Transaction) for entry in entries)
# If there is a single entry, the common accounts to it is all its accounts.
# Note that this also works with no entries (yields an empty set).
if len(entries) < 2:
if entries:
intersection = {posting.account for posting in entries[0].postings}
else:
intersection = set()
else:
entries_iter = iter(entries)
intersection = set(posting.account for posting in next(entries_iter).postings)
for entry in entries_iter:
accounts = set(posting.account for posting in entry.postings)
intersection &= accounts
if not intersection:
break
return intersection
beancount.ops.basicops.group_entries_by_link(entries)
Group the list of entries by link.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/basicops.py
def group_entries_by_link(entries):
"""Group the list of entries by link.
Args:
entries: A list of directives/transactions to process.
Returns:
A dict of link-name to list of entries.
"""
link_groups = defaultdict(list)
for entry in entries:
if not (isinstance(entry, data.Transaction) and entry.links):
continue
for link in entry.links:
link_groups[link].append(entry)
return link_groups
beancount.ops.compress
Compress multiple entries into a single one.
This can be used during import to compress the effective output, for accounts with a large number of similar entries. For example, I had a trading account which would pay out interest every single day. I have no desire to import the full detail of these daily interests, and compressing these interest-only entries to monthly ones made sense. This is the code that was used to carry this out.
beancount.ops.compress.compress(entries, predicate)
Compress multiple transactions into single transactions.
Replace consecutive sequences of Transaction entries that fulfill the given predicate by a single entry at the date of the last matching entry. 'predicate' is the function that determines if an entry should be compressed.
This can be used to simply a list of transactions that are similar and occur frequently. As an example, in a retail FOREX trading account, differential interest of very small amounts is paid every day; it is not relevant to look at the full detail of this interest unless there are other transactions. You can use this to compress it into single entries between other types of transactions.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/compress.py
def compress(entries, predicate):
"""Compress multiple transactions into single transactions.
Replace consecutive sequences of Transaction entries that fulfill the given
predicate by a single entry at the date of the last matching entry.
'predicate' is the function that determines if an entry should be
compressed.
This can be used to simply a list of transactions that are similar and occur
frequently. As an example, in a retail FOREX trading account, differential
interest of very small amounts is paid every day; it is not relevant to look
at the full detail of this interest unless there are other transactions. You
can use this to compress it into single entries between other types of
transactions.
Args:
entries: A list of directives.
predicate: A callable which accepts an entry and return true if the entry
is intended to be compressed.
Returns:
A list of directives, with compressible transactions replaced by a summary
equivalent.
"""
new_entries = []
pending = []
for entry in entries:
if isinstance(entry, data.Transaction) and predicate(entry):
# Save for compressing later.
pending.append(entry)
else:
# Compress and output all the pending entries.
if pending:
new_entries.append(merge(pending, pending[-1]))
pending.clear()
# Output the differing entry.
new_entries.append(entry)
if pending:
new_entries.append(merge(pending, pending[-1]))
return new_entries
beancount.ops.compress.merge(entries, prototype_txn)
Merge the postings of a list of Transactions into a single one.
Merge postings the given entries into a single entry with the Transaction attributes of the prototype. Return the new entry. The combined list of postings are merged if everything about the postings is the same except the number.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/compress.py
def merge(entries, prototype_txn):
"""Merge the postings of a list of Transactions into a single one.
Merge postings the given entries into a single entry with the Transaction
attributes of the prototype. Return the new entry. The combined list of
postings are merged if everything about the postings is the same except the
number.
Args:
entries: A list of directives.
prototype_txn: A Transaction which is used to create the compressed
Transaction instance. Its list of postings is ignored.
Returns:
A new Transaction instance which contains all the postings from the input
entries merged together.
"""
# Aggregate the postings together. This is a mapping of numberless postings
# to their number of units.
postings_map = collections.defaultdict(Decimal)
for entry in data.filter_txns(entries):
for posting in entry.postings:
# We strip the number off the posting to act as an aggregation key.
key = data.Posting(posting.account,
Amount(None, posting.units.currency),
posting.cost,
posting.price,
posting.flag,
None)
postings_map[key] += posting.units.number
# Create a new transaction with the aggregated postings.
new_entry = data.Transaction(prototype_txn.meta,
prototype_txn.date,
prototype_txn.flag,
prototype_txn.payee,
prototype_txn.narration,
data.EMPTY_SET, data.EMPTY_SET, [])
# Sort for at least some stability of output.
sorted_items = sorted(postings_map.items(),
key=lambda item: (item[0].account,
item[0].units.currency,
item[1]))
# Issue the merged postings.
for posting, number in sorted_items:
units = Amount(number, posting.units.currency)
new_entry.postings.append(
data.Posting(posting.account, units, posting.cost, posting.price,
posting.flag, posting.meta))
return new_entry
beancount.ops.documents
Everything that relates to creating the Document directives.
beancount.ops.documents.DocumentError (tuple)
DocumentError(source, message, entry)
beancount.ops.documents.DocumentError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/ops/documents.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.ops.documents.DocumentError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of DocumentError(source, message, entry)
beancount.ops.documents.DocumentError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/ops/documents.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.ops.documents.find_documents(directory, input_filename, accounts_only=None, strict=False)
Find dated document files under the given directory.
If a restricting set of accounts is provided in 'accounts_only', only return entries that correspond to one of the given accounts.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/documents.py
def find_documents(directory, input_filename, accounts_only=None, strict=False):
"""Find dated document files under the given directory.
If a restricting set of accounts is provided in 'accounts_only', only return
entries that correspond to one of the given accounts.
Args:
directory: A string, the name of the root of the directory hierarchy to be searched.
input_filename: The name of the file to be used for the Document directives. This is
also used to resolve relative directory names.
accounts_only: A set of valid accounts strings to search for.
strict: A boolean, set to true if you want to generate errors on documents
found in accounts not provided in accounts_only. This is only meaningful
if accounts_only is specified.
Returns:
A list of new Document objects that were created from the files found, and a list
of new errors generated.
"""
errors = []
# Compute the documents directory name relative to the beancount input
# file itself.
if not path.isabs(directory):
input_directory = path.dirname(input_filename)
directory = path.abspath(path.normpath(path.join(input_directory,
directory)))
# If the directory does not exist, just generate an error and return.
if not path.exists(directory):
meta = data.new_metadata(input_filename, 0)
error = DocumentError(
meta, "Document root '{}' does not exist".format(directory), None)
return ([], [error])
# Walk the hierarchy of files.
entries = []
for root, account_name, dirs, files in account.walk(directory):
# Look for files that have a dated filename.
for filename in files:
match = re.match(r'(\d\d\d\d)-(\d\d)-(\d\d).(.*)', filename)
if not match:
continue
# If a restricting set of accounts was specified, skip document
# directives found in accounts with no corresponding account name.
if accounts_only is not None and not account_name in accounts_only:
if strict:
if any(account_name.startswith(account) for account in accounts_only):
errors.append(DocumentError(
data.new_metadata(input_filename, 0),
"Document '{}' found in child account {}".format(
filename, account_name), None))
elif any(account.startswith(account_name) for account in accounts_only):
errors.append(DocumentError(
data.new_metadata(input_filename, 0),
"Document '{}' found in parent account {}".format(
filename, account_name), None))
continue
# Create a new directive.
meta = data.new_metadata(input_filename, 0)
try:
date = datetime.date(*map(int, match.group(1, 2, 3)))
except ValueError as exc:
errors.append(DocumentError(
data.new_metadata(input_filename, 0),
"Invalid date on document file '{}': {}".format(
filename, exc), None))
else:
entry = data.Document(meta, date, account_name, path.join(root, filename),
data.EMPTY_SET, data.EMPTY_SET)
entries.append(entry)
return (entries, errors)
beancount.ops.documents.process_documents(entries, options_map)
Check files for document directives and create documents directives automatically.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/documents.py
def process_documents(entries, options_map):
"""Check files for document directives and create documents directives automatically.
Args:
entries: A list of all directives parsed from the file.
options_map: An options dict, as is output by the parser.
We're using its 'filename' option to figure out relative path to
search for documents.
Returns:
A pair of list of all entries (including new ones), and errors
generated during the process of creating document directives.
"""
filename = options_map["filename"]
# Detect filenames that should convert into entries.
autodoc_entries = []
autodoc_errors = []
document_dirs = options_map['documents']
if document_dirs:
# Restrict to the list of valid accounts only.
accounts = getters.get_accounts(entries)
# Accumulate all the entries.
for directory in map(path.normpath, document_dirs):
new_entries, new_errors = find_documents(directory, filename, accounts)
autodoc_entries.extend(new_entries)
autodoc_errors.extend(new_errors)
# Merge the two lists of entries and errors. Keep the entries sorted.
entries.extend(autodoc_entries)
entries.sort(key=data.entry_sortkey)
return (entries, autodoc_errors)
beancount.ops.documents.verify_document_files_exist(entries, unused_options_map)
Verify that the document entries point to existing files.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/documents.py
def verify_document_files_exist(entries, unused_options_map):
"""Verify that the document entries point to existing files.
Args:
entries: a list of directives whose documents need to be validated.
unused_options_map: A parser options dict. We're not using it.
Returns:
The same list of entries, and a list of new errors, if any were encountered.
"""
errors = []
for entry in entries:
if not isinstance(entry, data.Document):
continue
if not path.exists(entry.filename):
errors.append(
DocumentError(entry.meta,
'File does not exist: "{}"'.format(entry.filename),
entry))
return entries, errors
beancount.ops.holdings
Compute final holdings for a list of entries.
beancount.ops.holdings.Holding (tuple)
Holding(account, number, currency, cost_number, cost_currency, book_value, market_value, price_number, price_date)
beancount.ops.holdings.Holding.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/ops/holdings.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.ops.holdings.Holding.__new__(_cls, account, number, currency, cost_number, cost_currency, book_value, market_value, price_number, price_date)
special
staticmethod
Create new instance of Holding(account, number, currency, cost_number, cost_currency, book_value, market_value, price_number, price_date)
beancount.ops.holdings.Holding.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/ops/holdings.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.ops.holdings.aggregate_holdings_by(holdings, keyfun)
Aggregate holdings by some key.
Note that the cost-currency must always be included in the group-key (sums over multiple currency units do not make sense), so it is appended to the sort-key automatically.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/holdings.py
def aggregate_holdings_by(holdings, keyfun):
"""Aggregate holdings by some key.
Note that the cost-currency must always be included in the group-key (sums
over multiple currency units do not make sense), so it is appended to the
sort-key automatically.
Args:
keyfun: A callable, which returns the key to aggregate by. This key need
not include the cost-currency.
Returns:
A list of aggregated holdings.
"""
# Aggregate the groups of holdings.
grouped = collections.defaultdict(list)
for holding in holdings:
key = (keyfun(holding), holding.cost_currency)
grouped[key].append(holding)
grouped_holdings = (aggregate_holdings_list(key_holdings)
for key_holdings in grouped.values())
# We could potentially filter out holdings with zero units here. These types
# of holdings might occur on a group with leaked (i.e., non-zero) cost basis
# and zero units. However, sometimes are valid merging of multiple
# currencies may occur, and the number value will be legitimately set to
# ZERO (for various reasons downstream), so we prefer not to ignore the
# holding. Callers must be prepared to deal with a holding of ZERO units and
# a non-zero cost basis. {0ed05c502e63, b/16}
## nonzero_holdings = (holding
## for holding in grouped_holdings
## if holding.number != ZERO)
# Return the holdings in order.
return sorted(grouped_holdings,
key=lambda holding: (holding.account, holding.currency))
beancount.ops.holdings.aggregate_holdings_list(holdings)
Aggregate a list of holdings.
If there are varying 'account', 'currency' or 'cost_currency' attributes, their values are replaced by '*'. Otherwise they are preserved. Note that all the cost-currency values must be equal in order for aggregations to succeed (without this constraint a sum of units in different currencies has no meaning).
Parameters: |
|
---|
Returns: |
|
---|
Exceptions: |
|
---|
Source code in beancount/ops/holdings.py
def aggregate_holdings_list(holdings):
"""Aggregate a list of holdings.
If there are varying 'account', 'currency' or 'cost_currency' attributes,
their values are replaced by '*'. Otherwise they are preserved. Note that
all the cost-currency values must be equal in order for aggregations to
succeed (without this constraint a sum of units in different currencies has
no meaning).
Args:
holdings: A list of Holding instances.
Returns:
A single Holding instance, or None, if there are no holdings in the input
list.
Raises:
ValueError: If multiple cost currencies encountered.
"""
if not holdings:
return None
# Note: Holding is a bit overspecified with book and market values. We
# recompute them from cost and price numbers here anyhow.
units, total_book_value, total_market_value = ZERO, ZERO, ZERO
accounts = set()
currencies = set()
cost_currencies = set()
price_dates = set()
book_value_seen = False
market_value_seen = False
for holding in holdings:
units += holding.number
accounts.add(holding.account)
price_dates.add(holding.price_date)
currencies.add(holding.currency)
cost_currencies.add(holding.cost_currency)
if holding.book_value is not None:
total_book_value += holding.book_value
book_value_seen = True
elif holding.cost_number is not None:
total_book_value += holding.number * holding.cost_number
book_value_seen = True
if holding.market_value is not None:
total_market_value += holding.market_value
market_value_seen = True
elif holding.price_number is not None:
total_market_value += holding.number * holding.price_number
market_value_seen = True
if book_value_seen:
average_cost = total_book_value / units if units else None
else:
total_book_value = None
average_cost = None
if market_value_seen:
average_price = total_market_value / units if units else None
else:
total_market_value = None
average_price = None
if len(cost_currencies) != 1:
raise ValueError("Cost currencies are not homogeneous for aggregation: {}".format(
','.join(map(str, cost_currencies))))
units = units if len(currencies) == 1 else ZERO
currency = currencies.pop() if len(currencies) == 1 else '*'
cost_currency = cost_currencies.pop()
account_ = (accounts.pop()
if len(accounts) == 1
else account.commonprefix(accounts))
price_date = price_dates.pop() if len(price_dates) == 1 else None
return Holding(account_, units, currency, average_cost, cost_currency,
total_book_value, total_market_value, average_price, price_date)
beancount.ops.holdings.convert_to_currency(price_map, target_currency, holdings_list)
Convert the given list of holdings's fields to a common currency.
If the rate is not available to convert, leave the fields empty.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/holdings.py
def convert_to_currency(price_map, target_currency, holdings_list):
"""Convert the given list of holdings's fields to a common currency.
If the rate is not available to convert, leave the fields empty.
Args:
price_map: A price-map, as built by prices.build_price_map().
target_currency: The target common currency to convert amounts to.
holdings_list: A list of holdings.Holding instances.
Returns:
A modified list of holdings, with the 'extra' field set to the value in
'currency', or None, if it was not possible to convert.
"""
# A list of the fields we should convert.
convert_fields = ('cost_number', 'book_value', 'market_value', 'price_number')
new_holdings = []
for holding in holdings_list:
if holding.cost_currency == target_currency:
# The holding is already priced in the target currency; do nothing.
new_holding = holding
else:
if holding.cost_currency is None:
# There is no cost currency; make the holding priced in its own
# units. The price-map should yield a rate of 1.0 and everything
# else works out.
if holding.currency is None:
raise ValueError("Invalid currency '{}'".format(holding.currency))
holding = holding._replace(cost_currency=holding.currency)
# Fill in with book and market value as well.
if holding.book_value is None:
holding = holding._replace(book_value=holding.number)
if holding.market_value is None:
holding = holding._replace(market_value=holding.number)
assert holding.cost_currency, "Missing cost currency: {}".format(holding)
base_quote = (holding.cost_currency, target_currency)
# Get the conversion rate and replace the required numerical
# fields..
_, rate = prices.get_latest_price(price_map, base_quote)
if rate is not None:
new_holding = misc_utils.map_namedtuple_attributes(
convert_fields,
lambda number, r=rate: number if number is None else number * r,
holding)
# Ensure we set the new cost currency after conversion.
new_holding = new_holding._replace(cost_currency=target_currency)
else:
# Could not get the rate... clear every field and set the cost
# currency to None. This enough marks the holding conversion as
# a failure.
new_holding = misc_utils.map_namedtuple_attributes(
convert_fields, lambda number: None, holding)
new_holding = new_holding._replace(cost_currency=None)
new_holdings.append(new_holding)
return new_holdings
beancount.ops.holdings.get_commodities_at_date(entries, options_map, date=None)
Return a list of commodities present at a particular date.
This routine fetches the holdings present at a particular date and returns a list of the commodities held in those holdings. This should define the list of price date points required to assess the market value of this portfolio.
Notes:
-
The ticker symbol will be fetched from the corresponding Commodity directive. If there is no ticker symbol defined for a directive or no corresponding Commodity directive, the currency is still included, but 'None' is specified for the symbol. The code that uses this routine should be free to use the currency name to make an attempt to fetch the currency using its name, or to ignore it.
-
The 'cost-currency' is that which is found on the holdings instance and can be ignored. The 'quote-currency' is that which is declared on the Commodity directive from its 'quote' metadata field.
This is used in a routine that fetches prices from a data source on the internet (either from LedgerHub, but you can reuse this in your own script if you build one).
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/holdings.py
def get_commodities_at_date(entries, options_map, date=None):
"""Return a list of commodities present at a particular date.
This routine fetches the holdings present at a particular date and returns a
list of the commodities held in those holdings. This should define the list
of price date points required to assess the market value of this portfolio.
Notes:
* The ticker symbol will be fetched from the corresponding Commodity
directive. If there is no ticker symbol defined for a directive or no
corresponding Commodity directive, the currency is still included, but
'None' is specified for the symbol. The code that uses this routine should
be free to use the currency name to make an attempt to fetch the currency
using its name, or to ignore it.
* The 'cost-currency' is that which is found on the holdings instance and
can be ignored. The 'quote-currency' is that which is declared on the
Commodity directive from its 'quote' metadata field.
This is used in a routine that fetches prices from a data source on the
internet (either from LedgerHub, but you can reuse this in your own script
if you build one).
Args:
entries: A list of directives.
date: A datetime.date instance, the date at which to get the list of
relevant holdings.
Returns:
A list of (currency, cost-currency, quote-currency, ticker) tuples, where
currency: The Beancount base currency to fetch a price for.
cost-currency: The cost-currency of the holdings found at the given date.
quote-currency: The currency formally declared as quote currency in the
metadata of Commodity directives.
ticker: The ticker symbol to use for fetching the price (extracted from
the metadata of Commodity directives).
"""
# Remove all the entries after the given date, if requested.
if date is not None:
entries = summarize.truncate(entries, date)
# Get the list of holdings at the particular date.
holdings_list = get_final_holdings(entries)
# Obtain the unique list of currencies we need to fetch.
commodities_list = {(holding.currency, holding.cost_currency)
for holding in holdings_list}
# Add in the associated ticker symbols.
commodities_map = getters.get_commodity_map(entries)
commodities_symbols_list = []
for currency, cost_currency in sorted(commodities_list):
try:
commodity_entry = commodities_map[currency]
ticker = commodity_entry.meta.get('ticker', None)
quote_currency = commodity_entry.meta.get('quote', None)
except KeyError:
ticker = None
quote_currency = None
commodities_symbols_list.append(
(currency, cost_currency, quote_currency, ticker))
return commodities_symbols_list
beancount.ops.holdings.get_final_holdings(entries, included_account_types=None, price_map=None, date=None)
Get a dictionary of the latest holdings by account.
This basically just flattens the balance sheet's final positions, including that of equity accounts. If a 'price_map' is provided, insert price information in the flattened holdings at the latest date, or at the given date, if one is provided.
Only the accounts in 'included_account_types' will be included, and this is always called for Assets and Liabilities only. If left unspecified, holdings from all account types will be included, including Equity, Income and Expenses.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/holdings.py
def get_final_holdings(entries, included_account_types=None, price_map=None, date=None):
"""Get a dictionary of the latest holdings by account.
This basically just flattens the balance sheet's final positions, including
that of equity accounts. If a 'price_map' is provided, insert price
information in the flattened holdings at the latest date, or at the given
date, if one is provided.
Only the accounts in 'included_account_types' will be included, and this is
always called for Assets and Liabilities only. If left unspecified, holdings
from all account types will be included, including Equity, Income and
Expenses.
Args:
entries: A list of directives.
included_account_types: A sequence of strings, the account types to
include in the output. A reasonable example would be
('Assets', 'Liabilities'). If not specified, include all account types.
price_map: A dict of prices, as built by prices.build_price_map().
date: A datetime.date instance, the date at which to price the
holdings. If left unspecified, we use the latest price information.
Returns:
A list of dicts, with the following fields:
"""
# Remove the entries inserted by unrealized gains/losses. Those entries do
# affect asset accounts, and we don't want them to appear in holdings.
#
# Note: Perhaps it would make sense to generalize this concept of "inserted
# unrealized gains."
simple_entries = [entry
for entry in entries
if (not isinstance(entry, data.Transaction) or
entry.flag != flags.FLAG_UNREALIZED)]
# Realize the accounts into a tree (because we want the positions by-account).
root_account = realization.realize(simple_entries)
# For each account, look at the list of positions and build a list.
holdings = []
for real_account in sorted(list(realization.iter_children(root_account)),
key=lambda ra: ra.account):
if included_account_types:
# Skip accounts of invalid types, we only want to reflect the requested
# account types, typically assets and liabilities.
account_type = account_types.get_account_type(real_account.account)
if account_type not in included_account_types:
continue
for pos in real_account.balance.get_positions():
if pos.cost is not None:
# Get price information if we have a price_map.
market_value = None
if price_map is not None:
base_quote = (pos.units.currency, pos.cost.currency)
price_date, price_number = prices.get_price(price_map,
base_quote, date)
if price_number is not None:
market_value = pos.units.number * price_number
else:
price_date, price_number = None, None
holding = Holding(real_account.account,
pos.units.number,
pos.units.currency,
pos.cost.number,
pos.cost.currency,
pos.units.number * pos.cost.number,
market_value,
price_number,
price_date)
else:
holding = Holding(real_account.account,
pos.units.number,
pos.units.currency,
None,
pos.units.currency,
pos.units.number,
pos.units.number,
None,
None)
holdings.append(holding)
return holdings
beancount.ops.holdings.holding_to_position(holding)
Convert the holding to a position.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/holdings.py
def holding_to_position(holding):
"""Convert the holding to a position.
Args:
holding: An instance of Holding.
Returns:
An instance of Position.
"""
return position.Position(
amount.Amount(holding.number, holding.currency),
(position.Cost(holding.cost_number, holding.cost_currency, None, None)
if holding.cost_number
else None))
beancount.ops.holdings.holding_to_posting(holding)
Convert the holding to an instance of Posting.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/holdings.py
def holding_to_posting(holding):
"""Convert the holding to an instance of Posting.
Args:
holding: An instance of Holding.
Returns:
An instance of Position.
"""
position_ = holding_to_position(holding)
price = (amount.Amount(holding.price_number, holding.cost_currency)
if holding.price_number
else None)
return data.Posting(holding.account, position_.units, position_.cost, price, None, None)
beancount.ops.holdings.reduce_relative(holdings)
Convert the market and book values of the given list of holdings to relative data.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/holdings.py
def reduce_relative(holdings):
"""Convert the market and book values of the given list of holdings to relative data.
Args:
holdings: A list of Holding instances.
Returns:
A list of holdings instances with the absolute value fields replaced by
fractions of total portfolio. The new list of holdings is sorted by
currency, and the relative fractions are also relative to that currency.
"""
# Group holdings by value currency.
by_currency = collections.defaultdict(list)
ordering = {}
for index, holding in enumerate(holdings):
ordering.setdefault(holding.cost_currency, index)
by_currency[holding.cost_currency].append(holding)
fractional_holdings = []
for currency in sorted(by_currency, key=ordering.get):
currency_holdings = by_currency[currency]
# Compute total market value for that currency.
total_book_value = ZERO
total_market_value = ZERO
for holding in currency_holdings:
if holding.book_value:
total_book_value += holding.book_value
if holding.market_value:
total_market_value += holding.market_value
# Sort the currency's holdings with decreasing values of market value.
currency_holdings.sort(
key=lambda holding: holding.market_value or ZERO,
reverse=True)
# Output new holdings with the relevant values replaced.
for holding in currency_holdings:
fractional_holdings.append(
holding._replace(book_value=(holding.book_value / total_book_value
if holding.book_value is not None
else None),
market_value=(holding.market_value / total_market_value
if holding.market_value is not None
else None)))
return fractional_holdings
beancount.ops.holdings.scale_holding(holding, scale_factor)
Scale the values of a holding.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/holdings.py
def scale_holding(holding, scale_factor):
"""Scale the values of a holding.
Args:
holding: An instance of Holding.
scale_factor: A float or Decimal number.
Returns:
A scaled copy of the holding.
"""
return Holding(
holding.account,
holding.number * scale_factor if holding.number else None,
holding.currency,
holding.cost_number,
holding.cost_currency,
holding.book_value * scale_factor if holding.book_value else None,
holding.market_value * scale_factor if holding.market_value else None,
holding.price_number,
holding.price_date)
beancount.ops.lifetimes
Given a Beancount ledger, compute time intervals where we hold each commodity.
This script computes, for each commodity, which time intervals it is required at. This can then be used to identify a list of dates at which we need to fetch prices in order to properly fill the price database.
beancount.ops.lifetimes.compress_intervals_days(intervals, num_days)
Compress a list of date pairs to ignore short stretches of unused days.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/lifetimes.py
def compress_intervals_days(intervals, num_days):
"""Compress a list of date pairs to ignore short stretches of unused days.
Args:
intervals: A list of pairs of datetime.date instances.
num_days: An integer, the number of unused days to require for intervals
to be distinct, to allow a gap.
Returns:
A new dict of lifetimes map where some intervals may have been joined.
"""
ignore_interval = datetime.timedelta(days=num_days)
new_intervals = []
iter_intervals = iter(intervals)
last_begin, last_end = next(iter_intervals)
for date_begin, date_end in iter_intervals:
if date_begin - last_end < ignore_interval:
# Compress.
last_end = date_end
continue
new_intervals.append((last_begin, last_end))
last_begin, last_end = date_begin, date_end
new_intervals.append((last_begin, last_end))
return new_intervals
beancount.ops.lifetimes.compress_lifetimes_days(lifetimes_map, num_days)
Compress a lifetimes map to ignore short stretches of unused days.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/lifetimes.py
def compress_lifetimes_days(lifetimes_map, num_days):
"""Compress a lifetimes map to ignore short stretches of unused days.
Args:
lifetimes_map: A dict of currency intervals as returned by get_commodity_lifetimes.
num_days: An integer, the number of unused days to ignore.
Returns:
A new dict of lifetimes map where some intervals may have been joined.
"""
return {currency_pair: compress_intervals_days(intervals, num_days)
for currency_pair, intervals in lifetimes_map.items()}
beancount.ops.lifetimes.get_commodity_lifetimes(entries)
Given a list of directives, figure out the life of each commodity.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/lifetimes.py
def get_commodity_lifetimes(entries):
"""Given a list of directives, figure out the life of each commodity.
Args:
entries: A list of directives.
Returns:
A dict of (currency, cost-currency) commodity strings to lists of (start,
end) datetime.date pairs. The dates are inclusive of the day the commodity
was seen; the end/last dates are one day _after_ the last date seen.
"""
lifetimes = collections.defaultdict(list)
# The current set of active commodities.
commodities = set()
# The current balances across all accounts.
balances = collections.defaultdict(inventory.Inventory)
for entry in entries:
# Process only transaction entries.
if not isinstance(entry, data.Transaction):
continue
# Update the balance of affected accounts and check locally whether that
# triggered a change in the set of commodities.
commodities_changed = False
for posting in entry.postings:
balance = balances[posting.account]
commodities_before = balance.currency_pairs()
balance.add_position(posting)
commodities_after = balance.currency_pairs()
if commodities_after != commodities_before:
commodities_changed = True
# If there was a change in one of the affected account's list of
# commodities, recompute the total set globally. This should not
# occur very frequently.
if commodities_changed:
new_commodities = set(
itertools.chain(*(inv.currency_pairs() for inv in balances.values())))
if new_commodities != commodities:
# The new global set of commodities has changed; update our
# the dictionary of intervals.
for currency in new_commodities - commodities:
lifetimes[currency].append((entry.date, None))
for currency in commodities - new_commodities:
lifetime = lifetimes[currency]
begin_date, end_date = lifetime.pop(-1)
assert end_date is None
lifetime.append((begin_date, entry.date + ONEDAY))
# Update our current set.
commodities = new_commodities
return lifetimes
beancount.ops.lifetimes.required_weekly_prices(lifetimes_map, date_last)
Enumerate all the commodities and Fridays where the price is required.
Given a map of lifetimes for a set of commodities, enumerate all the Fridays for each commodity where it is active. This can be used to connect to a historical price fetcher routine to fill in missing price entries from an existing ledger.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/lifetimes.py
def required_weekly_prices(lifetimes_map, date_last):
"""Enumerate all the commodities and Fridays where the price is required.
Given a map of lifetimes for a set of commodities, enumerate all the Fridays
for each commodity where it is active. This can be used to connect to a
historical price fetcher routine to fill in missing price entries from an
existing ledger.
Args:
lifetimes_map: A dict of currency to active intervals as returned by
get_commodity_lifetimes().
date_last: A datetime.date instance, the last date which we're interested in.
Returns:
Tuples of (date, currency, cost-currency).
"""
results = []
for currency_pair, intervals in lifetimes_map.items():
if currency_pair[1] is None:
continue
for date_begin, date_end in intervals:
# Find first Friday before the minimum date.
diff_days = 4 - date_begin.weekday()
if diff_days > 1:
diff_days -= 7
date = date_begin + datetime.timedelta(days=diff_days)
# Iterate over all Fridays.
if date_end is None:
date_end = date_last
while date < date_end:
results.append((date, currency_pair[0], currency_pair[1]))
date += ONE_WEEK
return sorted(results)
beancount.ops.pad
Automatic padding of gaps between entries.
beancount.ops.pad.PadError (tuple)
PadError(source, message, entry)
beancount.ops.pad.PadError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/ops/pad.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.ops.pad.PadError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of PadError(source, message, entry)
beancount.ops.pad.PadError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/ops/pad.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.ops.pad.pad(entries, options_map)
Insert transaction entries for to fulfill a subsequent balance check.
Synthesize and insert Transaction entries right after Pad entries in order to fulfill checks in the padded accounts. Returns a new list of entries. Note that this doesn't pad across parent-child relationships, it is a very simple kind of pad. (I have found this to be sufficient in practice, and simpler to implement and understand.)
Furthermore, this pads for a single currency only, that is, balance checks are specified only for one currency at a time, and pads will only be inserted for those currencies.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/pad.py
def pad(entries, options_map):
"""Insert transaction entries for to fulfill a subsequent balance check.
Synthesize and insert Transaction entries right after Pad entries in order
to fulfill checks in the padded accounts. Returns a new list of entries.
Note that this doesn't pad across parent-child relationships, it is a very
simple kind of pad. (I have found this to be sufficient in practice, and
simpler to implement and understand.)
Furthermore, this pads for a single currency only, that is, balance checks
are specified only for one currency at a time, and pads will only be
inserted for those currencies.
Args:
entries: A list of directives.
options_map: A parser options dict.
Returns:
A new list of directives, with Pad entries inserted, and a list of new
errors produced.
"""
pad_errors = []
# Find all the pad entries and group them by account.
pads = list(misc_utils.filter_type(entries, data.Pad))
pad_dict = misc_utils.groupby(lambda x: x.account, pads)
# Partially realize the postings, so we can iterate them by account.
by_account = realization.postings_by_account(entries)
# A dict of pad -> list of entries to be inserted.
new_entries = {id(pad): [] for pad in pads}
# Process each account that has a padding group.
for account_, pad_list in sorted(pad_dict.items()):
# Last encountered / currency active pad entry.
active_pad = None
# Gather all the postings for the account and its children.
postings = []
is_child = account.parent_matcher(account_)
for item_account, item_postings in by_account.items():
if is_child(item_account):
postings.extend(item_postings)
postings.sort(key=data.posting_sortkey)
# A set of currencies already padded so far in this account.
padded_lots = set()
pad_balance = inventory.Inventory()
for entry in postings:
assert not isinstance(entry, data.Posting)
if isinstance(entry, data.TxnPosting):
# This is a transaction; update the running balance for this
# account.
pad_balance.add_position(entry.posting)
elif isinstance(entry, data.Pad):
if entry.account == account_:
# Mark this newly encountered pad as active and allow all lots
# to be padded heretofore.
active_pad = entry
padded_lots = set()
elif isinstance(entry, data.Balance):
check_amount = entry.amount
# Compare the current balance amount to the expected one from
# the check entry. IMPORTANT: You need to understand that this
# does not check a single position, but rather checks that the
# total amount for a particular currency (which itself is
# distinct from the cost).
balance_amount = pad_balance.get_currency_units(check_amount.currency)
diff_amount = amount.sub(balance_amount, check_amount)
# Use the specified tolerance or automatically infer it.
tolerance = balance.get_balance_tolerance(entry, options_map)
if abs(diff_amount.number) > tolerance:
# The check fails; we need to pad.
# Pad only if pad entry is active and we haven't already
# padded that lot since it was last encountered.
if active_pad and (check_amount.currency not in padded_lots):
# Note: we decide that it's an error to try to pad
# positions at cost; we check here that all the existing
# positions with that currency have no cost.
positions = [pos
for pos in pad_balance.get_positions()
if pos.units.currency == check_amount.currency]
for position_ in positions:
if position_.cost is not None:
pad_errors.append(
PadError(entry.meta,
("Attempt to pad an entry with cost for "
"balance: {}".format(pad_balance)),
active_pad))
# Thus our padding lot is without cost by default.
diff_position = position.Position.from_amounts(
amount.Amount(check_amount.number - balance_amount.number,
check_amount.currency))
# Synthesize a new transaction entry for the difference.
narration = ('(Padding inserted for Balance of {} for '
'difference {})').format(check_amount, diff_position)
new_entry = data.Transaction(
active_pad.meta.copy(), active_pad.date, flags.FLAG_PADDING,
None, narration, data.EMPTY_SET, data.EMPTY_SET, [])
new_entry.postings.append(
data.Posting(active_pad.account,
diff_position.units, diff_position.cost,
None, None, None))
neg_diff_position = -diff_position
new_entry.postings.append(
data.Posting(active_pad.source_account,
neg_diff_position.units, neg_diff_position.cost,
None, None, None))
# Save it for later insertion after the active pad.
new_entries[id(active_pad)].append(new_entry)
# Fixup the running balance.
pos, _ = pad_balance.add_position(diff_position)
if pos is not None and pos.is_negative_at_cost():
raise ValueError(
"Position held at cost goes negative: {}".format(pos))
# Mark this lot as padded. Further checks should not pad this lot.
padded_lots.add(check_amount.currency)
# Insert the newly created entries right after the pad entries that created them.
padded_entries = []
for entry in entries:
padded_entries.append(entry)
if isinstance(entry, data.Pad):
entry_list = new_entries[id(entry)]
if entry_list:
padded_entries.extend(entry_list)
else:
# Generate errors on unused pad entries.
pad_errors.append(
PadError(entry.meta, "Unused Pad entry", entry))
return padded_entries, pad_errors
beancount.ops.summarize
Summarization of entries.
This code is used to summarize a sequence of entries (e.g. during a time period) into a few "opening balance" entries. This is when computing a balance sheet for a specific time period: we don't want to see the entries from before some period of time, so we fold them into a single transaction per account that has the sum total amount of that account.
beancount.ops.summarize.balance_by_account(entries, date=None)
Sum up the balance per account for all entries strictly before 'date'.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def balance_by_account(entries, date=None):
"""Sum up the balance per account for all entries strictly before 'date'.
Args:
entries: A list of directives.
date: An optional datetime.date instance. If provided, stop accumulating
on and after this date. This is useful for summarization before a
specific date.
Returns:
A pair of a dict of account string to instance Inventory (the balance of
this account before the given date), and the index in the list of entries
where the date was encountered. If all entries are located before the
cutoff date, an index one beyond the last entry is returned.
"""
balances = collections.defaultdict(inventory.Inventory)
for index, entry in enumerate(entries):
if date and entry.date >= date:
break
if isinstance(entry, Transaction):
for posting in entry.postings:
account_balance = balances[posting.account]
# Note: We must allow negative lots at cost, because this may be
# used to reduce a filtered list of entries which may not
# include the entries necessary to keep units at cost always
# above zero. The only summation that is guaranteed to be above
# zero is if all the entries are being summed together, no
# entries are filtered, at least for a particular account's
# postings.
account_balance.add_position(posting)
else:
index = len(entries)
return balances, index
beancount.ops.summarize.cap(entries, account_types, conversion_currency, account_earnings, account_conversions)
Transfer net income to equity and insert a final conversion entry.
This is used to move and nullify balances from the income and expense accounts to an equity account in order to draw up a balance sheet with a balance of precisely zero.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def cap(entries,
account_types,
conversion_currency,
account_earnings,
account_conversions):
"""Transfer net income to equity and insert a final conversion entry.
This is used to move and nullify balances from the income and expense
accounts to an equity account in order to draw up a balance sheet with a
balance of precisely zero.
Args:
entries: A list of directives.
account_types: An instance of AccountTypes.
conversion_currency: A string, the transfer currency to use for zero prices
on the conversion entry.
account_earnings: A string, the name of the equity account to transfer
final balances of the income and expense accounts to.
account_conversions: A string, the name of the equity account to use as
the source for currency conversions.
Returns:
A modified list of entries, with the income and expense accounts
transferred.
"""
# Transfer the balances of income and expense accounts as earnings / net
# income.
income_statement_account_pred = (
lambda account: is_income_statement_account(account, account_types))
entries = transfer_balances(entries, None,
income_statement_account_pred,
account_earnings)
# Insert final conversion entries.
entries = conversions(entries, account_conversions, conversion_currency, None)
return entries
beancount.ops.summarize.cap_opt(entries, options_map)
Close by getting all the parameters from an options map.
See cap() for details.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def cap_opt(entries, options_map):
"""Close by getting all the parameters from an options map.
See cap() for details.
Args:
entries: See cap().
options_map: A parser's option_map.
Returns:
Same as close().
"""
account_types = options.get_account_types(options_map)
current_accounts = options.get_current_accounts(options_map)
conversion_currency = options_map['conversion_currency']
return cap(entries,
account_types,
conversion_currency,
*current_accounts)
beancount.ops.summarize.clamp(entries, begin_date, end_date, account_types, conversion_currency, account_earnings, account_opening, account_conversions)
Filter entries to include only those during a specified time period.
Firstly, this method will transfer all balances for the income and expense accounts occurring before the given period begin date to the 'account_earnings' account (earnings before the period, or "retained earnings") and summarize all of the transactions before that date against the 'account_opening' account (usually "opening balances"). The resulting income and expense accounts should have no transactions (since their balances have been transferred out and summarization of zero balances should not add any transactions).
Secondly, all the entries after the period end date will be truncated and a conversion entry will be added for the resulting transactions that reflect changes occurring between the beginning and end of the exercise period. The resulting balance of all account should be empty.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def clamp(entries,
begin_date, end_date,
account_types,
conversion_currency,
account_earnings,
account_opening,
account_conversions):
"""Filter entries to include only those during a specified time period.
Firstly, this method will transfer all balances for the income and expense
accounts occurring before the given period begin date to the
'account_earnings' account (earnings before the period, or "retained
earnings") and summarize all of the transactions before that date against
the 'account_opening' account (usually "opening balances"). The resulting
income and expense accounts should have no transactions (since their
balances have been transferred out and summarization of zero balances should
not add any transactions).
Secondly, all the entries after the period end date will be truncated and a
conversion entry will be added for the resulting transactions that reflect
changes occurring between the beginning and end of the exercise period. The
resulting balance of all account should be empty.
Args:
entries: A list of directive tuples.
begin_date: A datetime.date instance, the beginning of the period.
end_date: A datetime.date instance, one day beyond the end of the period.
account_types: An instance of AccountTypes.
conversion_currency: A string, the transfer currency to use for zero prices
on the conversion entry.
account_earnings: A string, the name of the account to transfer
previous earnings from the income statement accounts to the balance
sheet.
account_opening: A string, the name of the account in equity
to transfer previous balances from, in order to initialize account
balances at the beginning of the period. This is typically called an
opening balances account.
account_conversions: A string, the name of the equity account to
book currency conversions against.
Returns:
A new list of entries is returned, and the index that points to the first
original transaction after the beginning date of the period. This index
can be used to generate the opening balances report, which is a balance
sheet fed with only the summarized entries.
"""
# Transfer income and expenses before the period to equity.
income_statement_account_pred = (
lambda account: is_income_statement_account(account, account_types))
entries = transfer_balances(entries, begin_date,
income_statement_account_pred, account_earnings)
# Summarize all the previous balances, after transferring the income and
# expense balances, so all entries for those accounts before the begin date
# should now disappear.
entries, index = summarize(entries, begin_date, account_opening)
# Truncate the entries after this.
entries = truncate(entries, end_date)
# Insert conversion entries.
entries = conversions(entries, account_conversions, conversion_currency, end_date)
return entries, index
beancount.ops.summarize.clamp_opt(entries, begin_date, end_date, options_map)
Clamp by getting all the parameters from an options map.
See clamp() for details.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def clamp_opt(entries, begin_date, end_date, options_map):
"""Clamp by getting all the parameters from an options map.
See clamp() for details.
Args:
entries: See clamp().
begin_date: See clamp().
end_date: See clamp().
options_map: A parser's option_map.
Returns:
Same as clamp().
"""
account_types = options.get_account_types(options_map)
previous_accounts = options.get_previous_accounts(options_map)
conversion_currency = options_map['conversion_currency']
return clamp(entries, begin_date, end_date,
account_types,
conversion_currency,
*previous_accounts)
beancount.ops.summarize.clear(entries, date, account_types, account_earnings)
Transfer income and expenses balances at the given date to the equity accounts.
This method insert entries to zero out balances on income and expenses accounts by transferring them to an equity account.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def clear(entries,
date,
account_types,
account_earnings):
"""Transfer income and expenses balances at the given date to the equity accounts.
This method insert entries to zero out balances on income and expenses
accounts by transferring them to an equity account.
Args:
entries: A list of directive tuples.
date: A datetime.date instance, one day beyond the end of the period. This
date can be optionally left to None in order to close at the end of the
list of entries.
account_types: An instance of AccountTypes.
account_earnings: A string, the name of the account to transfer
previous earnings from the income statement accounts to the balance
sheet.
Returns:
A new list of entries is returned, and the index that points to one before
the last original transaction before the transfers.
"""
index = len(entries)
# Transfer income and expenses before the period to equity.
income_statement_account_pred = (
lambda account: is_income_statement_account(account, account_types))
new_entries = transfer_balances(entries, date,
income_statement_account_pred, account_earnings)
return new_entries, index
beancount.ops.summarize.clear_opt(entries, date, options_map)
Convenience function to clear() using an options map.
Source code in beancount/ops/summarize.py
def clear_opt(entries, date, options_map):
"""Convenience function to clear() using an options map.
"""
account_types = options.get_account_types(options_map)
current_accounts = options.get_current_accounts(options_map)
return clear(entries, date, account_types, current_accounts[0])
beancount.ops.summarize.close(entries, date, conversion_currency, account_conversions)
Truncate entries that occur after a particular date and ensure balance.
This method essentially removes entries after a date. It truncates the future. To do so, it will
-
Remove all entries which occur after 'date', if given.
-
Insert conversion transactions at the end of the list of entries to ensure that the total balance of all postings sums up to empty.
The result is a list of entries with a total balance of zero, with possibly non-zero balances for the income/expense accounts. To produce a final balance sheet, use transfer() to move the net income to the equity accounts.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def close(entries,
date,
conversion_currency,
account_conversions):
"""Truncate entries that occur after a particular date and ensure balance.
This method essentially removes entries after a date. It truncates the
future. To do so, it will
1. Remove all entries which occur after 'date', if given.
2. Insert conversion transactions at the end of the list of entries to
ensure that the total balance of all postings sums up to empty.
The result is a list of entries with a total balance of zero, with possibly
non-zero balances for the income/expense accounts. To produce a final
balance sheet, use transfer() to move the net income to the equity accounts.
Args:
entries: A list of directive tuples.
date: A datetime.date instance, one day beyond the end of the period. This
date can be optionally left to None in order to close at the end of the
list of entries.
conversion_currency: A string, the transfer currency to use for zero prices
on the conversion entry.
account_conversions: A string, the name of the equity account to
book currency conversions against.
Returns:
A new list of entries is returned, and the index that points to one beyond
the last original transaction that was provided. Further entries may have
been inserted to normalize conversions and ensure the total balance sums
to zero.
"""
# Truncate the entries after the date, if a date has been provided.
if date is not None:
entries = truncate(entries, date)
# Keep an index to the truncated list of entries (before conversions).
index = len(entries)
# Insert a conversions entry to ensure the total balance of all accounts is
# flush zero.
entries = conversions(entries, account_conversions, conversion_currency, date)
return entries, index
beancount.ops.summarize.close_opt(entries, date, options_map)
Convenience function to close() using an options map.
Source code in beancount/ops/summarize.py
def close_opt(entries, date, options_map):
"""Convenience function to close() using an options map.
"""
conversion_currency = options_map['conversion_currency']
current_accounts = options.get_current_accounts(options_map)
return close(entries, date, conversion_currency, current_accounts[1])
beancount.ops.summarize.conversions(entries, conversion_account, conversion_currency, date=None)
Insert a conversion entry at date 'date' at the given account.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def conversions(entries, conversion_account, conversion_currency, date=None):
"""Insert a conversion entry at date 'date' at the given account.
Args:
entries: A list of entries.
conversion_account: A string, the account to book against.
conversion_currency: A string, the transfer currency to use for zero prices
on the conversion entry.
date: The date before which to insert the conversion entry. The new
entry will be inserted as the last entry of the date just previous
to this date.
Returns:
A modified list of entries.
"""
# Compute the balance at the given date.
conversion_balance = interpolate.compute_entries_balance(entries, date=date)
# Early exit if there is nothing to do.
conversion_cost_balance = conversion_balance.reduce(convert.get_cost)
if conversion_cost_balance.is_empty():
return entries
# Calculate the index and the date for the new entry. We want to store it as
# the last transaction of the day before.
if date is not None:
index = bisect_key.bisect_left_with_key(entries, date, key=lambda entry: entry.date)
last_date = date - datetime.timedelta(days=1)
else:
index = len(entries)
last_date = entries[-1].date
meta = data.new_metadata('<conversions>', -1)
narration = 'Conversion for {}'.format(conversion_balance)
conversion_entry = Transaction(meta, last_date, flags.FLAG_CONVERSIONS,
None, narration, data.EMPTY_SET, data.EMPTY_SET, [])
for position in conversion_cost_balance.get_positions():
# Important note: Set the cost to zero here to maintain the balance
# invariant. (This is the only single place we cheat on the balance rule
# in the entire system and this is necessary; see documentation on
# Conversions.)
price = amount.Amount(ZERO, conversion_currency)
neg_pos = -position
conversion_entry.postings.append(
data.Posting(conversion_account, neg_pos.units, neg_pos.cost,
price, None, None))
# Make a copy of the list of entries and insert the new transaction into it.
new_entries = list(entries)
new_entries.insert(index, conversion_entry)
return new_entries
beancount.ops.summarize.create_entries_from_balances(balances, date, source_account, direction, meta, flag, narration_template)
"Create a list of entries from a dict of balances.
This method creates a list of new entries to transfer the amounts in the 'balances' dict to/from another account specified in 'source_account'.
The balancing posting is created with the equivalent at cost. In other words, if you attempt to balance 10 HOOL {500 USD}, this will synthesize a posting with this position on one leg, and with 5000 USD on the 'source_account' leg.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def create_entries_from_balances(balances, date, source_account, direction,
meta, flag, narration_template):
""""Create a list of entries from a dict of balances.
This method creates a list of new entries to transfer the amounts in the
'balances' dict to/from another account specified in 'source_account'.
The balancing posting is created with the equivalent at cost. In other
words, if you attempt to balance 10 HOOL {500 USD}, this will synthesize a
posting with this position on one leg, and with 5000 USD on the
'source_account' leg.
Args:
balances: A dict of account name strings to Inventory instances.
date: A datetime.date object, the date at which to create the transaction.
source_account: A string, the name of the account to pull the balances
from. This is the magician's hat to pull the rabbit from.
direction: If 'direction' is True, the new entries transfer TO the
balances account from the source account; otherwise the new entries
transfer FROM the balances into the source account.
meta: A dict to use as metadata for the transactions.
flag: A string, the flag to use for the transactions.
narration_template: A format string for creating the narration. It is
formatted with 'account' and 'date' replacement variables.
Returns:
A list of newly synthesizes Transaction entries.
"""
new_entries = []
for account, account_balance in sorted(balances.items()):
# Don't create new entries where there is no balance.
if account_balance.is_empty():
continue
narration = narration_template.format(account=account, date=date)
if not direction:
account_balance = -account_balance
postings = []
new_entry = Transaction(
meta, date, flag, None, narration, data.EMPTY_SET, data.EMPTY_SET, postings)
for position in account_balance.get_positions():
postings.append(data.Posting(account, position.units, position.cost,
None, None, None))
cost = -convert.get_cost(position)
postings.append(data.Posting(source_account, cost, None,
None, None, None))
new_entries.append(new_entry)
return new_entries
beancount.ops.summarize.get_open_entries(entries, date)
Gather the list of active Open entries at date.
This returns the list of Open entries that have not been closed at the given date, in the same order they were observed in the document.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def get_open_entries(entries, date):
"""Gather the list of active Open entries at date.
This returns the list of Open entries that have not been closed at the given
date, in the same order they were observed in the document.
Args:
entries: A list of directives.
date: The date at which to look for an open entry. If not specified, will
return the entries still open at the latest date.
Returns:
A list of Open directives.
"""
open_entries = {}
for index, entry in enumerate(entries):
if date is not None and entry.date >= date:
break
if isinstance(entry, Open):
try:
ex_index, ex_entry = open_entries[entry.account]
if entry.date < ex_entry.date:
open_entries[entry.account] = (index, entry)
except KeyError:
open_entries[entry.account] = (index, entry)
elif isinstance(entry, Close):
# If there is no corresponding open, don't raise an error.
open_entries.pop(entry.account, None)
return [entry for (index, entry) in sorted(open_entries.values())]
beancount.ops.summarize.open(entries, date, account_types, conversion_currency, account_earnings, account_opening, account_conversions)
Summarize entries before a date and transfer income/expenses to equity.
This method essentially prepares a list of directives to contain only transactions that occur after a particular date. It truncates the past. To do so, it will
-
Insert conversion transactions at the given open date, then
-
Insert transactions at that date to move accumulated balances from before that date from the income and expenses accounts to an equity account, and finally
-
It removes all the transactions previous to the date and replaces them by opening balances entries to bring the balances to the same amount.
The result is a list of entries for which the income and expense accounts are beginning with a balance of zero, and all other accounts begin with a transaction that brings their balance to the expected amount. All the past has been summarized at that point.
An index is returned to the first transaction past the balance opening transactions, so you can keep just those in order to render a balance sheet for only the opening balances.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def open(entries,
date,
account_types,
conversion_currency,
account_earnings,
account_opening,
account_conversions):
"""Summarize entries before a date and transfer income/expenses to equity.
This method essentially prepares a list of directives to contain only
transactions that occur after a particular date. It truncates the past. To
do so, it will
1. Insert conversion transactions at the given open date, then
2. Insert transactions at that date to move accumulated balances from before
that date from the income and expenses accounts to an equity account, and
finally
3. It removes all the transactions previous to the date and replaces them by
opening balances entries to bring the balances to the same amount.
The result is a list of entries for which the income and expense accounts
are beginning with a balance of zero, and all other accounts begin with a
transaction that brings their balance to the expected amount. All the past
has been summarized at that point.
An index is returned to the first transaction past the balance opening
transactions, so you can keep just those in order to render a balance sheet
for only the opening balances.
Args:
entries: A list of directive tuples.
date: A datetime.date instance, the date at which to do this.
account_types: An instance of AccountTypes.
conversion_currency: A string, the transfer currency to use for zero prices
on the conversion entry.
account_earnings: A string, the name of the account to transfer
previous earnings from the income statement accounts to the balance
sheet.
account_opening: A string, the name of the account in equity
to transfer previous balances from, in order to initialize account
balances at the beginning of the period. This is typically called an
opening balances account.
account_conversions: A string, the name of the equity account to
book currency conversions against.
Returns:
A new list of entries is returned, and the index that points to the first
original transaction after the beginning date of the period. This index
can be used to generate the opening balances report, which is a balance
sheet fed with only the summarized entries.
"""
# Insert conversion entries.
entries = conversions(entries, account_conversions, conversion_currency, date)
# Transfer income and expenses before the period to equity.
entries, _ = clear(entries, date, account_types, account_earnings)
# Summarize all the previous balances, after transferring the income and
# expense balances, so all entries for those accounts before the begin date
# should now disappear.
entries, index = summarize(entries, date, account_opening)
return entries, index
beancount.ops.summarize.open_opt(entries, date, options_map)
Convenience function to open() using an options map.
Source code in beancount/ops/summarize.py
def open_opt(entries, date, options_map):
"""Convenience function to open() using an options map.
"""
account_types = options.get_account_types(options_map)
previous_accounts = options.get_previous_accounts(options_map)
conversion_currency = options_map['conversion_currency']
return open(entries, date, account_types, conversion_currency, *previous_accounts)
beancount.ops.summarize.summarize(entries, date, account_opening)
Summarize all entries before a date by replacing then with summarization entries.
This function replaces the transactions up to (and not including) the given date with a opening balance transactions, one for each account. It returns new entries, all of the transactions before the given date having been replaced by a few summarization entries, one for each account.
Notes: - Open entries are preserved for active accounts. - The last relevant price entry for each (base, quote) pair is preserved. - All other entries before the cutoff date are culled.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def summarize(entries, date, account_opening):
"""Summarize all entries before a date by replacing then with summarization entries.
This function replaces the transactions up to (and not including) the given
date with a opening balance transactions, one for each account. It returns
new entries, all of the transactions before the given date having been
replaced by a few summarization entries, one for each account.
Notes:
- Open entries are preserved for active accounts.
- The last relevant price entry for each (base, quote) pair is preserved.
- All other entries before the cutoff date are culled.
Args:
entries: A list of directives.
date: A datetime.date instance, the cutoff date before which to summarize.
account_opening: A string, the name of the source account to book summarization
entries against.
Returns:
The function returns a list of new entries and the integer index at which
the entries on or after the cutoff date begin.
"""
# Compute balances at date.
balances, index = balance_by_account(entries, date)
# We need to insert the entries with a date previous to subsequent checks,
# to maintain ensure the open directives show up before any transaction.
summarize_date = date - datetime.timedelta(days=1)
# Create summarization / opening balance entries.
summarizing_entries = create_entries_from_balances(
balances, summarize_date, account_opening, True,
data.new_metadata('<summarize>', 0), flags.FLAG_SUMMARIZE,
"Opening balance for '{account}' (Summarization)")
# Insert the last price entry for each commodity from before the date.
price_entries = prices.get_last_price_entries(entries, date)
# Gather the list of active open entries at date.
open_entries = get_open_entries(entries, date)
# Compute entries before the date and preserve the entries after the date.
before_entries = sorted(open_entries + price_entries + summarizing_entries,
key=data.entry_sortkey)
after_entries = entries[index:]
# Return a new list of entries and the index that points after the entries
# were inserted.
return (before_entries + after_entries), len(before_entries)
beancount.ops.summarize.transfer_balances(entries, date, account_pred, transfer_account)
Synthesize transactions to transfer balances from some accounts at a given date.
For all accounts that match the 'account_pred' predicate, create new entries to transfer the balance at the given date from the account to the transfer account. This is used to transfer balances from income and expenses from a previous period to a "retained earnings" account. This is accomplished by creating new entries.
Note that inserting transfers breaks any following balance checks that are in the transferred accounts. For this reason, all balance assertion entries following the cutoff date for those accounts are removed from the list in output.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def transfer_balances(entries, date, account_pred, transfer_account):
"""Synthesize transactions to transfer balances from some accounts at a given date.
For all accounts that match the 'account_pred' predicate, create new entries
to transfer the balance at the given date from the account to the transfer
account. This is used to transfer balances from income and expenses from a
previous period to a "retained earnings" account. This is accomplished by
creating new entries.
Note that inserting transfers breaks any following balance checks that are
in the transferred accounts. For this reason, all balance assertion entries
following the cutoff date for those accounts are removed from the list in
output.
Args:
entries: A list of directives.
date: A datetime.date instance, the date at which to make the transfer.
account_pred: A predicate function that, given an account string, returns
true if the account is meant to be transferred.
transfer_account: A string, the name of the source account to be used on
the transfer entries to receive balances at the given date.
Returns:
A new list of entries, with the new transfer entries added in.
"""
# Don't bother doing anything if there are no entries.
if not entries:
return entries
# Compute balances at date.
balances, index = balance_by_account(entries, date)
# Filter out to keep only the accounts we want.
transfer_balances = {account: balance
for account, balance in balances.items()
if account_pred(account)}
# We need to insert the entries at the end of the previous day.
if date:
transfer_date = date - datetime.timedelta(days=1)
else:
transfer_date = entries[-1].date
# Create transfer entries.
transfer_entries = create_entries_from_balances(
transfer_balances, transfer_date, transfer_account, False,
data.new_metadata('<transfer_balances>', 0), flags.FLAG_TRANSFER,
"Transfer balance for '{account}' (Transfer balance)")
# Remove balance assertions that occur after a transfer on an account that
# has been transferred away; they would break.
after_entries = [entry
for entry in entries[index:]
if not (isinstance(entry, balance.Balance) and
entry.account in transfer_balances)]
# Split the new entries in a new list.
return (entries[:index] + transfer_entries + after_entries)
beancount.ops.summarize.truncate(entries, date)
Filter out all the entries at and after date. Returns a new list of entries.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def truncate(entries, date):
"""Filter out all the entries at and after date. Returns a new list of entries.
Args:
entries: A sorted list of directives.
date: A datetime.date instance.
Returns:
A truncated list of directives.
"""
index = bisect_key.bisect_left_with_key(entries, date,
key=lambda entry: entry.date)
return entries[:index]
beancount.ops.validation
Validation checks.
These checks are intended to be run after all the plugins have transformed the list of entries, just before serving them or generating reports from them. The idea is to ensure a reasonable set of invariants and generate errors if those invariants are violated. They are not sanity checks--user data is subject to constraints which are hopefully detected here and which will result in errors trickled up to the user.
beancount.ops.validation.ValidationError (tuple)
ValidationError(source, message, entry)
beancount.ops.validation.ValidationError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/ops/validation.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.ops.validation.ValidationError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of ValidationError(source, message, entry)
beancount.ops.validation.ValidationError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/ops/validation.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.ops.validation.validate(entries, options_map, log_timings=None, extra_validations=None)
Perform all the standard checks on parsed contents.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/validation.py
def validate(entries, options_map, log_timings=None, extra_validations=None):
"""Perform all the standard checks on parsed contents.
Args:
entries: A list of directives.
unused_options_map: An options map.
log_timings: An optional function to use for logging the time of individual
operations.
extra_validations: A list of extra validation functions to run after loading
this list of entries.
Returns:
A list of new errors, if any were found.
"""
validation_tests = VALIDATIONS
if extra_validations:
validation_tests += extra_validations
# Run various validation routines define above.
errors = []
for validation_function in validation_tests:
with misc_utils.log_time('function: {}'.format(validation_function.__name__),
log_timings, indent=2):
new_errors = validation_function(entries, options_map)
errors.extend(new_errors)
return errors
beancount.ops.validation.validate_active_accounts(entries, unused_options_map)
Check that all references to accounts occurs on active accounts.
We basically check that references to accounts from all directives other than Open and Close occur at dates the open-close interval of that account. This should be good for all of the directive types where we can extract an account name.
Note that this is more strict a check than comparing the dates: we actually check that no references to account are made on the same day before the open directive appears for that account. This is a nice property to have, and is supported by our custom sorting routine that will sort open entries before transaction entries, given the same date.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/validation.py
def validate_active_accounts(entries, unused_options_map):
"""Check that all references to accounts occurs on active accounts.
We basically check that references to accounts from all directives other
than Open and Close occur at dates the open-close interval of that account.
This should be good for all of the directive types where we can extract an
account name.
Note that this is more strict a check than comparing the dates: we actually
check that no references to account are made on the same day before the open
directive appears for that account. This is a nice property to have, and is
supported by our custom sorting routine that will sort open entries before
transaction entries, given the same date.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
error_pairs = []
active_set = set()
opened_accounts = set()
for entry in entries:
if isinstance(entry, data.Open):
active_set.add(entry.account)
opened_accounts.add(entry.account)
elif isinstance(entry, data.Close):
active_set.discard(entry.account)
else:
for account in getters.get_entry_accounts(entry):
if account not in active_set:
# Allow document and note directives that occur after an
# account is closed.
if (isinstance(entry, ALLOW_AFTER_CLOSE) and
account in opened_accounts):
continue
# Register an error to be logged later, with an appropriate
# message.
error_pairs.append((account, entry))
# Refine the error message to disambiguate between the case of an account
# that has never been seen and one that was simply not active at the time.
errors = []
for account, entry in error_pairs:
if account in opened_accounts:
message = "Invalid reference to inactive account '{}'".format(account)
else:
message = "Invalid reference to unknown account '{}'".format(account)
errors.append(ValidationError(entry.meta, message, entry))
return errors
beancount.ops.validation.validate_check_transaction_balances(entries, options_map)
Check again that all transaction postings balance, as users may have transformed transactions.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/validation.py
def validate_check_transaction_balances(entries, options_map):
"""Check again that all transaction postings balance, as users may have
transformed transactions.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
# Note: this is a bit slow; we could limit our checks to the original
# transactions by using the hash function in the loader.
errors = []
for entry in entries:
if isinstance(entry, Transaction):
# IMPORTANT: This validation is _crucial_ and cannot be skipped.
# This is where we actually detect and warn on unbalancing
# transactions. This _must_ come after the user routines, because
# unbalancing input is legal, as those types of transactions may be
# "fixed up" by a user-plugin. In other words, we want to allow
# users to input unbalancing transactions as long as the final
# transactions objects that appear on the stream (after processing
# the plugins) are balanced. See {9e6c14b51a59}.
#
# Detect complete sets of postings that have residual balance;
residual = interpolate.compute_residual(entry.postings)
tolerances = interpolate.infer_tolerances(entry.postings, options_map)
if not residual.is_small(tolerances):
errors.append(
ValidationError(entry.meta,
"Transaction does not balance: {}".format(residual),
entry))
return errors
beancount.ops.validation.validate_currency_constraints(entries, options_map)
Check the currency constraints from account open declarations.
Open directives admit an optional list of currencies that specify the only types of commodities that the running inventory for this account may contain. This function checks that all postings are only made in those commodities.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/validation.py
def validate_currency_constraints(entries, options_map):
"""Check the currency constraints from account open declarations.
Open directives admit an optional list of currencies that specify the only
types of commodities that the running inventory for this account may
contain. This function checks that all postings are only made in those
commodities.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
# Get all the open entries with currency constraints.
open_map = {entry.account: entry
for entry in entries
if isinstance(entry, Open) and entry.currencies}
errors = []
for entry in entries:
if not isinstance(entry, Transaction):
continue
for posting in entry.postings:
# Look up the corresponding account's valid currencies; skip the
# check if there are none specified.
try:
open_entry = open_map[posting.account]
valid_currencies = open_entry.currencies
if not valid_currencies:
continue
except KeyError:
continue
# Perform the check.
if posting.units.currency not in valid_currencies:
errors.append(
ValidationError(
entry.meta,
"Invalid currency {} for account '{}'".format(
posting.units.currency, posting.account),
entry))
return errors
beancount.ops.validation.validate_data_types(entries, options_map)
Check that all the data types of the attributes of entries are as expected.
Users are provided with a means to filter the list of entries. They're able to write code that manipulates those tuple objects without any type constraints. With discipline, this mostly works, but I know better: check, just to make sure. This routine checks all the data types and assumptions on entries.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/validation.py
def validate_data_types(entries, options_map):
"""Check that all the data types of the attributes of entries are as expected.
Users are provided with a means to filter the list of entries. They're able to
write code that manipulates those tuple objects without any type constraints.
With discipline, this mostly works, but I know better: check, just to make sure.
This routine checks all the data types and assumptions on entries.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
errors = []
for entry in entries:
try:
data.sanity_check_types(
entry, options_map["allow_deprecated_none_for_tags_and_links"])
except AssertionError as exc:
errors.append(
ValidationError(entry.meta,
"Invalid data types: {}".format(exc),
entry))
return errors
beancount.ops.validation.validate_documents_paths(entries, options_map)
Check that all filenames in resolved Document entries are absolute filenames.
The processing of document entries is assumed to result in absolute paths. Relative paths are resolved at the parsing stage and at point we want to make sure we don't have to do any further processing on them.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/validation.py
def validate_documents_paths(entries, options_map):
"""Check that all filenames in resolved Document entries are absolute filenames.
The processing of document entries is assumed to result in absolute paths.
Relative paths are resolved at the parsing stage and at point we want to
make sure we don't have to do any further processing on them.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
return [ValidationError(entry.meta, "Invalid relative path for entry", entry)
for entry in entries
if (isinstance(entry, Document) and
not path.isabs(entry.filename))]
beancount.ops.validation.validate_duplicate_balances(entries, unused_options_map)
Check that balance entries occur only once per day.
Because we do not support time, and the declaration order of entries is meant to be kept irrelevant, two balance entries with different amounts should not occur in the file. We do allow two identical balance assertions, however, because this may occur during import.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/validation.py
def validate_duplicate_balances(entries, unused_options_map):
"""Check that balance entries occur only once per day.
Because we do not support time, and the declaration order of entries is
meant to be kept irrelevant, two balance entries with different amounts
should not occur in the file. We do allow two identical balance assertions,
however, because this may occur during import.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
errors = []
# Mapping of (account, currency, date) to Balance entry.
balance_entries = {}
for entry in entries:
if not isinstance(entry, data.Balance):
continue
key = (entry.account, entry.amount.currency, entry.date)
try:
previous_entry = balance_entries[key]
if entry.amount != previous_entry.amount:
errors.append(
ValidationError(
entry.meta,
"Duplicate balance assertion with different amounts",
entry))
except KeyError:
balance_entries[key] = entry
return errors
beancount.ops.validation.validate_duplicate_commodities(entries, unused_options_map)
Check that commodity entries are unique for each commodity.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/validation.py
def validate_duplicate_commodities(entries, unused_options_map):
"""Check that commodity entries are unique for each commodity.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
errors = []
# Mapping of (account, currency, date) to Balance entry.
commodity_entries = {}
for entry in entries:
if not isinstance(entry, data.Commodity):
continue
key = entry.currency
try:
previous_entry = commodity_entries[key]
if previous_entry:
errors.append(
ValidationError(
entry.meta,
"Duplicate commodity directives for '{}'".format(key),
entry))
except KeyError:
commodity_entries[key] = entry
return errors
beancount.ops.validation.validate_open_close(entries, unused_options_map)
Check constraints on open and close directives themselves.
This method checks two kinds of constraints:
-
An open or a close directive may only show up once for each account. If a duplicate is detected, an error is generated.
-
Close directives may only appears if an open directive has been seen previous (chronologically).
-
The date of close directives must be strictly greater than their corresponding open directive.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/validation.py
def validate_open_close(entries, unused_options_map):
"""Check constraints on open and close directives themselves.
This method checks two kinds of constraints:
1. An open or a close directive may only show up once for each account. If a
duplicate is detected, an error is generated.
2. Close directives may only appears if an open directive has been seen
previous (chronologically).
3. The date of close directives must be strictly greater than their
corresponding open directive.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
errors = []
open_map = {}
close_map = {}
for entry in entries:
if isinstance(entry, Open):
if entry.account in open_map:
errors.append(
ValidationError(
entry.meta,
"Duplicate open directive for {}".format(entry.account),
entry))
else:
open_map[entry.account] = entry
elif isinstance(entry, Close):
if entry.account in close_map:
errors.append(
ValidationError(
entry.meta,
"Duplicate close directive for {}".format(entry.account),
entry))
else:
try:
open_entry = open_map[entry.account]
if entry.date <= open_entry.date:
errors.append(
ValidationError(
entry.meta,
"Internal error: closing date for {} "
"appears before opening date".format(entry.account),
entry))
except KeyError:
errors.append(
ValidationError(
entry.meta,
"Unopened account {} is being closed".format(entry.account),
entry))
close_map[entry.account] = entry
return errors