beancount.ops
Operations on the entries defined in the core modules.
This package contains various functions which operate on lists of entries.
beancount.ops.balance
Automatic padding of gaps between entries.
beancount.ops.balance.BalanceError (tuple)
BalanceError(source, message, entry)
beancount.ops.balance.BalanceError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/ops/balance.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.ops.balance.BalanceError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of BalanceError(source, message, entry)
beancount.ops.balance.BalanceError.__replace__(/, self, **kwds)
special
Return a new BalanceError object replacing specified fields with new values
Source code in beancount/ops/balance.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.ops.balance.BalanceError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/ops/balance.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.ops.balance.check(entries, options_map)
Process the balance assertion directives.
For each Balance directive, check that their expected balance corresponds to the actual balance computed at that time and replace failing ones by new ones with a flag that indicates failure.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/balance.py
def check(entries, options_map):
"""Process the balance assertion directives.
For each Balance directive, check that their expected balance corresponds to
the actual balance computed at that time and replace failing ones by new
ones with a flag that indicates failure.
Args:
entries: A list of directives.
options_map: A dict of options, parsed from the input file.
Returns:
A pair of a list of directives and a list of balance check errors.
"""
new_entries = []
check_errors = []
# This is similar to realization, but performed in a different order, and
# where we only accumulate inventories for accounts that have balance
# assertions in them (this saves on time). Here we process the entries one
# by one along with the balance checks. We use a temporary realization in
# order to hold the incremental tree of balances, so that we can easily get
# the amounts of an account's subaccounts for making checks on parent
# accounts.
real_root = realization.RealAccount('')
# Figure out the set of accounts for which we need to compute a running
# inventory balance.
asserted_accounts = {entry.account
for entry in entries
if isinstance(entry, Balance)}
# Add all children accounts of an asserted account to be calculated as well,
# and pre-create these accounts, and only those (we're just being tight to
# make sure).
asserted_match_list = [account.parent_matcher(account_)
for account_ in asserted_accounts]
for account_ in getters.get_accounts(entries):
if (account_ in asserted_accounts or
any(match(account_) for match in asserted_match_list)):
realization.get_or_create(real_root, account_)
# Get the Open directives for each account.
open_close_map = getters.get_account_open_close(entries)
for entry in entries:
if isinstance(entry, Transaction):
# For each of the postings' accounts, update the balance inventory.
for posting in entry.postings:
real_account = realization.get(real_root, posting.account)
# The account will have been created only if we're meant to track it.
if real_account is not None:
# Note: Always allow negative lots for the purpose of balancing.
# This error should show up somewhere else than here.
real_account.balance.add_position(posting)
elif isinstance(entry, Balance):
# Check that the currency of the balance check is one of the allowed
# currencies for that account.
expected_amount = entry.amount
try:
open, _ = open_close_map[entry.account]
except KeyError:
check_errors.append(
BalanceError(entry.meta,
"Invalid reference to unknown account '{}'".format(
entry.account), entry))
continue
if (expected_amount is not None and
open and open.currencies and
expected_amount.currency not in open.currencies):
check_errors.append(
BalanceError(entry.meta,
"Invalid currency '{}' for Balance directive: ".format(
expected_amount.currency),
entry))
# Sum up the current balances for this account and its
# sub-accounts. We want to support checks for parent accounts
# for the total sum of their subaccounts.
#
# FIXME: Improve the performance further by computing the balance
# for the desired currency only. This won't allow us to cache in
# this way but may be faster, if we're not asserting all the
# currencies. Furthermore, we could probably avoid recomputing the
# balance if a subtree of positions hasn't been invalidated by a new
# position added to the realization. Do this.
real_account = realization.get(real_root, entry.account)
assert real_account is not None, "Missing {}".format(entry.account)
subtree_balance = realization.compute_balance(real_account, leaf_only=False)
# Get only the amount in the desired currency.
balance_amount = subtree_balance.get_currency_units(expected_amount.currency)
# Check if the amount is within bounds of the expected amount.
diff_amount = amount.sub(balance_amount, expected_amount)
# Use the specified tolerance or automatically infer it.
tolerance = get_balance_tolerance(entry, options_map)
if abs(diff_amount.number) > tolerance:
check_errors.append(
BalanceError(entry.meta,
("Balance failed for '{}': "
"expected {} != accumulated {} ({} {})").format(
entry.account, expected_amount, balance_amount,
abs(diff_amount.number),
('too much'
if diff_amount.number > 0
else 'too little')),
entry))
# Substitute the entry by a failing entry, with the diff_amount
# field set on it. I'm not entirely sure that this is the best
# of ideas, maybe leaving the original check intact and insert a
# new error entry might be more functional or easier to
# understand.
entry = entry._replace(
meta=entry.meta.copy(),
diff_amount=diff_amount)
new_entries.append(entry)
return new_entries, check_errors
beancount.ops.balance.get_balance_tolerance(balance_entry, options_map)
Get the tolerance amount for a single entry.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/balance.py
def get_balance_tolerance(balance_entry, options_map):
"""Get the tolerance amount for a single entry.
Args:
balance_entry: An instance of data.Balance
options_map: An options dict, as per the parser.
Returns:
A Decimal, the amount of tolerance implied by the directive.
"""
if balance_entry.tolerance is not None:
# Use the balance-specific tolerance override if it is provided.
tolerance = balance_entry.tolerance
else:
expo = balance_entry.amount.number.as_tuple().exponent
if expo < 0:
# Be generous and always allow twice the multiplier on Balance and
# Pad because the user creates these and the rounding of those
# balances may often be further off than those used within a single
# transaction.
tolerance = options_map["inferred_tolerance_multiplier"] * 2
tolerance = ONE.scaleb(expo) * tolerance
else:
tolerance = ZERO
return tolerance
beancount.ops.basicops
Basic filtering and aggregation operations on lists of entries.
This module contains some common basic operations on entries that are complex enough not to belong in core/data.py.
beancount.ops.basicops.filter_link(link, entries)
Yield all the entries which have the given link.
Parameters: |
|
---|
Yields: Every entry in 'entries' that links to 'link.
Source code in beancount/ops/basicops.py
def filter_link(link, entries):
"""Yield all the entries which have the given link.
Args:
link: A string, the link we are interested in.
Yields:
Every entry in 'entries' that links to 'link.
"""
for entry in entries:
if (isinstance(entry, data.Transaction) and
entry.links and link in entry.links):
yield entry
beancount.ops.basicops.filter_tag(tag, entries)
Yield all the entries which have the given tag.
Parameters: |
|
---|
Yields: Every entry in 'entries' that tags to 'tag.
Source code in beancount/ops/basicops.py
def filter_tag(tag, entries):
"""Yield all the entries which have the given tag.
Args:
tag: A string, the tag we are interested in.
Yields:
Every entry in 'entries' that tags to 'tag.
"""
for entry in entries:
if (isinstance(entry, data.Transaction) and
entry.tags and
tag in entry.tags):
yield entry
beancount.ops.basicops.get_common_accounts(entries)
Compute the intersection of the accounts on the given entries.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/basicops.py
def get_common_accounts(entries):
"""Compute the intersection of the accounts on the given entries.
Args:
entries: A list of Transaction entries to process.
Returns:
A set of strings, the names of the common accounts from these
entries.
"""
assert all(isinstance(entry, data.Transaction) for entry in entries)
# If there is a single entry, the common accounts to it is all its accounts.
# Note that this also works with no entries (yields an empty set).
if len(entries) < 2:
if entries:
intersection = {posting.account for posting in entries[0].postings}
else:
intersection = set()
else:
entries_iter = iter(entries)
intersection = set(posting.account for posting in next(entries_iter).postings)
for entry in entries_iter:
accounts = set(posting.account for posting in entry.postings)
intersection &= accounts
if not intersection:
break
return intersection
beancount.ops.basicops.group_entries_by_link(entries)
Group the list of entries by link.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/basicops.py
def group_entries_by_link(entries):
"""Group the list of entries by link.
Args:
entries: A list of directives/transactions to process.
Returns:
A dict of link-name to list of entries.
"""
link_groups = defaultdict(list)
for entry in entries:
if not (isinstance(entry, data.Transaction) and entry.links):
continue
for link in entry.links:
link_groups[link].append(entry)
return link_groups
beancount.ops.compress
Compress multiple entries into a single one.
This can be used during import to compress the effective output, for accounts with a large number of similar entries. For example, I had a trading account which would pay out interest every single day. I have no desire to import the full detail of these daily interests, and compressing these interest-only entries to monthly ones made sense. This is the code that was used to carry this out.
beancount.ops.compress.compress(entries, predicate)
Compress multiple transactions into single transactions.
Replace consecutive sequences of Transaction entries that fulfill the given predicate by a single entry at the date of the last matching entry. 'predicate' is the function that determines if an entry should be compressed.
This can be used to simply a list of transactions that are similar and occur frequently. As an example, in a retail FOREX trading account, differential interest of very small amounts is paid every day; it is not relevant to look at the full detail of this interest unless there are other transactions. You can use this to compress it into single entries between other types of transactions.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/compress.py
def compress(entries, predicate):
"""Compress multiple transactions into single transactions.
Replace consecutive sequences of Transaction entries that fulfill the given
predicate by a single entry at the date of the last matching entry.
'predicate' is the function that determines if an entry should be
compressed.
This can be used to simply a list of transactions that are similar and occur
frequently. As an example, in a retail FOREX trading account, differential
interest of very small amounts is paid every day; it is not relevant to look
at the full detail of this interest unless there are other transactions. You
can use this to compress it into single entries between other types of
transactions.
Args:
entries: A list of directives.
predicate: A callable which accepts an entry and return true if the entry
is intended to be compressed.
Returns:
A list of directives, with compressible transactions replaced by a summary
equivalent.
"""
new_entries = []
pending = []
for entry in entries:
if isinstance(entry, data.Transaction) and predicate(entry):
# Save for compressing later.
pending.append(entry)
else:
# Compress and output all the pending entries.
if pending:
new_entries.append(merge(pending, pending[-1]))
pending.clear()
# Output the differing entry.
new_entries.append(entry)
if pending:
new_entries.append(merge(pending, pending[-1]))
return new_entries
beancount.ops.compress.merge(entries, prototype_txn)
Merge the postings of a list of Transactions into a single one.
Merge postings the given entries into a single entry with the Transaction attributes of the prototype. Return the new entry. The combined list of postings are merged if everything about the postings is the same except the number.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/compress.py
def merge(entries, prototype_txn):
"""Merge the postings of a list of Transactions into a single one.
Merge postings the given entries into a single entry with the Transaction
attributes of the prototype. Return the new entry. The combined list of
postings are merged if everything about the postings is the same except the
number.
Args:
entries: A list of directives.
prototype_txn: A Transaction which is used to create the compressed
Transaction instance. Its list of postings is ignored.
Returns:
A new Transaction instance which contains all the postings from the input
entries merged together.
"""
# Aggregate the postings together. This is a mapping of numberless postings
# to their number of units.
postings_map = collections.defaultdict(Decimal)
for entry in data.filter_txns(entries):
for posting in entry.postings:
# We strip the number off the posting to act as an aggregation key.
key = data.Posting(posting.account,
Amount(None, posting.units.currency),
posting.cost,
posting.price,
posting.flag,
None)
postings_map[key] += posting.units.number
# Create a new transaction with the aggregated postings.
new_entry = data.Transaction(prototype_txn.meta,
prototype_txn.date,
prototype_txn.flag,
prototype_txn.payee,
prototype_txn.narration,
data.EMPTY_SET, data.EMPTY_SET, [])
# Sort for at least some stability of output.
sorted_items = sorted(postings_map.items(),
key=lambda item: (item[0].account,
item[0].units.currency,
item[1]))
# Issue the merged postings.
for posting, number in sorted_items:
units = Amount(number, posting.units.currency)
new_entry.postings.append(
data.Posting(posting.account, units, posting.cost, posting.price,
posting.flag, posting.meta))
return new_entry
beancount.ops.documents
Everything that relates to creating the Document directives.
beancount.ops.documents.DocumentError (tuple)
DocumentError(source, message, entry)
beancount.ops.documents.DocumentError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/ops/documents.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.ops.documents.DocumentError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of DocumentError(source, message, entry)
beancount.ops.documents.DocumentError.__replace__(/, self, **kwds)
special
Return a new DocumentError object replacing specified fields with new values
Source code in beancount/ops/documents.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.ops.documents.DocumentError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/ops/documents.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.ops.documents.find_documents(directory, input_filename, accounts_only=None, strict=False)
Find dated document files under the given directory.
If a restricting set of accounts is provided in 'accounts_only', only return entries that correspond to one of the given accounts.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/documents.py
def find_documents(directory, input_filename, accounts_only=None, strict=False):
"""Find dated document files under the given directory.
If a restricting set of accounts is provided in 'accounts_only', only return
entries that correspond to one of the given accounts.
Args:
directory: A string, the name of the root of the directory hierarchy to be searched.
input_filename: The name of the file to be used for the Document directives. This is
also used to resolve relative directory names.
accounts_only: A set of valid accounts strings to search for.
strict: A boolean, set to true if you want to generate errors on documents
found in accounts not provided in accounts_only. This is only meaningful
if accounts_only is specified.
Returns:
A list of new Document objects that were created from the files found, and a list
of new errors generated.
"""
errors = []
# Compute the documents directory name relative to the beancount input
# file itself.
if not path.isabs(directory):
input_directory = path.dirname(input_filename)
directory = path.abspath(path.normpath(path.join(input_directory,
directory)))
# If the directory does not exist, just generate an error and return.
if not path.exists(directory):
meta = data.new_metadata(input_filename, 0)
error = DocumentError(
meta, "Document root '{}' does not exist".format(directory), None)
return ([], [error])
# Walk the hierarchy of files.
entries = []
for root, account_name, dirs, files in account.walk(directory):
# Look for files that have a dated filename.
for filename in files:
match = re.match(r'(\d\d\d\d)-(\d\d)-(\d\d).(.*)', filename)
if not match:
continue
# If a restricting set of accounts was specified, skip document
# directives found in accounts with no corresponding account name.
if accounts_only is not None and not account_name in accounts_only:
if strict:
if any(account_name.startswith(account) for account in accounts_only):
errors.append(DocumentError(
data.new_metadata(input_filename, 0),
"Document '{}' found in child account {}".format(
filename, account_name), None))
elif any(account.startswith(account_name) for account in accounts_only):
errors.append(DocumentError(
data.new_metadata(input_filename, 0),
"Document '{}' found in parent account {}".format(
filename, account_name), None))
continue
# Create a new directive.
meta = data.new_metadata(input_filename, 0)
try:
date = datetime.date(*map(int, match.group(1, 2, 3)))
except ValueError as exc:
errors.append(DocumentError(
data.new_metadata(input_filename, 0),
"Invalid date on document file '{}': {}".format(
filename, exc), None))
else:
entry = data.Document(meta, date, account_name, path.join(root, filename),
data.EMPTY_SET, data.EMPTY_SET)
entries.append(entry)
return (entries, errors)
beancount.ops.documents.process_documents(entries, options_map)
Check files for document directives and create documents directives automatically.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/documents.py
def process_documents(entries, options_map):
"""Check files for document directives and create documents directives automatically.
Args:
entries: A list of all directives parsed from the file.
options_map: An options dict, as is output by the parser.
We're using its 'filename' option to figure out relative path to
search for documents.
Returns:
A pair of list of all entries (including new ones), and errors
generated during the process of creating document directives.
"""
filename = options_map["filename"]
# Detect filenames that should convert into entries.
autodoc_entries = []
autodoc_errors = []
document_dirs = options_map['documents']
if document_dirs:
# Restrict to the list of valid accounts only.
accounts = getters.get_accounts(entries)
# Accumulate all the entries.
for directory in map(path.normpath, document_dirs):
new_entries, new_errors = find_documents(directory, filename, accounts)
autodoc_entries.extend(new_entries)
autodoc_errors.extend(new_errors)
# Merge the two lists of entries and errors. Keep the entries sorted.
entries.extend(autodoc_entries)
entries.sort(key=data.entry_sortkey)
return (entries, autodoc_errors)
beancount.ops.documents.verify_document_files_exist(entries, unused_options_map)
Verify that the document entries point to existing files.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/documents.py
def verify_document_files_exist(entries, unused_options_map):
"""Verify that the document entries point to existing files.
Args:
entries: a list of directives whose documents need to be validated.
unused_options_map: A parser options dict. We're not using it.
Returns:
The same list of entries, and a list of new errors, if any were encountered.
"""
errors = []
for entry in entries:
if not isinstance(entry, data.Document):
continue
if not path.exists(entry.filename):
errors.append(
DocumentError(entry.meta,
'File does not exist: "{}"'.format(entry.filename),
entry))
return entries, errors
beancount.ops.find_prices
A library of codes create price fetching jobs from strings and files.
beancount.ops.find_prices.find_balance_currencies(entries, date=None)
Return currencies relevant for the given date.
This computes the account balances as of the date, and returns the union of: a) The currencies held at cost, and b) Currency pairs from previous conversions, but only for currencies with non-zero balances.
This is intended to produce the list of currencies whose prices are relevant at a particular date, based on previous history.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/find_prices.py
def find_balance_currencies(entries, date=None):
"""Return currencies relevant for the given date.
This computes the account balances as of the date, and returns the union of:
a) The currencies held at cost, and
b) Currency pairs from previous conversions, but only for currencies with
non-zero balances.
This is intended to produce the list of currencies whose prices are relevant
at a particular date, based on previous history.
Args:
entries: A list of directives.
date: A datetime.date instance.
Returns:
A set of (base, quote) currencies.
"""
# Compute the balances.
currencies = set()
currencies_on_books = set()
balances, _ = summarize.balance_by_account(entries, date)
for _, balance in balances.items():
for pos in balance:
if pos.cost is not None:
# Add currencies held at cost.
currencies.add((pos.units.currency, pos.cost.currency))
else:
# Add regular currencies.
currencies_on_books.add(pos.units.currency)
# Create currency pairs from the currencies which are on account balances.
# In order to figure out the quote currencies, we use the list of price
# conversions until this date.
converted = (find_currencies_converted(entries, date) |
find_currencies_priced(entries, date))
for cbase in currencies_on_books:
for base_quote in converted:
base, quote = base_quote
if base == cbase:
currencies.add(base_quote)
return currencies
beancount.ops.find_prices.find_currencies_at_cost(entries, date=None)
Return all currencies that were held at cost at some point.
This returns all of them, even if not on the books at a particular point in time. This code does not look at account balances.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/find_prices.py
def find_currencies_at_cost(entries, date=None):
"""Return all currencies that were held at cost at some point.
This returns all of them, even if not on the books at a particular point in
time. This code does not look at account balances.
Args:
entries: A list of directives.
date: A datetime.date instance.
Returns:
A list of (base, quote) currencies.
"""
currencies = set()
for entry in entries:
if not isinstance(entry, data.Transaction):
continue
if date and entry.date >= date:
break
for posting in entry.postings:
if posting.cost is not None and posting.cost.number is not None:
currencies.add((posting.units.currency, posting.cost.currency))
return currencies
beancount.ops.find_prices.find_currencies_converted(entries, date=None)
Return currencies from price conversions.
This function looks at all price conversions that occurred until some date and produces a list of them. Note: This does not include Price directives, only postings with price conversions.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/find_prices.py
def find_currencies_converted(entries, date=None):
"""Return currencies from price conversions.
This function looks at all price conversions that occurred until some date
and produces a list of them. Note: This does not include Price directives,
only postings with price conversions.
Args:
entries: A list of directives.
date: A datetime.date instance.
Returns:
A list of (base, quote) currencies.
"""
currencies = set()
for entry in entries:
if not isinstance(entry, data.Transaction):
continue
if date and entry.date >= date:
break
for posting in entry.postings:
price = posting.price
if posting.cost is not None or price is None:
continue
currencies.add((posting.units.currency, price.currency))
return currencies
beancount.ops.find_prices.find_currencies_priced(entries, date=None)
Return currencies seen in Price directives.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/find_prices.py
def find_currencies_priced(entries, date=None):
"""Return currencies seen in Price directives.
Args:
entries: A list of directives.
date: A datetime.date instance.
Returns:
A list of (base, quote) currencies.
"""
currencies = set()
for entry in entries:
if not isinstance(entry, data.Price):
continue
if date and entry.date >= date:
break
currencies.add((entry.currency, entry.amount.currency))
return currencies
beancount.ops.lifetimes
Given a Beancount ledger, compute time intervals where we hold each commodity.
This script computes, for each commodity, which time intervals it is required at. This can then be used to identify a list of dates at which we need to fetch prices in order to properly fill the price database.
beancount.ops.lifetimes.compress_intervals_days(intervals, num_days)
Compress a list of date pairs to ignore short stretches of unused days.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/lifetimes.py
def compress_intervals_days(intervals, num_days):
"""Compress a list of date pairs to ignore short stretches of unused days.
Args:
intervals: A list of pairs of datetime.date instances.
num_days: An integer, the number of unused days to require for intervals
to be distinct, to allow a gap.
Returns:
A new dict of lifetimes map where some intervals may have been joined.
"""
ignore_interval = datetime.timedelta(days=num_days)
new_intervals = []
iter_intervals = iter(intervals)
last_begin, last_end = next(iter_intervals)
for date_begin, date_end in iter_intervals:
if date_begin - last_end < ignore_interval:
# Compress.
last_end = date_end
continue
new_intervals.append((last_begin, last_end))
last_begin, last_end = date_begin, date_end
new_intervals.append((last_begin, last_end))
return new_intervals
beancount.ops.lifetimes.compress_lifetimes_days(lifetimes_map, num_days)
Compress a lifetimes map to ignore short stretches of unused days.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/lifetimes.py
def compress_lifetimes_days(lifetimes_map, num_days):
"""Compress a lifetimes map to ignore short stretches of unused days.
Args:
lifetimes_map: A dict of currency intervals as returned by get_commodity_lifetimes.
num_days: An integer, the number of unused days to ignore.
Returns:
A new dict of lifetimes map where some intervals may have been joined.
"""
return {currency_pair: compress_intervals_days(intervals, num_days)
for currency_pair, intervals in lifetimes_map.items()}
beancount.ops.lifetimes.get_commodity_lifetimes(entries)
Given a list of directives, figure out the life of each commodity.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/lifetimes.py
def get_commodity_lifetimes(entries):
"""Given a list of directives, figure out the life of each commodity.
Args:
entries: A list of directives.
Returns:
A dict of (currency, cost-currency) commodity strings to lists of (start,
end) datetime.date pairs. The dates are inclusive of the day the commodity
was seen; the end/last dates are one day _after_ the last date seen.
"""
lifetimes = collections.defaultdict(list)
# The current set of active commodities.
commodities = set()
# The current balances across all accounts.
balances = collections.defaultdict(inventory.Inventory)
for entry in entries:
# Process only transaction entries.
if not isinstance(entry, data.Transaction):
continue
# Update the balance of affected accounts and check locally whether that
# triggered a change in the set of commodities.
commodities_changed = False
for posting in entry.postings:
balance = balances[posting.account]
commodities_before = balance.currency_pairs()
balance.add_position(posting)
commodities_after = balance.currency_pairs()
if commodities_after != commodities_before:
commodities_changed = True
# If there was a change in one of the affected account's list of
# commodities, recompute the total set globally. This should not
# occur very frequently.
if commodities_changed:
new_commodities = set(
itertools.chain(*(inv.currency_pairs() for inv in balances.values())))
if new_commodities != commodities:
# The new global set of commodities has changed; update our
# the dictionary of intervals.
for currency in new_commodities - commodities:
lifetimes[currency].append((entry.date, None))
for currency in commodities - new_commodities:
lifetime = lifetimes[currency]
begin_date, end_date = lifetime.pop(-1)
assert end_date is None
lifetime.append((begin_date, entry.date + ONEDAY))
# Update our current set.
commodities = new_commodities
return lifetimes
beancount.ops.lifetimes.required_daily_prices(lifetimes_map, date_last, weekdays_only=False)
Enumerate all the commodities and days where the price is required.
Given a map of lifetimes for a set of commodities, enumerate all the days for each commodity where it is active. This can be used to connect to a historical price fetcher routine to fill in missing price entries from an existing ledger.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/lifetimes.py
def required_daily_prices(lifetimes_map, date_last, weekdays_only=False):
"""Enumerate all the commodities and days where the price is required.
Given a map of lifetimes for a set of commodities, enumerate all the days
for each commodity where it is active. This can be used to connect to a
historical price fetcher routine to fill in missing price entries from an
existing ledger.
Args:
lifetimes_map: A dict of currency to active intervals as returned by
get_commodity_lifetimes().
date_last: A datetime.date instance, the last date which we're interested in.
weekdays_only: Option to limit fetching to weekdays only.
Returns:
Tuples of (date, currency, cost-currency).
"""
results = []
for currency_pair, intervals in lifetimes_map.items():
if currency_pair[1] is None:
continue
for date_begin, date_end in intervals:
# Find first Weekday starting on or before minimum date.
date = date_begin
if(weekdays_only):
diff_days = 4 - date_begin.weekday()
if diff_days < 0:
date += datetime.timedelta(days=diff_days)
# Iterate over all weekdays.
if date_end is None:
date_end = date_last
while date < date_end:
results.append((date, currency_pair[0], currency_pair[1]))
if weekdays_only and date.weekday() == 4:
date += 3 * ONEDAY
else:
date += ONEDAY
return sorted(results)
beancount.ops.lifetimes.required_weekly_prices(lifetimes_map, date_last)
Enumerate all the commodities and Fridays where the price is required.
Given a map of lifetimes for a set of commodities, enumerate all the Fridays for each commodity where it is active. This can be used to connect to a historical price fetcher routine to fill in missing price entries from an existing ledger.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/lifetimes.py
def required_weekly_prices(lifetimes_map, date_last):
"""Enumerate all the commodities and Fridays where the price is required.
Given a map of lifetimes for a set of commodities, enumerate all the Fridays
for each commodity where it is active. This can be used to connect to a
historical price fetcher routine to fill in missing price entries from an
existing ledger.
Args:
lifetimes_map: A dict of currency to active intervals as returned by
get_commodity_lifetimes().
date_last: A datetime.date instance, the last date which we're interested in.
Returns:
Tuples of (date, currency, cost-currency).
"""
results = []
for currency_pair, intervals in lifetimes_map.items():
if currency_pair[1] is None:
continue
for date_begin, date_end in intervals:
# Find first Friday before the minimum date.
diff_days = 4 - date_begin.weekday()
if diff_days >= 1:
diff_days -= 7
date = date_begin + datetime.timedelta(days=diff_days)
# Iterate over all Fridays.
if date_end is None:
date_end = date_last
while date < date_end:
results.append((date, currency_pair[0], currency_pair[1]))
date += ONE_WEEK
return sorted(results)
beancount.ops.lifetimes.trim_intervals(intervals, trim_start=None, trim_end=None)
Trim a list of date pairs to be within a start and end date. Useful in update-style price fetching.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/lifetimes.py
def trim_intervals(intervals, trim_start=None, trim_end=None):
"""Trim a list of date pairs to be within a start and end date.
Useful in update-style price fetching.
Args:
intervals: A list of pairs of datetime.date instances
trim_start: An inclusive starting date.
trim_end: An exclusive starting date.
Returns:
A list of new intervals (pairs of (date, date)).
"""
new_intervals = []
iter_intervals = iter(intervals)
if(trim_start is not None and
trim_end is not None and
trim_end < trim_start):
raise ValueError('Trim end date is before start date')
for date_begin, date_end in iter_intervals:
if(trim_start is not None and
trim_start > date_begin):
date_begin = trim_start
if(trim_end is not None):
if(date_end is None or
trim_end < date_end):
date_end = trim_end
if(date_end is None or
date_begin <= date_end):
new_intervals.append((date_begin, date_end))
return new_intervals
beancount.ops.pad
Automatic padding of gaps between entries.
beancount.ops.pad.PadError (tuple)
PadError(source, message, entry)
beancount.ops.pad.PadError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/ops/pad.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.ops.pad.PadError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of PadError(source, message, entry)
beancount.ops.pad.PadError.__replace__(/, self, **kwds)
special
Return a new PadError object replacing specified fields with new values
Source code in beancount/ops/pad.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.ops.pad.PadError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/ops/pad.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.ops.pad.pad(entries, options_map)
Insert transaction entries for to fulfill a subsequent balance check.
Synthesize and insert Transaction entries right after Pad entries in order to fulfill checks in the padded accounts. Returns a new list of entries. Note that this doesn't pad across parent-child relationships, it is a very simple kind of pad. (I have found this to be sufficient in practice, and simpler to implement and understand.)
Furthermore, this pads for a single currency only, that is, balance checks are specified only for one currency at a time, and pads will only be inserted for those currencies.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/pad.py
def pad(entries, options_map):
"""Insert transaction entries for to fulfill a subsequent balance check.
Synthesize and insert Transaction entries right after Pad entries in order
to fulfill checks in the padded accounts. Returns a new list of entries.
Note that this doesn't pad across parent-child relationships, it is a very
simple kind of pad. (I have found this to be sufficient in practice, and
simpler to implement and understand.)
Furthermore, this pads for a single currency only, that is, balance checks
are specified only for one currency at a time, and pads will only be
inserted for those currencies.
Args:
entries: A list of directives.
options_map: A parser options dict.
Returns:
A new list of directives, with Pad entries inserted, and a list of new
errors produced.
"""
pad_errors = []
# Find all the pad entries and group them by account.
pads = list(misc_utils.filter_type(entries, data.Pad))
pad_dict = misc_utils.groupby(lambda x: x.account, pads)
# Partially realize the postings, so we can iterate them by account.
by_account = realization.postings_by_account(entries)
# A dict of pad -> list of entries to be inserted.
new_entries = {id(pad): [] for pad in pads}
# Process each account that has a padding group.
for account_, pad_list in sorted(pad_dict.items()):
# Last encountered / currency active pad entry.
active_pad = None
# Gather all the postings for the account and its children.
postings = []
is_child = account.parent_matcher(account_)
for item_account, item_postings in by_account.items():
if is_child(item_account):
postings.extend(item_postings)
postings.sort(key=data.posting_sortkey)
# A set of currencies already padded so far in this account.
padded_lots = set()
pad_balance = inventory.Inventory()
for entry in postings:
assert not isinstance(entry, data.Posting)
if isinstance(entry, data.TxnPosting):
# This is a transaction; update the running balance for this
# account.
pad_balance.add_position(entry.posting)
elif isinstance(entry, data.Pad):
if entry.account == account_:
# Mark this newly encountered pad as active and allow all lots
# to be padded heretofore.
active_pad = entry
padded_lots = set()
elif isinstance(entry, data.Balance):
check_amount = entry.amount
# Compare the current balance amount to the expected one from
# the check entry. IMPORTANT: You need to understand that this
# does not check a single position, but rather checks that the
# total amount for a particular currency (which itself is
# distinct from the cost).
balance_amount = pad_balance.get_currency_units(check_amount.currency)
diff_amount = amount.sub(balance_amount, check_amount)
# Use the specified tolerance or automatically infer it.
tolerance = balance.get_balance_tolerance(entry, options_map)
if abs(diff_amount.number) > tolerance:
# The check fails; we need to pad.
# Pad only if pad entry is active and we haven't already
# padded that lot since it was last encountered.
if active_pad and (check_amount.currency not in padded_lots):
# Note: we decide that it's an error to try to pad
# positions at cost; we check here that all the existing
# positions with that currency have no cost.
positions = [pos
for pos in pad_balance.get_positions()
if pos.units.currency == check_amount.currency]
for position_ in positions:
if position_.cost is not None:
pad_errors.append(
PadError(entry.meta,
("Attempt to pad an entry with cost for "
"balance: {}".format(pad_balance)),
active_pad))
# Thus our padding lot is without cost by default.
diff_position = position.Position.from_amounts(
amount.Amount(check_amount.number - balance_amount.number,
check_amount.currency))
# Synthesize a new transaction entry for the difference.
narration = ('(Padding inserted for Balance of {} for '
'difference {})').format(check_amount, diff_position)
new_entry = data.Transaction(
active_pad.meta.copy(), active_pad.date, flags.FLAG_PADDING,
None, narration, data.EMPTY_SET, data.EMPTY_SET, [])
new_entry.postings.append(
data.Posting(active_pad.account,
diff_position.units, diff_position.cost,
None, None, {}))
neg_diff_position = -diff_position
new_entry.postings.append(
data.Posting(active_pad.source_account,
neg_diff_position.units, neg_diff_position.cost,
None, None, {}))
# Save it for later insertion after the active pad.
new_entries[id(active_pad)].append(new_entry)
# Fixup the running balance.
pos, _ = pad_balance.add_position(diff_position)
if pos is not None and pos.is_negative_at_cost():
raise ValueError(
"Position held at cost goes negative: {}".format(pos))
# Mark this lot as padded. Further checks should not pad this lot.
padded_lots.add(check_amount.currency)
# Insert the newly created entries right after the pad entries that created them.
padded_entries = []
for entry in entries:
padded_entries.append(entry)
if isinstance(entry, data.Pad):
entry_list = new_entries[id(entry)]
if entry_list:
padded_entries.extend(entry_list)
else:
# Generate errors on unused pad entries.
pad_errors.append(
PadError(entry.meta, "Unused Pad entry", entry))
return padded_entries, pad_errors
beancount.ops.summarize
Summarization of entries.
This code is used to summarize a sequence of entries (e.g. during a time period) into a few "opening balance" entries. This is when computing a balance sheet for a specific time period: we don't want to see the entries from before some period of time, so we fold them into a single transaction per account that has the sum total amount of that account.
beancount.ops.summarize.balance_by_account(entries, date=None, compress_unbooked=False)
Sum up the balance per account for all entries strictly before 'date'.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def balance_by_account(entries, date=None, compress_unbooked=False):
"""Sum up the balance per account for all entries strictly before 'date'.
Args:
entries: A list of directives.
date: An optional datetime.date instance. If provided, stop accumulating
on and after this date. This is useful for summarization before a
specific date.
compress_unbooked: For accounts that have a booking method of NONE,
compress their positions into a single average position. This can be
used when you export the full list of positions, because those accounts
will have a myriad of small positions from fees at negative cost and
what-not.
Returns:
A pair of a dict of account string to instance Inventory (the balance of
this account before the given date), and the index in the list of entries
where the date was encountered. If all entries are located before the
cutoff date, an index one beyond the last entry is returned.
"""
balances = collections.defaultdict(inventory.Inventory)
for index, entry in enumerate(entries):
if date and entry.date >= date:
break
if isinstance(entry, Transaction):
for posting in entry.postings:
account_balance = balances[posting.account]
# Note: We must allow negative lots at cost, because this may be
# used to reduce a filtered list of entries which may not
# include the entries necessary to keep units at cost always
# above zero. The only summation that is guaranteed to be above
# zero is if all the entries are being summed together, no
# entries are filtered, at least for a particular account's
# postings.
account_balance.add_position(posting)
else:
index = len(entries)
# If the account has "NONE" booking method, merge all its postings
# together in order to obtain an accurate cost basis and balance of
# units.
#
# (This is a complex issue.) If you accrued positions without having them
# booked properly against existing cost bases, you have not properly accounted
# for the profit/loss to other postings. This means that the resulting
# profit/loss is merged in the cost basis of the positive and negative
# postings.
if compress_unbooked:
oc_map = getters.get_account_open_close(entries)
accounts_map = {account: dopen for account, (dopen, _) in oc_map.items()}
for account, balance in balances.items():
dopen = accounts_map.get(account, None)
if dopen is not None and dopen.booking is data.Booking.NONE:
average_balance = balance.average()
balances[account] = inventory.Inventory(pos for pos in average_balance)
return balances, index
beancount.ops.summarize.cap(entries, account_types, conversion_currency, account_earnings, account_conversions)
Transfer net income to equity and insert a final conversion entry.
This is used to move and nullify balances from the income and expense accounts to an equity account in order to draw up a balance sheet with a balance of precisely zero.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def cap(entries,
account_types,
conversion_currency,
account_earnings,
account_conversions):
"""Transfer net income to equity and insert a final conversion entry.
This is used to move and nullify balances from the income and expense
accounts to an equity account in order to draw up a balance sheet with a
balance of precisely zero.
Args:
entries: A list of directives.
account_types: An instance of AccountTypes.
conversion_currency: A string, the transfer currency to use for zero prices
on the conversion entry.
account_earnings: A string, the name of the equity account to transfer
final balances of the income and expense accounts to.
account_conversions: A string, the name of the equity account to use as
the source for currency conversions.
Returns:
A modified list of entries, with the income and expense accounts
transferred.
"""
# Transfer the balances of income and expense accounts as earnings / net
# income.
income_statement_account_pred = (
lambda account: is_income_statement_account(account, account_types))
entries = transfer_balances(entries, None,
income_statement_account_pred,
account_earnings)
# Insert final conversion entries.
entries = conversions(entries, account_conversions, conversion_currency, None)
return entries
beancount.ops.summarize.cap_opt(entries, options_map)
Close by getting all the parameters from an options map.
See cap() for details.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def cap_opt(entries, options_map):
"""Close by getting all the parameters from an options map.
See cap() for details.
Args:
entries: See cap().
options_map: A parser's option_map.
Returns:
Same as close().
"""
account_types = options.get_account_types(options_map)
current_accounts = options.get_current_accounts(options_map)
conversion_currency = options_map['conversion_currency']
return cap(entries,
account_types,
conversion_currency,
*current_accounts)
beancount.ops.summarize.clamp(entries, begin_date, end_date, account_types, conversion_currency, account_earnings, account_opening, account_conversions)
Filter entries to include only those during a specified time period.
Firstly, this method will transfer all balances for the income and expense accounts occurring before the given period begin date to the 'account_earnings' account (earnings before the period, or "retained earnings") and summarize all of the transactions before that date against the 'account_opening' account (usually "opening balances"). The resulting income and expense accounts should have no transactions (since their balances have been transferred out and summarization of zero balances should not add any transactions).
Secondly, all the entries after the period end date will be truncated and a conversion entry will be added for the resulting transactions that reflect changes occurring between the beginning and end of the exercise period. The resulting balance of all account should be empty.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def clamp(entries,
begin_date, end_date,
account_types,
conversion_currency,
account_earnings,
account_opening,
account_conversions):
"""Filter entries to include only those during a specified time period.
Firstly, this method will transfer all balances for the income and expense
accounts occurring before the given period begin date to the
'account_earnings' account (earnings before the period, or "retained
earnings") and summarize all of the transactions before that date against
the 'account_opening' account (usually "opening balances"). The resulting
income and expense accounts should have no transactions (since their
balances have been transferred out and summarization of zero balances should
not add any transactions).
Secondly, all the entries after the period end date will be truncated and a
conversion entry will be added for the resulting transactions that reflect
changes occurring between the beginning and end of the exercise period. The
resulting balance of all account should be empty.
Args:
entries: A list of directive tuples.
begin_date: A datetime.date instance, the beginning of the period.
end_date: A datetime.date instance, one day beyond the end of the period.
account_types: An instance of AccountTypes.
conversion_currency: A string, the transfer currency to use for zero prices
on the conversion entry.
account_earnings: A string, the name of the account to transfer
previous earnings from the income statement accounts to the balance
sheet.
account_opening: A string, the name of the account in equity
to transfer previous balances from, in order to initialize account
balances at the beginning of the period. This is typically called an
opening balances account.
account_conversions: A string, the name of the equity account to
book currency conversions against.
Returns:
A new list of entries is returned, and the index that points to the first
original transaction after the beginning date of the period. This index
can be used to generate the opening balances report, which is a balance
sheet fed with only the summarized entries.
"""
# Transfer income and expenses before the period to equity.
income_statement_account_pred = (
lambda account: is_income_statement_account(account, account_types))
entries = transfer_balances(entries, begin_date,
income_statement_account_pred, account_earnings)
# Summarize all the previous balances, after transferring the income and
# expense balances, so all entries for those accounts before the begin date
# should now disappear.
entries, index = summarize(entries, begin_date, account_opening)
# Truncate the entries after this.
entries = truncate(entries, end_date)
# Insert conversion entries.
entries = conversions(entries, account_conversions, conversion_currency, end_date)
return entries, index
beancount.ops.summarize.clamp_opt(entries, begin_date, end_date, options_map)
Clamp by getting all the parameters from an options map.
See clamp() for details.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def clamp_opt(entries, begin_date, end_date, options_map):
"""Clamp by getting all the parameters from an options map.
See clamp() for details.
Args:
entries: See clamp().
begin_date: See clamp().
end_date: See clamp().
options_map: A parser's option_map.
Returns:
Same as clamp().
"""
account_types = options.get_account_types(options_map)
previous_earnings, previous_balances, _ = options.get_previous_accounts(options_map)
_, current_conversions = options.get_current_accounts(options_map)
conversion_currency = options_map['conversion_currency']
return clamp(entries, begin_date, end_date,
account_types,
conversion_currency,
previous_earnings,
previous_balances,
current_conversions)
beancount.ops.summarize.clear(entries, date, account_types, account_earnings)
Transfer income and expenses balances at the given date to the equity accounts.
This method insert entries to zero out balances on income and expenses accounts by transferring them to an equity account.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def clear(entries,
date,
account_types,
account_earnings):
"""Transfer income and expenses balances at the given date to the equity accounts.
This method insert entries to zero out balances on income and expenses
accounts by transferring them to an equity account.
Args:
entries: A list of directive tuples.
date: A datetime.date instance, one day beyond the end of the period. This
date can be optionally left to None in order to close at the end of the
list of entries.
account_types: An instance of AccountTypes.
account_earnings: A string, the name of the account to transfer
previous earnings from the income statement accounts to the balance
sheet.
Returns:
A new list of entries is returned, and the index that points to one before
the last original transaction before the transfers.
"""
index = len(entries)
# Transfer income and expenses before the period to equity.
income_statement_account_pred = (
lambda account: is_income_statement_account(account, account_types))
new_entries = transfer_balances(entries, date,
income_statement_account_pred, account_earnings)
return new_entries, index
beancount.ops.summarize.clear_opt(entries, date, options_map)
Convenience function to clear() using an options map.
Source code in beancount/ops/summarize.py
def clear_opt(entries, date, options_map):
"""Convenience function to clear() using an options map.
"""
account_types = options.get_account_types(options_map)
current_accounts = options.get_current_accounts(options_map)
return clear(entries, date, account_types, current_accounts[0])
beancount.ops.summarize.close(entries, date, conversion_currency, account_conversions)
Truncate entries that occur after a particular date and ensure balance.
This method essentially removes entries after a date. It truncates the future. To do so, it will
-
Remove all entries which occur after 'date', if given.
-
Insert conversion transactions at the end of the list of entries to ensure that the total balance of all postings sums up to empty.
The result is a list of entries with a total balance of zero, with possibly non-zero balances for the income/expense accounts. To produce a final balance sheet, use transfer() to move the net income to the equity accounts.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def close(entries,
date,
conversion_currency,
account_conversions):
"""Truncate entries that occur after a particular date and ensure balance.
This method essentially removes entries after a date. It truncates the
future. To do so, it will
1. Remove all entries which occur after 'date', if given.
2. Insert conversion transactions at the end of the list of entries to
ensure that the total balance of all postings sums up to empty.
The result is a list of entries with a total balance of zero, with possibly
non-zero balances for the income/expense accounts. To produce a final
balance sheet, use transfer() to move the net income to the equity accounts.
Args:
entries: A list of directive tuples.
date: A datetime.date instance, one day beyond the end of the period. This
date can be optionally left to None in order to close at the end of the
list of entries.
conversion_currency: A string, the transfer currency to use for zero prices
on the conversion entry.
account_conversions: A string, the name of the equity account to
book currency conversions against.
Returns:
A new list of entries is returned, and the index that points to one beyond
the last original transaction that was provided. Further entries may have
been inserted to normalize conversions and ensure the total balance sums
to zero.
"""
# Truncate the entries after the date, if a date has been provided.
if date is not None:
entries = truncate(entries, date)
# Keep an index to the truncated list of entries (before conversions).
index = len(entries)
# Insert a conversions entry to ensure the total balance of all accounts is
# flush zero.
entries = conversions(entries, account_conversions, conversion_currency, date)
return entries, index
beancount.ops.summarize.close_opt(entries, date, options_map)
Convenience function to close() using an options map.
Source code in beancount/ops/summarize.py
def close_opt(entries, date, options_map):
"""Convenience function to close() using an options map.
"""
conversion_currency = options_map['conversion_currency']
current_accounts = options.get_current_accounts(options_map)
return close(entries, date, conversion_currency, current_accounts[1])
beancount.ops.summarize.conversions(entries, conversion_account, conversion_currency, date=None)
Insert a conversion entry at date 'date' at the given account.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def conversions(entries, conversion_account, conversion_currency, date=None):
"""Insert a conversion entry at date 'date' at the given account.
Args:
entries: A list of entries.
conversion_account: A string, the account to book against.
conversion_currency: A string, the transfer currency to use for zero prices
on the conversion entry.
date: The date before which to insert the conversion entry. The new
entry will be inserted as the last entry of the date just previous
to this date.
Returns:
A modified list of entries.
"""
# Compute the balance at the given date.
conversion_balance = interpolate.compute_entries_balance(entries, date=date)
# Early exit if there is nothing to do.
conversion_cost_balance = conversion_balance.reduce(convert.get_cost)
if conversion_cost_balance.is_empty():
return entries
# Calculate the index and the date for the new entry. We want to store it as
# the last transaction of the day before.
if date is not None:
index = bisect_key.bisect_left_with_key(entries, date, key=lambda entry: entry.date)
last_date = date - datetime.timedelta(days=1)
else:
index = len(entries)
last_date = entries[-1].date
meta = data.new_metadata('<conversions>', -1)
narration = 'Conversion for {}'.format(conversion_balance)
conversion_entry = Transaction(meta, last_date, flags.FLAG_CONVERSIONS,
None, narration, data.EMPTY_SET, data.EMPTY_SET, [])
for position in conversion_cost_balance.get_positions():
# Important note: Set the cost to zero here to maintain the balance
# invariant. (This is the only single place we cheat on the balance rule
# in the entire system and this is necessary; see documentation on
# Conversions.)
price = amount.Amount(ZERO, conversion_currency)
neg_pos = -position
conversion_entry.postings.append(
data.Posting(conversion_account, neg_pos.units, neg_pos.cost,
price, None, None))
# Make a copy of the list of entries and insert the new transaction into it.
new_entries = list(entries)
new_entries.insert(index, conversion_entry)
return new_entries
beancount.ops.summarize.create_entries_from_balances(balances, date, source_account, direction, meta, flag, narration_template)
"Create a list of entries from a dict of balances.
This method creates a list of new entries to transfer the amounts in the 'balances' dict to/from another account specified in 'source_account'.
The balancing posting is created with the equivalent at cost. In other words, if you attempt to balance 10 HOOL {500 USD}, this will synthesize a posting with this position on one leg, and with 5000 USD on the 'source_account' leg.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def create_entries_from_balances(balances, date, source_account, direction,
meta, flag, narration_template):
""""Create a list of entries from a dict of balances.
This method creates a list of new entries to transfer the amounts in the
'balances' dict to/from another account specified in 'source_account'.
The balancing posting is created with the equivalent at cost. In other
words, if you attempt to balance 10 HOOL {500 USD}, this will synthesize a
posting with this position on one leg, and with 5000 USD on the
'source_account' leg.
Args:
balances: A dict of account name strings to Inventory instances.
date: A datetime.date object, the date at which to create the transaction.
source_account: A string, the name of the account to pull the balances
from. This is the magician's hat to pull the rabbit from.
direction: If 'direction' is True, the new entries transfer TO the
balances account from the source account; otherwise the new entries
transfer FROM the balances into the source account.
meta: A dict to use as metadata for the transactions.
flag: A string, the flag to use for the transactions.
narration_template: A format string for creating the narration. It is
formatted with 'account' and 'date' replacement variables.
Returns:
A list of newly synthesizes Transaction entries.
"""
new_entries = []
for account, account_balance in sorted(balances.items()):
# Don't create new entries where there is no balance.
if account_balance.is_empty():
continue
narration = narration_template.format(account=account, date=date)
if not direction:
account_balance = -account_balance
postings = []
new_entry = Transaction(
meta, date, flag, None, narration, data.EMPTY_SET, data.EMPTY_SET, postings)
for position in account_balance.get_positions():
postings.append(data.Posting(account, position.units, position.cost,
None, None, None))
cost = -convert.get_cost(position)
postings.append(data.Posting(source_account, cost, None,
None, None, None))
new_entries.append(new_entry)
return new_entries
beancount.ops.summarize.get_open_entries(entries, date)
Gather the list of active Open entries at date.
This returns the list of Open entries that have not been closed at the given date, in the same order they were observed in the document.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def get_open_entries(entries, date):
"""Gather the list of active Open entries at date.
This returns the list of Open entries that have not been closed at the given
date, in the same order they were observed in the document.
Args:
entries: A list of directives.
date: The date at which to look for an open entry. If not specified, will
return the entries still open at the latest date.
Returns:
A list of Open directives.
"""
open_entries = {}
for index, entry in enumerate(entries):
if date is not None and entry.date >= date:
break
if isinstance(entry, Open):
try:
ex_index, ex_entry = open_entries[entry.account]
if entry.date < ex_entry.date:
open_entries[entry.account] = (index, entry)
except KeyError:
open_entries[entry.account] = (index, entry)
elif isinstance(entry, Close):
# If there is no corresponding open, don't raise an error.
open_entries.pop(entry.account, None)
return [entry for (index, entry) in sorted(open_entries.values())]
beancount.ops.summarize.open(entries, date, account_types, conversion_currency, account_earnings, account_opening, account_conversions)
Summarize entries before a date and transfer income/expenses to equity.
This method essentially prepares a list of directives to contain only transactions that occur after a particular date. It truncates the past. To do so, it will
-
Insert conversion transactions at the given open date, then
-
Insert transactions at that date to move accumulated balances from before that date from the income and expenses accounts to an equity account, and finally
-
It removes all the transactions previous to the date and replaces them by opening balances entries to bring the balances to the same amount.
The result is a list of entries for which the income and expense accounts are beginning with a balance of zero, and all other accounts begin with a transaction that brings their balance to the expected amount. All the past has been summarized at that point.
An index is returned to the first transaction past the balance opening transactions, so you can keep just those in order to render a balance sheet for only the opening balances.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def open(entries,
date,
account_types,
conversion_currency,
account_earnings,
account_opening,
account_conversions):
"""Summarize entries before a date and transfer income/expenses to equity.
This method essentially prepares a list of directives to contain only
transactions that occur after a particular date. It truncates the past. To
do so, it will
1. Insert conversion transactions at the given open date, then
2. Insert transactions at that date to move accumulated balances from before
that date from the income and expenses accounts to an equity account, and
finally
3. It removes all the transactions previous to the date and replaces them by
opening balances entries to bring the balances to the same amount.
The result is a list of entries for which the income and expense accounts
are beginning with a balance of zero, and all other accounts begin with a
transaction that brings their balance to the expected amount. All the past
has been summarized at that point.
An index is returned to the first transaction past the balance opening
transactions, so you can keep just those in order to render a balance sheet
for only the opening balances.
Args:
entries: A list of directive tuples.
date: A datetime.date instance, the date at which to do this.
account_types: An instance of AccountTypes.
conversion_currency: A string, the transfer currency to use for zero prices
on the conversion entry.
account_earnings: A string, the name of the account to transfer
previous earnings from the income statement accounts to the balance
sheet.
account_opening: A string, the name of the account in equity
to transfer previous balances from, in order to initialize account
balances at the beginning of the period. This is typically called an
opening balances account.
account_conversions: A string, the name of the equity account to
book currency conversions against.
Returns:
A new list of entries is returned, and the index that points to the first
original transaction after the beginning date of the period. This index
can be used to generate the opening balances report, which is a balance
sheet fed with only the summarized entries.
"""
# Insert conversion entries.
entries = conversions(entries, account_conversions, conversion_currency, date)
# Transfer income and expenses before the period to equity.
entries, _ = clear(entries, date, account_types, account_earnings)
# Summarize all the previous balances, after transferring the income and
# expense balances, so all entries for those accounts before the begin date
# should now disappear.
entries, index = summarize(entries, date, account_opening)
return entries, index
beancount.ops.summarize.open_opt(entries, date, options_map)
Convenience function to open() using an options map.
Source code in beancount/ops/summarize.py
def open_opt(entries, date, options_map):
"""Convenience function to open() using an options map.
"""
account_types = options.get_account_types(options_map)
previous_accounts = options.get_previous_accounts(options_map)
conversion_currency = options_map['conversion_currency']
return open(entries, date, account_types, conversion_currency, *previous_accounts)
beancount.ops.summarize.summarize(entries, date, account_opening)
Summarize all entries before a date by replacing then with summarization entries.
This function replaces the transactions up to (and not including) the given date with a opening balance transactions, one for each account. It returns new entries, all of the transactions before the given date having been replaced by a few summarization entries, one for each account.
Notes: - Open entries are preserved for active accounts. - The last relevant price entry for each (base, quote) pair is preserved. - All other entries before the cutoff date are culled.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def summarize(entries, date, account_opening):
"""Summarize all entries before a date by replacing then with summarization entries.
This function replaces the transactions up to (and not including) the given
date with a opening balance transactions, one for each account. It returns
new entries, all of the transactions before the given date having been
replaced by a few summarization entries, one for each account.
Notes:
- Open entries are preserved for active accounts.
- The last relevant price entry for each (base, quote) pair is preserved.
- All other entries before the cutoff date are culled.
Args:
entries: A list of directives.
date: A datetime.date instance, the cutoff date before which to summarize.
account_opening: A string, the name of the source account to book summarization
entries against.
Returns:
The function returns a list of new entries and the integer index at which
the entries on or after the cutoff date begin.
"""
# Compute balances at date.
balances, index = balance_by_account(entries, date)
# We need to insert the entries with a date previous to subsequent checks,
# to maintain ensure the open directives show up before any transaction.
summarize_date = date - datetime.timedelta(days=1)
# Create summarization / opening balance entries.
summarizing_entries = create_entries_from_balances(
balances, summarize_date, account_opening, True,
data.new_metadata('<summarize>', 0), flags.FLAG_SUMMARIZE,
"Opening balance for '{account}' (Summarization)")
# Insert the last price entry for each commodity from before the date.
price_entries = prices.get_last_price_entries(entries, date)
# Gather the list of active open entries at date.
open_entries = get_open_entries(entries, date)
# Compute entries before the date and preserve the entries after the date.
before_entries = sorted(open_entries + price_entries + summarizing_entries,
key=data.entry_sortkey)
after_entries = entries[index:]
# Return a new list of entries and the index that points after the entries
# were inserted.
return (before_entries + after_entries), len(before_entries)
beancount.ops.summarize.transfer_balances(entries, date, account_pred, transfer_account)
Synthesize transactions to transfer balances from some accounts at a given date.
For all accounts that match the 'account_pred' predicate, create new entries to transfer the balance at the given date from the account to the transfer account. This is used to transfer balances from income and expenses from a previous period to a "retained earnings" account. This is accomplished by creating new entries.
Note that inserting transfers breaks any following balance checks that are in the transferred accounts. For this reason, all balance assertion entries following the cutoff date for those accounts are removed from the list in output.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def transfer_balances(entries, date, account_pred, transfer_account):
"""Synthesize transactions to transfer balances from some accounts at a given date.
For all accounts that match the 'account_pred' predicate, create new entries
to transfer the balance at the given date from the account to the transfer
account. This is used to transfer balances from income and expenses from a
previous period to a "retained earnings" account. This is accomplished by
creating new entries.
Note that inserting transfers breaks any following balance checks that are
in the transferred accounts. For this reason, all balance assertion entries
following the cutoff date for those accounts are removed from the list in
output.
Args:
entries: A list of directives.
date: A datetime.date instance, the date at which to make the transfer.
account_pred: A predicate function that, given an account string, returns
true if the account is meant to be transferred.
transfer_account: A string, the name of the source account to be used on
the transfer entries to receive balances at the given date.
Returns:
A new list of entries, with the new transfer entries added in.
"""
# Don't bother doing anything if there are no entries.
if not entries:
return entries
# Compute balances at date.
balances, index = balance_by_account(entries, date)
# Filter out to keep only the accounts we want.
transfer_balances = {account: balance
for account, balance in balances.items()
if account_pred(account)}
# We need to insert the entries at the end of the previous day.
if date:
transfer_date = date - datetime.timedelta(days=1)
else:
transfer_date = entries[-1].date
# Create transfer entries.
transfer_entries = create_entries_from_balances(
transfer_balances, transfer_date, transfer_account, False,
data.new_metadata('<transfer_balances>', 0), flags.FLAG_TRANSFER,
"Transfer balance for '{account}' (Transfer balance)")
# Remove balance assertions that occur after a transfer on an account that
# has been transferred away; they would break.
after_entries = [entry
for entry in entries[index:]
if not (isinstance(entry, data.Balance) and
entry.account in transfer_balances)]
# Split the new entries in a new list.
return (entries[:index] + transfer_entries + after_entries)
beancount.ops.summarize.truncate(entries, date)
Filter out all the entries at and after date. Returns a new list of entries.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/summarize.py
def truncate(entries, date):
"""Filter out all the entries at and after date. Returns a new list of entries.
Args:
entries: A sorted list of directives.
date: A datetime.date instance.
Returns:
A truncated list of directives.
"""
index = bisect_key.bisect_left_with_key(entries, date,
key=lambda entry: entry.date)
return entries[:index]
beancount.ops.validation
Validation checks.
These checks are intended to be run after all the plugins have transformed the list of entries, just before serving them or generating reports from them. The idea is to ensure a reasonable set of invariants and generate errors if those invariants are violated. They are not sanity checks--user data is subject to constraints which are hopefully detected here and which will result in errors trickled up to the user.
beancount.ops.validation.ValidationError (tuple)
ValidationError(source, message, entry)
beancount.ops.validation.ValidationError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/ops/validation.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.ops.validation.ValidationError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of ValidationError(source, message, entry)
beancount.ops.validation.ValidationError.__replace__(/, self, **kwds)
special
Return a new ValidationError object replacing specified fields with new values
Source code in beancount/ops/validation.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.ops.validation.ValidationError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/ops/validation.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.ops.validation.validate(entries, options_map, log_timings=None, extra_validations=None)
Perform all the standard checks on parsed contents.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/validation.py
def validate(entries, options_map, log_timings=None, extra_validations=None):
"""Perform all the standard checks on parsed contents.
Args:
entries: A list of directives.
unused_options_map: An options map.
log_timings: An optional function to use for logging the time of individual
operations.
extra_validations: A list of extra validation functions to run after loading
this list of entries.
Returns:
A list of new errors, if any were found.
"""
validation_tests = VALIDATIONS
if extra_validations:
validation_tests += extra_validations
# Run various validation routines define above.
errors = []
for validation_function in validation_tests:
with misc_utils.log_time('function: {}'.format(validation_function.__name__),
log_timings, indent=2):
new_errors = validation_function(entries, options_map)
errors.extend(new_errors)
return errors
beancount.ops.validation.validate_active_accounts(entries, unused_options_map)
Check that all references to accounts occurs on active accounts.
We basically check that references to accounts from all directives other than Open and Close occur at dates the open-close interval of that account. This should be good for all of the directive types where we can extract an account name.
Note that this is more strict a check than comparing the dates: we actually check that no references to account are made on the same day before the open directive appears for that account. This is a nice property to have, and is supported by our custom sorting routine that will sort open entries before transaction entries, given the same date.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/validation.py
def validate_active_accounts(entries, unused_options_map):
"""Check that all references to accounts occurs on active accounts.
We basically check that references to accounts from all directives other
than Open and Close occur at dates the open-close interval of that account.
This should be good for all of the directive types where we can extract an
account name.
Note that this is more strict a check than comparing the dates: we actually
check that no references to account are made on the same day before the open
directive appears for that account. This is a nice property to have, and is
supported by our custom sorting routine that will sort open entries before
transaction entries, given the same date.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
error_pairs = []
active_set = set()
opened_accounts = set()
for entry in entries:
if isinstance(entry, data.Open):
active_set.add(entry.account)
opened_accounts.add(entry.account)
elif isinstance(entry, data.Close):
active_set.discard(entry.account)
else:
for account in getters.get_entry_accounts(entry):
if account not in active_set:
# Allow document and note directives that occur after an
# account is closed.
if (isinstance(entry, ALLOW_AFTER_CLOSE) and
account in opened_accounts):
continue
# Register an error to be logged later, with an appropriate
# message.
error_pairs.append((account, entry))
# Refine the error message to disambiguate between the case of an account
# that has never been seen and one that was simply not active at the time.
errors = []
for account, entry in error_pairs:
if account in opened_accounts:
message = "Invalid reference to inactive account '{}'".format(account)
else:
message = "Invalid reference to unknown account '{}'".format(account)
errors.append(ValidationError(entry.meta, message, entry))
return errors
beancount.ops.validation.validate_check_transaction_balances(entries, options_map)
Check again that all transaction postings balance, as users may have transformed transactions.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/validation.py
def validate_check_transaction_balances(entries, options_map):
"""Check again that all transaction postings balance, as users may have
transformed transactions.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
# Note: this is a bit slow; we could limit our checks to the original
# transactions by using the hash function in the loader.
errors = []
for entry in entries:
if isinstance(entry, Transaction):
# IMPORTANT: This validation is _crucial_ and cannot be skipped.
# This is where we actually detect and warn on unbalancing
# transactions. This _must_ come after the user routines, because
# unbalancing input is legal, as those types of transactions may be
# "fixed up" by a user-plugin. In other words, we want to allow
# users to input unbalancing transactions as long as the final
# transactions objects that appear on the stream (after processing
# the plugins) are balanced. See {9e6c14b51a59}.
#
# Detect complete sets of postings that have residual balance;
residual = interpolate.compute_residual(entry.postings)
tolerances = interpolate.infer_tolerances(entry.postings, options_map)
if not residual.is_small(tolerances):
errors.append(
ValidationError(entry.meta,
"Transaction does not balance: {}".format(residual),
entry))
return errors
beancount.ops.validation.validate_currency_constraints(entries, options_map)
Check the currency constraints from account open declarations.
Open directives admit an optional list of currencies that specify the only types of commodities that the running inventory for this account may contain. This function checks that all postings are only made in those commodities.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/validation.py
def validate_currency_constraints(entries, options_map):
"""Check the currency constraints from account open declarations.
Open directives admit an optional list of currencies that specify the only
types of commodities that the running inventory for this account may
contain. This function checks that all postings are only made in those
commodities.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
# Get all the open entries with currency constraints.
open_map = {entry.account: entry
for entry in entries
if isinstance(entry, Open) and entry.currencies}
errors = []
for entry in entries:
if not isinstance(entry, Transaction):
continue
for posting in entry.postings:
# Look up the corresponding account's valid currencies; skip the
# check if there are none specified.
try:
open_entry = open_map[posting.account]
valid_currencies = open_entry.currencies
if not valid_currencies:
continue
except KeyError:
continue
# Perform the check.
if posting.units.currency not in valid_currencies:
errors.append(
ValidationError(
entry.meta,
"Invalid currency {} for account '{}'".format(
posting.units.currency, posting.account),
entry))
return errors
beancount.ops.validation.validate_data_types(entries, options_map)
Check that all the data types of the attributes of entries are as expected.
Users are provided with a means to filter the list of entries. They're able to write code that manipulates those tuple objects without any type constraints. With discipline, this mostly works, but I know better: check, just to make sure. This routine checks all the data types and assumptions on entries.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/validation.py
def validate_data_types(entries, options_map):
"""Check that all the data types of the attributes of entries are as expected.
Users are provided with a means to filter the list of entries. They're able to
write code that manipulates those tuple objects without any type constraints.
With discipline, this mostly works, but I know better: check, just to make sure.
This routine checks all the data types and assumptions on entries.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
errors = []
for entry in entries:
try:
data.sanity_check_types(
entry, options_map["allow_deprecated_none_for_tags_and_links"])
except AssertionError as exc:
errors.append(
ValidationError(entry.meta,
"Invalid data types: {}".format(exc),
entry))
return errors
beancount.ops.validation.validate_documents_paths(entries, options_map)
Check that all filenames in resolved Document entries are absolute filenames.
The processing of document entries is assumed to result in absolute paths. Relative paths are resolved at the parsing stage and at point we want to make sure we don't have to do any further processing on them.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/validation.py
def validate_documents_paths(entries, options_map):
"""Check that all filenames in resolved Document entries are absolute filenames.
The processing of document entries is assumed to result in absolute paths.
Relative paths are resolved at the parsing stage and at point we want to
make sure we don't have to do any further processing on them.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
return [ValidationError(entry.meta, "Invalid relative path for entry", entry)
for entry in entries
if (isinstance(entry, Document) and
not path.isabs(entry.filename))]
beancount.ops.validation.validate_duplicate_balances(entries, unused_options_map)
Check that balance entries occur only once per day.
Because we do not support time, and the declaration order of entries is meant to be kept irrelevant, two balance entries with different amounts should not occur in the file. We do allow two identical balance assertions, however, because this may occur during import.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/validation.py
def validate_duplicate_balances(entries, unused_options_map):
"""Check that balance entries occur only once per day.
Because we do not support time, and the declaration order of entries is
meant to be kept irrelevant, two balance entries with different amounts
should not occur in the file. We do allow two identical balance assertions,
however, because this may occur during import.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
errors = []
# Mapping of (account, currency, date) to Balance entry.
balance_entries = {}
for entry in entries:
if not isinstance(entry, data.Balance):
continue
key = (entry.account, entry.amount.currency, entry.date)
try:
previous_entry = balance_entries[key]
if entry.amount != previous_entry.amount:
errors.append(
ValidationError(
entry.meta,
"Duplicate balance assertion with different amounts",
entry))
except KeyError:
balance_entries[key] = entry
return errors
beancount.ops.validation.validate_duplicate_commodities(entries, unused_options_map)
Check that commodity entries are unique for each commodity.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/validation.py
def validate_duplicate_commodities(entries, unused_options_map):
"""Check that commodity entries are unique for each commodity.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
errors = []
# Mapping of (account, currency, date) to Balance entry.
commodity_entries = {}
for entry in entries:
if not isinstance(entry, data.Commodity):
continue
key = entry.currency
try:
previous_entry = commodity_entries[key]
if previous_entry:
errors.append(
ValidationError(
entry.meta,
"Duplicate commodity directives for '{}'".format(key),
entry))
except KeyError:
commodity_entries[key] = entry
return errors
beancount.ops.validation.validate_open_close(entries, unused_options_map)
Check constraints on open and close directives themselves.
This method checks two kinds of constraints:
-
An open or a close directive may only show up once for each account. If a duplicate is detected, an error is generated.
-
Close directives may only appear if an open directive has been seen previously (chronologically).
-
The date of close directives must be strictly greater than their corresponding open directive.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/ops/validation.py
def validate_open_close(entries, unused_options_map):
"""Check constraints on open and close directives themselves.
This method checks two kinds of constraints:
1. An open or a close directive may only show up once for each account. If a
duplicate is detected, an error is generated.
2. Close directives may only appear if an open directive has been seen
previously (chronologically).
3. The date of close directives must be strictly greater than their
corresponding open directive.
Args:
entries: A list of directives.
unused_options_map: An options map.
Returns:
A list of new errors, if any were found.
"""
errors = []
open_map = {}
close_map = {}
for entry in entries:
if isinstance(entry, Open):
if entry.account in open_map:
errors.append(
ValidationError(
entry.meta,
"Duplicate open directive for {}".format(entry.account),
entry))
else:
open_map[entry.account] = entry
elif isinstance(entry, Close):
if entry.account in close_map:
errors.append(
ValidationError(
entry.meta,
"Duplicate close directive for {}".format(entry.account),
entry))
else:
try:
open_entry = open_map[entry.account]
if entry.date < open_entry.date:
errors.append(
ValidationError(
entry.meta,
"Internal error: closing date for {} "
"appears before opening date".format(entry.account),
entry))
except KeyError:
errors.append(
ValidationError(
entry.meta,
"Unopened account {} is being closed".format(entry.account),
entry))
close_map[entry.account] = entry
return errors