beancount.ops

Operations on the entries defined in the core modules.

This package contains various functions which operate on lists of entries.

beancount.ops.balance

Checking of balance assertions against the accumulated account balances.

beancount.ops.balance.BalanceError (tuple)

BalanceError(source, message, entry)

beancount.ops.balance.BalanceError.__getnewargs__(self) special

Return self as a plain tuple. Used by copy and pickle.

Source code in beancount/ops/balance.py
def __getnewargs__(self):
    """Return the fields of this instance as a plain tuple (for copy/pickle)."""
    plain_fields = _tuple(self)
    return plain_fields

beancount.ops.balance.BalanceError.__new__(_cls, source, message, entry) special staticmethod

Create new instance of BalanceError(source, message, entry)

beancount.ops.balance.BalanceError.__replace__(self, /, **kwds) special

Return a new BalanceError object replacing specified fields with new values

Source code in beancount/ops/balance.py
def _replace(self, /, **kwds):
    """Build a new instance, taking overridden field values from 'kwds'."""
    # Each field is popped out of 'kwds', falling back to the current value;
    # anything still left in 'kwds' afterwards is not a known field name.
    updated = self._make(_map(kwds.pop, field_names, self))
    if not kwds:
        return updated
    raise TypeError(f'Got unexpected field names: {list(kwds)!r}')

beancount.ops.balance.BalanceError.__repr__(self) special

Return a nicely formatted representation string

Source code in beancount/ops/balance.py
def __repr__(self):
    """Return the class name followed by the formatted field values."""
    class_name = self.__class__.__name__
    formatted_fields = repr_fmt % self
    return class_name + formatted_fields

beancount.ops.balance.check(entries, options_map)

Process the balance assertion directives.

For each Balance directive, check that their expected balance corresponds to the actual balance computed at that time and replace failing ones by new ones with a flag that indicates failure.

Parameters:
  • entries – A list of directives.

  • options_map – A dict of options, parsed from the input file.

Returns:
  • A pair of a list of directives and a list of balance check errors.

Source code in beancount/ops/balance.py
def check(entries, options_map):
    """Validate Balance directives against incrementally accumulated balances.

    The entries are walked in order. Postings of transactions are added to the
    inventories of the tracked accounts and, at every Balance directive, the
    amount accumulated on the account and its sub-accounts is compared with the
    asserted amount. A directive which fails the comparison is replaced by a
    copy carrying the difference in its 'diff_amount' field.

    Args:
      entries: A list of directives.
      options_map: A dict of options, parsed from the input file.
    Returns:
      A pair of a list of directives and a list of balance check errors.
    """
    output_entries = []
    errors = []

    # A temporary realization tree holds the running balances. Only accounts
    # that carry a balance assertion (and their children) are created in it,
    # which lets parent-account assertions sum over their subaccounts while
    # skipping the accumulation work for every other account.
    root = realization.RealAccount('')

    # Accounts that are the direct target of a Balance directive.
    asserted = {entry.account
                for entry in entries
                if isinstance(entry, Balance)}

    # Pre-create the asserted accounts and their children, and only those.
    parent_matchers = [account.parent_matcher(name) for name in asserted]
    for name in getters.get_accounts(entries):
        is_tracked = (name in asserted or
                      any(matches(name) for matches in parent_matchers))
        if is_tracked:
            realization.get_or_create(root, name)

    # Open/Close directives of each account, used for currency validation.
    open_close_map = getters.get_account_open_close(entries)

    for entry in entries:
        if isinstance(entry, Transaction):
            for posting in entry.postings:
                tracked = realization.get(root, posting.account)

                # Untracked accounts were never created in the tree.
                if tracked is None:
                    continue

                # Note: Always allow negative lots for the purpose of balancing.
                # This error should show up somewhere else than here.
                tracked.balance.add_position(posting)

        elif isinstance(entry, Balance):
            expected = entry.amount
            try:
                open_entry, _ = open_close_map[entry.account]
            except KeyError:
                message = "Invalid reference to unknown account '{}'".format(
                    entry.account)
                errors.append(BalanceError(entry.meta, message, entry))
                # Note: the directive is skipped entirely, it is not part of
                # the returned list of entries.
                continue

            # The asserted currency must be one of those declared on the
            # account's Open directive, when it declares any.
            currency_is_invalid = (
                expected is not None and
                open_entry and open_entry.currencies and
                expected.currency not in open_entry.currencies)
            if currency_is_invalid:
                message = "Invalid currency '{}' for Balance directive: ".format(
                    expected.currency)
                errors.append(BalanceError(entry.meta, message, entry))

            # Sum up the balances of this account and of its sub-accounts, so
            # that assertions on parent accounts check the subtree total.
            #
            # FIXME: Improve the performance further by computing the balance
            # for the desired currency only. This won't allow us to cache in
            # this way but may be faster, if we're not asserting all the
            # currencies. Furthermore, we could probably avoid recomputing the
            # balance if a subtree of positions hasn't been invalidated by a new
            # position added to the realization. Do this.
            real_account = realization.get(root, entry.account)
            assert real_account is not None, "Missing {}".format(entry.account)
            subtree = realization.compute_balance(real_account, leaf_only=False)

            # Restrict the comparison to the asserted currency.
            accumulated = subtree.get_currency_units(expected.currency)
            difference = amount.sub(accumulated, expected)

            # Explicit tolerance if given on the directive, inferred otherwise.
            tolerance = get_balance_tolerance(entry, options_map)

            deviation = abs(difference.number)
            if deviation > tolerance:
                direction = 'too much' if difference.number > 0 else 'too little'
                message = ("Balance failed for '{}': "
                           "expected {} != accumulated {} ({} {})").format(
                               entry.account, expected, accumulated,
                               deviation, direction)
                errors.append(BalanceError(entry.meta, message, entry))

                # Substitute the entry by a failing entry, with the difference
                # recorded on it. Leaving the original check intact and
                # inserting a separate error entry might be easier to
                # understand, but this is the current contract.
                entry = entry._replace(
                    meta=entry.meta.copy(),
                    diff_amount=difference)

        output_entries.append(entry)

    return output_entries, errors

beancount.ops.balance.get_balance_tolerance(balance_entry, options_map)

Get the tolerance amount for a single entry.

Parameters:
  • balance_entry – An instance of data.Balance

  • options_map – An options dict, as per the parser.

Returns:
  • A Decimal, the amount of tolerance implied by the directive.

Source code in beancount/ops/balance.py
def get_balance_tolerance(balance_entry, options_map):
    """Compute the tolerance to apply when checking one Balance directive.

    Args:
      balance_entry: An instance of data.Balance
      options_map: An options dict, as per the parser.
    Returns:
      A Decimal, the amount of tolerance implied by the directive.
    """
    # A tolerance given on the directive itself always wins.
    explicit = balance_entry.tolerance
    if explicit is not None:
        return explicit

    # Without fractional digits on the asserted number there is no tolerance.
    exponent = balance_entry.amount.number.as_tuple().exponent
    if exponent >= 0:
        return ZERO

    # Be generous and always allow twice the multiplier on Balance and Pad
    # because the user creates these and the rounding of those balances may
    # often be further off than those used within a single transaction.
    multiplier = options_map["inferred_tolerance_multiplier"] * 2
    return ONE.scaleb(exponent) * multiplier

beancount.ops.basicops

Basic filtering and aggregation operations on lists of entries.

This module contains some common basic operations on entries that are complex enough not to belong in core/data.py.

Yield all the entries which have the given link.

Parameters:
  • link – A string, the link we are interested in.

Yields: Every entry in 'entries' that links to 'link'.

Source code in beancount/ops/basicops.py
def filter_link(link, entries):
    """Generate the transactions among 'entries' that carry the given link.

    Args:
      link: A string, the link we are interested in.
    Yields:
      Every entry in 'entries' that links to 'link'.
    """
    for entry in entries:
        # Only transactions are considered.
        if not isinstance(entry, data.Transaction):
            continue
        links = entry.links
        if links and link in links:
            yield entry

beancount.ops.basicops.filter_tag(tag, entries)

Yield all the entries which have the given tag.

Parameters:
  • tag – A string, the tag we are interested in.

Yields: Every entry in 'entries' that is tagged with 'tag'.

Source code in beancount/ops/basicops.py
def filter_tag(tag, entries):
    """Generate the transactions among 'entries' that carry the given tag.

    Args:
      tag: A string, the tag we are interested in.
    Yields:
      Every entry in 'entries' that is tagged with 'tag'.
    """
    for entry in entries:
        # Only transactions are considered.
        if not isinstance(entry, data.Transaction):
            continue
        tags = entry.tags
        if tags and tag in tags:
            yield entry

beancount.ops.basicops.get_common_accounts(entries)

Compute the intersection of the accounts on the given entries.

Parameters:
  • entries – A list of Transaction entries to process.

Returns:
  • A set of strings, the names of the common accounts from these entries.

Source code in beancount/ops/basicops.py
def get_common_accounts(entries):
    """Return the accounts that appear on every one of the given entries.

    Args:
      entries: A list of Transaction entries to process.
    Returns:
      A set of strings, the names of the common accounts from these
      entries.
    """
    assert all(isinstance(entry, data.Transaction) for entry in entries)

    # No entries at all: nothing can be common.
    if len(entries) == 0:
        return set()

    # Start from the accounts of the first entry and narrow the set down with
    # each following one. With a single entry the loop body never runs and all
    # of its accounts are returned.
    remaining = iter(entries)
    common = {posting.account for posting in next(remaining).postings}
    for entry in remaining:
        common &= {posting.account for posting in entry.postings}
        if not common:
            # Already empty, later entries cannot add anything back.
            break
    return common

Group the list of entries by link.

Parameters:
  • entries – A list of directives/transactions to process.

Returns:
  • A dict of link-name to list of entries.

Source code in beancount/ops/basicops.py
def group_entries_by_link(entries):
    """Build a mapping from each link to the transactions that carry it.

    Args:
      entries: A list of directives/transactions to process.
    Returns:
      A dict of link-name to list of entries.
    """
    groups = defaultdict(list)
    linked_transactions = (
        entry for entry in entries
        if isinstance(entry, data.Transaction) and entry.links)
    for txn in linked_transactions:
        # A transaction with several links is listed under each of them.
        for link in txn.links:
            groups[link].append(txn)
    return groups

beancount.ops.compress

Compress multiple entries into a single one.

This can be used during import to compress the effective output, for accounts with a large number of similar entries. For example, I had a trading account which would pay out interest every single day. I have no desire to import the full detail of these daily interests, and compressing these interest-only entries to monthly ones made sense. This is the code that was used to carry this out.

beancount.ops.compress.compress(entries, predicate)

Compress multiple transactions into single transactions.

Replace consecutive sequences of Transaction entries that fulfill the given predicate by a single entry at the date of the last matching entry. 'predicate' is the function that determines if an entry should be compressed.

This can be used to simplify a list of transactions that are similar and occur frequently. As an example, in a retail FOREX trading account, differential interest of very small amounts is paid every day; it is not relevant to look at the full detail of this interest unless there are other transactions. You can use this to compress it into single entries between other types of transactions.

Parameters:
  • entries – A list of directives.

  • predicate – A callable which accepts an entry and returns true if the entry is intended to be compressed.

Returns:
  • A list of directives, with compressible transactions replaced by a summary equivalent.

Source code in beancount/ops/compress.py
def compress(entries, predicate):
    """Collapse runs of matching transactions into single transactions.

    Each consecutive sequence of Transaction entries for which 'predicate'
    returns true is replaced by one merged entry, built with the last entry of
    the sequence as prototype (and therefore dated like it). All other entries
    are passed through unchanged, in their original order.

    This can be used to simplify a list of transactions that are similar and
    occur frequently. As an example, in a retail FOREX trading account,
    differential interest of very small amounts is paid every day; it is not
    relevant to look at the full detail of this interest unless there are other
    transactions. You can use this to compress it into single entries between
    other types of transactions.

    Args:
      entries: A list of directives.
      predicate: A callable which accepts an entry and returns true if the
          entry is intended to be compressed.
    Returns:
      A list of directives, with compressible transactions replaced by a summary
      equivalent.
    """
    output = []
    batch = []

    def flush():
        # Emit the merged form of the accumulated run, if there is one.
        if batch:
            output.append(merge(batch, batch[-1]))
            batch.clear()

    for entry in entries:
        if isinstance(entry, data.Transaction) and predicate(entry):
            # Hold on to it until the run ends.
            batch.append(entry)
            continue

        # A differing entry terminates the current run.
        flush()
        output.append(entry)

    # The input may end in the middle of a run.
    flush()

    return output

beancount.ops.compress.merge(entries, prototype_txn)

Merge the postings of a list of Transactions into a single one.

Merge the postings of the given entries into a single entry with the Transaction attributes of the prototype. Return the new entry. Postings in the combined list are merged if everything about them is the same except the number.

Parameters:
  • entries – A list of directives.

  • prototype_txn – A Transaction which is used to create the compressed Transaction instance. Its list of postings is ignored.

Returns:
  • A new Transaction instance which contains all the postings from the input entries merged together.

Source code in beancount/ops/compress.py
def merge(entries, prototype_txn):
    """Combine the postings of several Transactions into a single Transaction.

    The postings of all the given transactions are pooled together; postings
    which are identical in everything but their number are summed into a single
    posting. The resulting transaction takes its other attributes from the
    prototype.

    Args:
      entries: A list of directives.
      prototype_txn: A Transaction which is used to create the compressed
          Transaction instance. Its list of postings is ignored.
    Returns:
      A new Transaction instance which contains all the postings from the input
      entries merged together.

    """
    # Map each posting, stripped of its number and metadata, to the total
    # number of units seen for it.
    totals = collections.defaultdict(Decimal)
    for txn in data.filter_txns(entries):
        for posting in txn.postings:
            template = data.Posting(posting.account,
                                    Amount(None, posting.units.currency),
                                    posting.cost,
                                    posting.price,
                                    posting.flag,
                                    None)
            totals[template] += posting.units.number

    def ordering(item):
        # Sort for at least some stability of output.
        template, number = item
        return (template.account, template.units.currency, number)

    # Rebuild full postings from the templates and their summed numbers.
    merged_postings = [
        data.Posting(template.account,
                     Amount(number, template.units.currency),
                     template.cost,
                     template.price,
                     template.flag,
                     template.meta)
        for template, number in sorted(totals.items(), key=ordering)]

    # The new transaction borrows everything but tags, links and postings from
    # the prototype.
    return data.Transaction(prototype_txn.meta,
                            prototype_txn.date,
                            prototype_txn.flag,
                            prototype_txn.payee,
                            prototype_txn.narration,
                            data.EMPTY_SET, data.EMPTY_SET, merged_postings)

beancount.ops.documents

Everything that relates to creating the Document directives.

beancount.ops.documents.DocumentError (tuple)

DocumentError(source, message, entry)

beancount.ops.documents.DocumentError.__getnewargs__(self) special

Return self as a plain tuple. Used by copy and pickle.

Source code in beancount/ops/documents.py
def __getnewargs__(self):
    """Return the fields of this instance as a plain tuple (for copy/pickle)."""
    plain_fields = _tuple(self)
    return plain_fields

beancount.ops.documents.DocumentError.__new__(_cls, source, message, entry) special staticmethod

Create new instance of DocumentError(source, message, entry)

beancount.ops.documents.DocumentError.__replace__(self, /, **kwds) special

Return a new DocumentError object replacing specified fields with new values

Source code in beancount/ops/documents.py
def _replace(self, /, **kwds):
    """Build a new instance, taking overridden field values from 'kwds'."""
    # Each field is popped out of 'kwds', falling back to the current value;
    # anything still left in 'kwds' afterwards is not a known field name.
    updated = self._make(_map(kwds.pop, field_names, self))
    if not kwds:
        return updated
    raise TypeError(f'Got unexpected field names: {list(kwds)!r}')

beancount.ops.documents.DocumentError.__repr__(self) special

Return a nicely formatted representation string

Source code in beancount/ops/documents.py
def __repr__(self):
    """Return the class name followed by the formatted field values."""
    class_name = self.__class__.__name__
    formatted_fields = repr_fmt % self
    return class_name + formatted_fields

beancount.ops.documents.find_documents(directory, input_filename, accounts_only=None, strict=False)

Find dated document files under the given directory.

If a restricting set of accounts is provided in 'accounts_only', only return entries that correspond to one of the given accounts.

Parameters:
  • directory – A string, the name of the root of the directory hierarchy to be searched.

  • input_filename – The name of the file to be used for the Document directives. This is also used to resolve relative directory names.

  • accounts_only – A set of valid accounts strings to search for.

  • strict – A boolean, set to true if you want to generate errors on documents found in accounts not provided in accounts_only. This is only meaningful if accounts_only is specified.

Returns:
  • A list of new Document objects that were created from the files found, and a list of new errors generated.

Source code in beancount/ops/documents.py
def find_documents(directory, input_filename, accounts_only=None, strict=False):
    """Create Document directives for the dated files found under a directory.

    Files whose name begins with a YYYY-MM-DD date are turned into Document
    directives for the account of the directory they are found in. If a
    restricting set of accounts is provided in 'accounts_only', only return
    entries that correspond to one of the given accounts.

    Args:
      directory: A string, the name of the root of the directory hierarchy to be searched.
      input_filename: The name of the file to be used for the Document directives. This is
        also used to resolve relative directory names.
      accounts_only: A set of valid accounts strings to search for.
      strict: A boolean, set to true if you want to generate errors on documents
        found in accounts not provided in accounts_only. This is only meaningful
        if accounts_only is specified.
    Returns:
      A list of new Document objects that were created from the files found, and a list
      of new errors generated.

    """
    errors = []

    # A relative root is interpreted relative to the directory of the input
    # file, not to the current working directory.
    if not path.isabs(directory):
        base_directory = path.dirname(input_filename)
        directory = path.abspath(
            path.normpath(path.join(base_directory, directory)))

    # A missing root yields a single error and no documents.
    if not path.exists(directory):
        missing_root = DocumentError(
            data.new_metadata(input_filename, 0),
            "Document root '{}' does not exist".format(directory),
            None)
        return ([], [missing_root])

    documents = []
    is_restricted = accounts_only is not None
    for root, account_name, _dirs, files in account.walk(directory):
        for filename in files:
            # Only files with a dated filename are of interest.
            dated = re.match(r'(\d\d\d\d)-(\d\d)-(\d\d).(.*)', filename)
            if not dated:
                continue

            # With a restricting set of accounts, files located in any other
            # account are skipped; in strict mode, those whose account name is
            # a prefix-relative of a valid account are reported as well.
            if is_restricted and account_name not in accounts_only:
                if strict:
                    if any(account_name.startswith(valid)
                           for valid in accounts_only):
                        errors.append(DocumentError(
                            data.new_metadata(input_filename, 0),
                            "Document '{}' found in child account {}".format(
                                filename, account_name), None))
                    elif any(valid.startswith(account_name)
                             for valid in accounts_only):
                        errors.append(DocumentError(
                            data.new_metadata(input_filename, 0),
                            "Document '{}' found in parent account {}".format(
                                filename, account_name), None))
                continue

            # Build the new directive; the digits matched may still not form a
            # valid calendar date.
            meta = data.new_metadata(input_filename, 0)
            year_month_day = dated.group(1, 2, 3)
            try:
                date = datetime.date(*map(int, year_month_day))
            except ValueError as exc:
                errors.append(DocumentError(
                    data.new_metadata(input_filename, 0),
                    "Invalid date on document file '{}': {}".format(
                        filename, exc), None))
                continue

            documents.append(
                data.Document(meta, date, account_name, path.join(root, filename),
                              data.EMPTY_SET, data.EMPTY_SET))

    return (documents, errors)

beancount.ops.documents.process_documents(entries, options_map)

Check files for document directives and create documents directives automatically.

Parameters:
  • entries – A list of all directives parsed from the file.

  • options_map – An options dict, as is output by the parser. We're using its 'filename' option to figure out relative path to search for documents.

Returns:
  • A pair of list of all entries (including new ones), and errors generated during the process of creating document directives.

Source code in beancount/ops/documents.py
def process_documents(entries, options_map):
    """Add Document directives for the files found in the documents directories.

    Args:
      entries: A list of all directives parsed from the file. Note that this
        list is extended and re-sorted in place.
      options_map: An options dict, as is output by the parser.
        We're using its 'filename' option to figure out relative path to
        search for documents, and its 'documents' option for the directories
        to search.
    Returns:
      A pair of list of all entries (including new ones), and errors
      generated during the process of creating document directives.
    """
    ledger_filename = options_map["filename"]

    found_entries = []
    found_errors = []
    document_roots = options_map['documents']
    if document_roots:
        # Only files located under a known account become directives.
        known_accounts = getters.get_accounts(entries)

        for document_root in document_roots:
            documents, problems = find_documents(
                path.normpath(document_root), ledger_filename, known_accounts)
            found_entries.extend(documents)
            found_errors.extend(problems)

    # Fold the new directives into the existing ones, keeping them sorted.
    entries.extend(found_entries)
    entries.sort(key=data.entry_sortkey)

    return (entries, found_errors)

beancount.ops.documents.verify_document_files_exist(entries, unused_options_map)

Verify that the document entries point to existing files.

Parameters:
  • entries – a list of directives whose documents need to be validated.

  • unused_options_map – A parser options dict. We're not using it.

Returns:
  • The same list of entries, and a list of new errors, if any were encountered.

Source code in beancount/ops/documents.py
def verify_document_files_exist(entries, unused_options_map):
    """Report an error for each Document directive whose file is missing.

    Args:
      entries: a list of directives whose documents need to be validated.
      unused_options_map: A parser options dict. We're not using it.
    Returns:
      The same list of entries, and a list of new errors, if any were encountered.
    """
    missing_file_errors = [
        DocumentError(entry.meta,
                      'File does not exist: "{}"'.format(entry.filename),
                      entry)
        for entry in entries
        if isinstance(entry, data.Document) and not path.exists(entry.filename)]
    return entries, missing_file_errors

beancount.ops.find_prices

A library of code to create price-fetching jobs from strings and files.

beancount.ops.find_prices.find_balance_currencies(entries, date=None)

Return currencies relevant for the given date.

This computes the account balances as of the date, and returns the union of: a) The currencies held at cost, and b) Currency pairs from previous conversions, but only for currencies with non-zero balances.

This is intended to produce the list of currencies whose prices are relevant at a particular date, based on previous history.

Parameters:
  • entries – A list of directives.

  • date – A datetime.date instance.

Returns:
  • A set of (base, quote) currencies.

Source code in beancount/ops/find_prices.py
def find_balance_currencies(entries, date=None):
    """Return the currency pairs whose prices matter at the given date.

    This computes the account balances as of the date, and returns the union of:
    a) The currencies held at cost, and
    b) Currency pairs from previous conversions or Price directives, but only
       for base currencies present in the balances without a cost.

    This is intended to produce the list of currencies whose prices are relevant
    at a particular date, based on previous history.

    Args:
      entries: A list of directives.
      date: A datetime.date instance.
    Returns:
      A set of (base, quote) currencies.
    """
    pairs = set()
    uncosted_currencies = set()
    balances, _ = summarize.balance_by_account(entries, date)
    for balance in balances.values():
        for position in balance:
            if position.cost is None:
                # Regular currency: its quote currency is determined below.
                uncosted_currencies.add(position.units.currency)
            else:
                # Held at cost: the cost currency is the quote currency.
                pairs.add((position.units.currency, position.cost.currency))

    # The quote currencies for the uncosted balances are taken from the price
    # conversions and Price directives seen until this date.
    known_pairs = (find_currencies_converted(entries, date) |
                   find_currencies_priced(entries, date))
    pairs.update(pair for pair in known_pairs
                 if pair[0] in uncosted_currencies)

    return pairs

beancount.ops.find_prices.find_currencies_at_cost(entries, date=None)

Return all currencies that were held at cost at some point.

This returns all of them, even if not on the books at a particular point in time. This code does not look at account balances.

Parameters:
  • entries – A list of directives.

  • date – A datetime.date instance.

Returns:
  • A set of (base, quote) currencies.

Source code in beancount/ops/find_prices.py
def find_currencies_at_cost(entries, date=None):
    """Collect the currency pairs of all postings that were held at cost.

    This returns all of them, even if not on the books at a particular point in
    time. This code does not look at account balances.

    Args:
      entries: A list of directives.
      date: A datetime.date instance. If given, scanning stops at the first
        transaction dated on or after it.
    Returns:
      A set of (base, quote) currencies.
    """
    pairs = set()
    for entry in entries:
        if not isinstance(entry, data.Transaction):
            continue
        if date and entry.date >= date:
            break
        pairs.update(
            (posting.units.currency, posting.cost.currency)
            for posting in entry.postings
            if posting.cost is not None and posting.cost.number is not None)
    return pairs

beancount.ops.find_prices.find_currencies_converted(entries, date=None)

Return currencies from price conversions.

This function looks at all price conversions that occurred until some date and produces a list of them. Note: This does not include Price directives, only postings with price conversions.

Parameters:
  • entries – A list of directives.

  • date – A datetime.date instance.

Returns:
  • A set of (base, quote) currencies.

Source code in beancount/ops/find_prices.py
def find_currencies_converted(entries, date=None):
    """Collect the (base, quote) pairs seen in posting price conversions.

    Only transaction postings that carry a price and no cost are considered;
    Price directives are not looked at by this function.

    Args:
      entries: A list of directives.
      date: An optional datetime.date instance. Scanning stops at the first
        transaction dated on or after it.
    Returns:
      A set of (base, quote) currency pairs.
    """
    pairs = set()
    for directive in entries:
        if isinstance(directive, data.Transaction):
            # Stop at the first transaction on or after the cutoff date.
            if date and directive.date >= date:
                break
            for leg in directive.postings:
                conversion = leg.price
                # A conversion is a priced posting that is not held at cost.
                if leg.cost is None and conversion is not None:
                    pairs.add((leg.units.currency, conversion.currency))
    return pairs

beancount.ops.find_prices.find_currencies_priced(entries, date=None)

Return currencies seen in Price directives.

Parameters:
  • entries – A list of directives.

  • date – A datetime.date instance.

Returns:
  • A set of (base, quote) currency pairs.

Source code in beancount/ops/find_prices.py
def find_currencies_priced(entries, date=None):
    """Collect the (base, quote) pairs that appear in Price directives.

    Args:
      entries: A list of directives.
      date: An optional datetime.date instance. Scanning stops at the first
        Price directive dated on or after it.
    Returns:
      A set of (base, quote) currency pairs.
    """
    pairs = set()
    for directive in entries:
        if isinstance(directive, data.Price):
            # Stop at the first Price directive on or after the cutoff date.
            if date and directive.date >= date:
                break
            pairs.add((directive.currency, directive.amount.currency))
    return pairs

beancount.ops.lifetimes

Given a Beancount ledger, compute time intervals where we hold each commodity.

This script computes, for each commodity, which time intervals it is required at. This can then be used to identify a list of dates at which we need to fetch prices in order to properly fill the price database.

beancount.ops.lifetimes.compress_intervals_days(intervals, num_days)

Compress a list of date pairs to ignore short stretches of unused days.

Parameters:
  • intervals – A list of pairs of datetime.date instances.

  • num_days – An integer, the number of unused days to require for intervals to be distinct, to allow a gap.

Returns:
  • A new list of (begin, end) date pairs in which intervals separated by fewer than num_days unused days have been joined.

Source code in beancount/ops/lifetimes.py
def compress_intervals_days(intervals, num_days):
    """Compress a list of date pairs to ignore short stretches of unused days.

    Consecutive intervals whose gap (the begin of the next one minus the end
    of the previous one) is shorter than `num_days` days are merged into a
    single interval.

    Args:
      intervals: An iterable of (begin, end) pairs of datetime.date instances,
        in chronological order. The end of the final pair is never used in a
        subtraction, so it may be None; an end of None anywhere else raises a
        TypeError when the gap is computed.
      num_days: An integer, the number of unused days to require for intervals
        to be distinct, to allow a gap.
    Returns:
      A new list of (begin, end) pairs where some intervals may have been
      joined. An empty input yields an empty list.
    """
    min_gap = datetime.timedelta(days=num_days)
    merged = []
    for date_begin, date_end in intervals:
        # The last element of `merged` is the interval currently being grown.
        if merged and date_begin - merged[-1][1] < min_gap:
            # Gap too short: extend the current interval instead of starting
            # a new one.
            merged[-1] = (merged[-1][0], date_end)
        else:
            merged.append((date_begin, date_end))
    return merged

beancount.ops.lifetimes.compress_lifetimes_days(lifetimes_map, num_days)

Compress a lifetimes map to ignore short stretches of unused days.

Parameters:
  • lifetimes_map – A dict of currency intervals as returned by get_commodity_lifetimes.

  • num_days – An integer, the number of unused days to ignore.

Returns:
  • A new dict of lifetimes map where some intervals may have been joined.

Source code in beancount/ops/lifetimes.py
def compress_lifetimes_days(lifetimes_map, num_days):
    """Apply compress_intervals_days() to every entry of a lifetimes map.

    Args:
      lifetimes_map: A dict of currency intervals as returned by
        get_commodity_lifetimes.
      num_days: An integer, the number of unused days to ignore.
    Returns:
      A new dict with the same keys, where each list of intervals has been
      compressed (some intervals may have been joined).
    """
    compressed = {}
    for pair, spans in lifetimes_map.items():
        compressed[pair] = compress_intervals_days(spans, num_days)
    return compressed

beancount.ops.lifetimes.get_commodity_lifetimes(entries)

Given a list of directives, figure out the life of each commodity.

Parameters:
  • entries – A list of directives.

Returns:
  • A dict of (currency, cost-currency) commodity strings to lists of (start, end) datetime.date pairs. The dates are inclusive of the day the commodity was seen; the end/last dates are one day after the last date seen.

Source code in beancount/ops/lifetimes.py
def get_commodity_lifetimes(entries):
    """Compute the time intervals during which each commodity is held.

    The transactions are replayed in order while a running inventory is kept
    per account. Whenever the global set of currency pairs present in those
    inventories changes, an interval is opened or closed for the affected
    pairs.

    Args:
      entries: A list of directives.
    Returns:
      A dict of (currency, cost-currency) pairs to lists of (start, end)
      datetime.date pairs. The start is the date of the transaction where the
      pair appeared; the end is the date of the transaction where it
      disappeared plus ONEDAY, or None if the interval was never closed.
    """
    intervals_by_pair = collections.defaultdict(list)

    # Currency pairs held anywhere as of the last processed transaction.
    active_pairs = set()

    # Running inventory of every account.
    account_balances = collections.defaultdict(inventory.Inventory)

    transactions = (entry for entry in entries
                    if isinstance(entry, data.Transaction))
    for txn in transactions:
        # Apply all postings, noting whether any touched account saw its own
        # set of currency pairs change.
        touched = False
        for posting in txn.postings:
            account_inventory = account_balances[posting.account]
            pairs_before = account_inventory.currency_pairs()
            account_inventory.add_position(posting)
            if account_inventory.currency_pairs() != pairs_before:
                touched = True
        if not touched:
            continue

        # Only now pay for the global recomputation across all accounts; this
        # should be infrequent.
        current_pairs = set(itertools.chain.from_iterable(
            inv.currency_pairs() for inv in account_balances.values()))
        if current_pairs == active_pairs:
            continue

        # Open an interval for each pair that just appeared.
        for opened in current_pairs - active_pairs:
            intervals_by_pair[opened].append((txn.date, None))

        # Close the open interval of each pair that just disappeared.
        for closed in active_pairs - current_pairs:
            history = intervals_by_pair[closed]
            begin_date, end_date = history.pop()
            assert end_date is None
            history.append((begin_date, txn.date + ONEDAY))

        active_pairs = current_pairs

    return intervals_by_pair

beancount.ops.lifetimes.required_daily_prices(lifetimes_map, date_last, weekdays_only=False)

Enumerate all the commodities and days where the price is required.

Given a map of lifetimes for a set of commodities, enumerate all the days for each commodity where it is active. This can be used to connect to a historical price fetcher routine to fill in missing price entries from an existing ledger.

Parameters:
  • lifetimes_map – A dict of currency to active intervals as returned by get_commodity_lifetimes().

  • date_last – A datetime.date instance, the last date which we're interested in.

  • weekdays_only – Option to limit fetching to weekdays only.

Returns:
  • Tuples of (date, currency, cost-currency).

Source code in beancount/ops/lifetimes.py
def required_daily_prices(lifetimes_map, date_last, weekdays_only=False):
    """List every (day, commodity) combination for which a price is needed.

    Each interval of each currency pair is expanded into its individual days.
    The output can drive a historical price fetcher to fill in missing price
    entries of an existing ledger.

    Args:
      lifetimes_map: A dict of currency to active intervals as returned by
        get_commodity_lifetimes().
      date_last: A datetime.date instance, used as the (exclusive) end of
        intervals whose end is None.
      weekdays_only: If true, weekend days are skipped; an interval beginning
        on a weekend starts from the preceding Friday instead.
    Returns:
      A sorted list of (date, currency, cost-currency) tuples.
    """
    required = []
    for pair, intervals in lifetimes_map.items():
        quote = pair[1]
        if quote is None:
            # Pairs without a cost currency are skipped.
            continue
        base = pair[0]
        for date_begin, date_end in intervals:
            day = date_begin
            if weekdays_only:
                # weekday() is 5/6 on Saturday/Sunday; step back to Friday (4).
                overshoot = date_begin.weekday() - 4
                if overshoot > 0:
                    day -= datetime.timedelta(days=overshoot)

            # Open-ended intervals run until date_last.
            stop = date_last if date_end is None else date_end
            while day < stop:
                required.append((day, base, quote))
                # From a Friday, jump straight to Monday in weekdays-only mode.
                if weekdays_only and day.weekday() == 4:
                    step = 3 * ONEDAY
                else:
                    step = ONEDAY
                day += step

    required.sort()
    return required

beancount.ops.lifetimes.required_weekly_prices(lifetimes_map, date_last)

Enumerate all the commodities and Fridays where the price is required.

Given a map of lifetimes for a set of commodities, enumerate all the Fridays for each commodity where it is active. This can be used to connect to a historical price fetcher routine to fill in missing price entries from an existing ledger.

Parameters:
  • lifetimes_map – A dict of currency to active intervals as returned by get_commodity_lifetimes().

  • date_last – A datetime.date instance, the last date which we're interested in.

Returns:
  • Tuples of (date, currency, cost-currency).

Source code in beancount/ops/lifetimes.py
def required_weekly_prices(lifetimes_map, date_last):
    """List every (Friday, commodity) combination for which a price is needed.

    Each interval of each currency pair is expanded into weekly dates, starting
    from the most recent Friday on or before the interval's begin date. The
    output can drive a historical price fetcher to fill in missing price
    entries of an existing ledger.

    Args:
      lifetimes_map: A dict of currency to active intervals as returned by
        get_commodity_lifetimes().
      date_last: A datetime.date instance, used as the (exclusive) end of
        intervals whose end is None.
    Returns:
      A sorted list of (date, currency, cost-currency) tuples.
    """
    required = []
    for pair, intervals in lifetimes_map.items():
        quote = pair[1]
        if quote is None:
            # Pairs without a cost currency are skipped.
            continue
        base = pair[0]
        for date_begin, date_end in intervals:
            # Days elapsed since the latest Friday (weekday 4); zero when the
            # interval begins on a Friday.
            days_past_friday = (date_begin.weekday() - 4) % 7
            friday = date_begin - datetime.timedelta(days=days_past_friday)

            # Open-ended intervals run until date_last.
            stop = date_last if date_end is None else date_end
            while friday < stop:
                required.append((friday, base, quote))
                friday += ONE_WEEK

    required.sort()
    return required

beancount.ops.lifetimes.trim_intervals(intervals, trim_start=None, trim_end=None)

Trim a list of date pairs to be within a start and end date. Useful in update-style price fetching.

Parameters:
  • intervals – A list of pairs of datetime.date instances

  • trim_start – An inclusive starting date.

  • trim_end – An exclusive ending date.

Returns:
  • A list of new intervals (pairs of (date, date)).

Source code in beancount/ops/lifetimes.py
def trim_intervals(intervals, trim_start=None, trim_end=None):
    """Clip a list of date pairs to lie within a start and an end date.

    Useful in update-style price fetching.

    Args:
      intervals: A list of pairs of datetime.date instances; the end of a pair
        may be None.
      trim_start: An inclusive starting date, or None for no lower bound.
      trim_end: An exclusive ending date, or None for no upper bound.
    Returns:
      A list of new intervals (pairs of (date, date)). An interval whose
      clipped begin date falls after its clipped end date is dropped.
    Raises:
      ValueError: If both bounds are given and trim_end is before trim_start.
    """
    have_start = trim_start is not None
    have_end = trim_end is not None
    if have_start and have_end and trim_end < trim_start:
        raise ValueError('Trim end date is before start date')

    trimmed = []
    for date_begin, date_end in intervals:
        # Push the begin date forward to the lower bound.
        if have_start and date_begin < trim_start:
            date_begin = trim_start
        # Pull the end date back to the upper bound; an open end is replaced.
        if have_end and (date_end is None or date_end > trim_end):
            date_end = trim_end

        if date_end is None or date_end >= date_begin:
            trimmed.append((date_begin, date_end))
    return trimmed

beancount.ops.pad

Automatic padding of gaps between entries.

beancount.ops.pad.PadError (tuple)

PadError(source, message, entry)

beancount.ops.pad.PadError.__getnewargs__(self) special

Return self as a plain tuple. Used by copy and pickle.

Source code in beancount/ops/pad.py
def __getnewargs__(self):
    """Return self as a plain tuple.  Used by copy and pickle.

    NOTE(review): `_tuple` is not defined in this listing; this looks like the
    method generated by collections.namedtuple, where it would be the builtin
    tuple type -- confirm.
    """
    return _tuple(self)

beancount.ops.pad.PadError.__new__(_cls, source, message, entry) special staticmethod

Create new instance of PadError(source, message, entry)

beancount.ops.pad.PadError.__replace__(/, self, **kwds) special

Return a new PadError object replacing specified fields with new values

Source code in beancount/ops/pad.py
def _replace(self, /, **kwds):
    """Return a new object replacing specified fields with new values.

    Args:
      **kwds: Replacement values, keyed by field name.
    Returns:
      The object built by self._make() from the resulting field values.
    Raises:
      TypeError: If any keyword is left in `kwds` after the fields have been
        processed, i.e. it was not consumed as a field name.

    NOTE(review): `_map` and `field_names` are not defined in this listing;
    this looks like the method generated by collections.namedtuple, where
    `_map` would be the builtin map -- confirm.
    """
    # For each field, kwds.pop is called with the field name and the current
    # value (taken by iterating self), so the current value presumably acts as
    # the default when no replacement was passed.
    result = self._make(_map(kwds.pop, field_names, self))
    # Anything still in kwds was not popped above.
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.ops.pad.PadError.__repr__(self) special

Return a nicely formatted representation string

Source code in beancount/ops/pad.py
def __repr__(self):
    """Return a nicely formatted representation string.

    The result is the class name followed by `repr_fmt` %-formatted with self.

    NOTE(review): `repr_fmt` is not defined in this listing; presumably it is
    the per-class format string set up by collections.namedtuple -- confirm.
    """
    return self.__class__.__name__ + repr_fmt % self

beancount.ops.pad.pad(entries, options_map)

Insert transaction entries to fulfill a subsequent balance check.

Synthesize and insert Transaction entries right after Pad entries in order to fulfill checks in the padded accounts. Returns a new list of entries. Note that this doesn't pad across parent-child relationships, it is a very simple kind of pad. (I have found this to be sufficient in practice, and simpler to implement and understand.)

Furthermore, this pads for a single currency only, that is, balance checks are specified only for one currency at a time, and pads will only be inserted for those currencies.

Parameters:
  • entries – A list of directives.

  • options_map – A parser options dict.

Returns:
  • A new list of directives, with Pad entries inserted, and a list of new errors produced.

Source code in beancount/ops/pad.py
def pad(entries, options_map):
    """Insert transaction entries to fulfill a subsequent balance check.

    Synthesize and insert Transaction entries right after Pad entries in order
    to fulfill checks in the padded accounts. Returns a new list of entries.
    Note that this doesn't pad across parent-child relationships, it is a very
    simple kind of pad. (I have found this to be sufficient in practice, and
    simpler to implement and understand.)

    Furthermore, this pads for a single currency only, that is, balance checks
    are specified only for one currency at a time, and pads will only be
    inserted for those currencies.

    Args:
      entries: A list of directives.
      options_map: A parser options dict.
    Returns:
      A pair of: a new list of directives in which the synthesized padding
      transactions have been inserted right after the Pad entries that
      produced them, and a list of the PadError instances generated.
    Raises:
      ValueError: If adding the padding position to the running balance
        yields a position that is negative at cost.
    """
    pad_errors = []

    # Find all the pad entries and group them by account.
    pads = list(misc_utils.filter_type(entries, data.Pad))
    pad_dict = misc_utils.groupby(lambda x: x.account, pads)

    # Partially realize the postings, so we can iterate them by account.
    by_account = realization.postings_by_account(entries)

    # A dict of id(pad) -> list of entries to be inserted after that pad. The
    # key is the object identity of the Pad directive, not the directive itself.
    new_entries = {id(pad): [] for pad in pads}

    # Process each account that has a padding group, in sorted account order.
    for account_, pad_list in sorted(pad_dict.items()):

        # Last encountered / currently active pad entry.
        active_pad = None

        # Gather all the postings for the account and its children.
        postings = []
        is_child = account.parent_matcher(account_)
        for item_account, item_postings in by_account.items():
            if is_child(item_account):
                postings.extend(item_postings)
        postings.sort(key=data.posting_sortkey)

        # A set of currencies already padded (or checked) since the active pad
        # was encountered in this account.
        padded_lots = set()

        # Running balance of the account (and its children) as the sorted
        # items are replayed.
        pad_balance = inventory.Inventory()
        for entry in postings:

            # Despite its name, `postings` holds TxnPosting wrappers and
            # directives (Pad, Balance, ...), never bare Posting objects.
            assert not isinstance(entry, data.Posting)
            if isinstance(entry, data.TxnPosting):
                # This is a transaction; update the running balance for this
                # account.
                pad_balance.add_position(entry.posting)

            elif isinstance(entry, data.Pad):
                # Pads of child accounts are ignored here; only a pad on the
                # very account being processed becomes active.
                if entry.account == account_:
                    # Mark this newly encountered pad as active and allow all lots
                    # to be padded from here on.
                    active_pad = entry
                    padded_lots = set()

            elif isinstance(entry, data.Balance):
                check_amount = entry.amount

                # Compare the current balance amount to the expected one from
                # the check entry. IMPORTANT: You need to understand that this
                # does not check a single position, but rather checks that the
                # total amount for a particular currency (which itself is
                # distinct from the cost).
                balance_amount = pad_balance.get_currency_units(check_amount.currency)
                diff_amount = amount.sub(balance_amount, check_amount)

                # Use the specified tolerance or automatically infer it.
                tolerance = balance.get_balance_tolerance(entry, options_map)

                if abs(diff_amount.number) > tolerance:
                    # The check fails; we need to pad.

                    # Pad only if pad entry is active and we haven't already
                    # padded that lot since it was last encountered.
                    if active_pad and (check_amount.currency not in padded_lots):

                        # Note: we decide that it's an error to try to pad
                        # positions at cost; we check here that all the existing
                        # positions with that currency have no cost. One error
                        # is recorded per position held at cost, and the
                        # padding transaction below is still synthesized.
                        positions = [pos
                                     for pos in pad_balance.get_positions()
                                     if pos.units.currency == check_amount.currency]
                        for position_ in positions:
                            if position_.cost is not None:
                                pad_errors.append(
                                    PadError(entry.meta,
                                             ("Attempt to pad an entry with cost for "
                                              "balance: {}".format(pad_balance)),
                                             active_pad))

                        # Thus our padding lot is without cost by default. Its
                        # number is the expected amount minus the current one.
                        diff_position = position.Position.from_amounts(
                            amount.Amount(check_amount.number - balance_amount.number,
                                          check_amount.currency))

                        # Synthesize a new transaction entry for the difference,
                        # dated and annotated (metadata copy) like the active pad.
                        narration = ('(Padding inserted for Balance of {} for '
                                     'difference {})').format(check_amount, diff_position)
                        new_entry = data.Transaction(
                            active_pad.meta.copy(), active_pad.date, flags.FLAG_PADDING,
                            None, narration, data.EMPTY_SET, data.EMPTY_SET, [])

                        # First leg: the difference, on the padded account.
                        new_entry.postings.append(
                            data.Posting(active_pad.account,
                                         diff_position.units, diff_position.cost,
                                         None, None, {}))
                        # Second leg: the negated difference, on the pad's
                        # source account.
                        neg_diff_position = -diff_position
                        new_entry.postings.append(
                            data.Posting(active_pad.source_account,
                                         neg_diff_position.units, neg_diff_position.cost,
                                         None, None, {}))

                        # Save it for later insertion after the active pad.
                        new_entries[id(active_pad)].append(new_entry)

                        # Fixup the running balance.
                        pos, _ = pad_balance.add_position(diff_position)
                        if pos is not None and pos.is_negative_at_cost():
                            raise ValueError(
                                "Position held at cost goes negative: {}".format(pos))

                # Mark this lot as padded. Further checks should not pad this lot.
                # This runs for every Balance entry, whether or not the check
                # failed or a padding transaction was inserted.
                padded_lots.add(check_amount.currency)

    # Insert the newly created entries right after the pad entries that created them.
    padded_entries = []
    for entry in entries:
        padded_entries.append(entry)
        if isinstance(entry, data.Pad):
            entry_list = new_entries[id(entry)]
            if entry_list:
                padded_entries.extend(entry_list)
            else:
                # Generate errors on unused pad entries.
                pad_errors.append(
                    PadError(entry.meta, "Unused Pad entry", entry))

    return padded_entries, pad_errors

beancount.ops.summarize

Summarization of entries.

This code is used to summarize a sequence of entries (e.g. during a time period) into a few "opening balance" entries. This is when computing a balance sheet for a specific time period: we don't want to see the entries from before some period of time, so we fold them into a single transaction per account that has the sum total amount of that account.

beancount.ops.summarize.balance_by_account(entries, date=None, compress_unbooked=False)

Sum up the balance per account for all entries strictly before 'date'.

Parameters:
  • entries – A list of directives.

  • date – An optional datetime.date instance. If provided, stop accumulating on and after this date. This is useful for summarization before a specific date.

  • compress_unbooked – For accounts that have a booking method of NONE, compress their positions into a single average position. This can be used when you export the full list of positions, because those accounts will have a myriad of small positions from fees at negative cost and what-not.

Returns:
  • A pair of a dict of account string to instance Inventory (the balance of this account before the given date), and the index in the list of entries where the date was encountered. If all entries are located before the cutoff date, an index one beyond the last entry is returned.

Source code in beancount/ops/summarize.py
def balance_by_account(entries, date=None, compress_unbooked=False):
    """Accumulate per-account balances from entries strictly before 'date'.

    Args:
      entries: A list of directives.
      date: An optional datetime.date instance. If provided, accumulation
        stops at the first entry dated on or after it. This is useful for
        summarization before a specific date.
      compress_unbooked: If true, the balance of every account opened with a
        booking method of NONE is replaced by its averaged positions. This can
        be used when you export the full list of positions, because those
        accounts will have a myriad of small positions from fees at negative
        cost and what-not.
    Returns:
      A pair of a dict of account string to instance Inventory (the balance of
      this account before the given date), and the index in the list of entries
      where the date was encountered. If all entries are located before the
      cutoff date, an index one beyond the last entry is returned.
    """
    inventories = collections.defaultdict(inventory.Inventory)

    # Index of the first entry on/after the cutoff; None until one is found.
    cutoff = None
    for position, directive in enumerate(entries):
        if date and directive.date >= date:
            cutoff = position
            break
        if not isinstance(directive, Transaction):
            continue
        for posting in directive.postings:
            # Negative lots at cost must be tolerated here: the entries may
            # have been filtered, so those needed to keep units at cost above
            # zero could be missing. Only an unfiltered sum over all of an
            # account's postings is guaranteed to stay non-negative.
            inventories[posting.account].add_position(posting)
    if cutoff is None:
        # The loop ran to completion: report one past the last entry.
        cutoff = len(entries)

    # (This is a complex issue.) Positions accrued in an account with the
    # "NONE" booking method were never matched against existing cost bases, so
    # their profit/loss has not been accounted for in other postings; it ends
    # up merged in the cost basis of the positive and negative postings.
    # Averaging the positions yields an accurate cost basis and unit balance.
    if compress_unbooked:
        open_close = getters.get_account_open_close(entries)
        open_by_account = {name: open_directive
                           for name, (open_directive, _close) in open_close.items()}

        for name, account_inventory in inventories.items():
            open_directive = open_by_account.get(name, None)
            if open_directive is None:
                continue
            if open_directive.booking is not data.Booking.NONE:
                continue
            averaged = account_inventory.average()
            inventories[name] = inventory.Inventory(pos for pos in averaged)

    return inventories, cutoff

beancount.ops.summarize.cap(entries, account_types, conversion_currency, account_earnings, account_conversions)

Transfer net income to equity and insert a final conversion entry.

This is used to move and nullify balances from the income and expense accounts to an equity account in order to draw up a balance sheet with a balance of precisely zero.

Parameters:
  • entries – A list of directives.

  • account_types – An instance of AccountTypes.

  • conversion_currency – A string, the transfer currency to use for zero prices on the conversion entry.

  • account_earnings – A string, the name of the equity account to transfer final balances of the income and expense accounts to.

  • account_conversions – A string, the name of the equity account to use as the source for currency conversions.

Returns:
  • A modified list of entries, with the income and expense accounts transferred.

Source code in beancount/ops/summarize.py
def cap(entries,
        account_types,
        conversion_currency,
        account_earnings,
        account_conversions):
    """Move net income into equity, then append a final conversion entry.

    The balances of the income and expense accounts are transferred to an
    equity account so that a balance sheet drawn from the result sums to
    precisely zero.

    Args:
      entries: A list of directives.
      account_types: An instance of AccountTypes.
      conversion_currency: A string, the transfer currency to use for zero prices
        on the conversion entry.
      account_earnings: A string, the name of the equity account to transfer
        final balances of the income and expense accounts to.
      account_conversions: A string, the name of the equity account to use as
        the source for currency conversions.
    Returns:
      A modified list of entries, with the income and expense accounts
      transferred.
    """
    def on_income_statement(account):
        # True for the accounts whose balances get moved to earnings.
        return is_income_statement_account(account, account_types)

    # Step 1: income/expense balances become earnings (net income).
    # NOTE(review): the None arguments below presumably mean "no cutoff date"
    # -- confirm against transfer_balances() and conversions().
    transferred = transfer_balances(
        entries, None, on_income_statement, account_earnings)

    # Step 2: add the final conversion entries.
    return conversions(
        transferred, account_conversions, conversion_currency, None)

beancount.ops.summarize.cap_opt(entries, options_map)

Close by getting all the parameters from an options map.

See cap() for details.

Parameters:
  • entries – See cap().

  • options_map – A parser's option_map.

Returns:
  • Same as cap().

Source code in beancount/ops/summarize.py
def cap_opt(entries, options_map):
    """Call cap() with its parameters read from an options map.

    See cap() for details.

    Args:
      entries: See cap().
      options_map: A parser's option_map; its 'conversion_currency' key is
        read, along with whatever options.get_account_types() and
        options.get_current_accounts() extract from it.
    Returns:
      Same as cap().
    """
    types = options.get_account_types(options_map)
    # Unpacked positionally into cap()'s trailing account-name parameters.
    trailing_accounts = options.get_current_accounts(options_map)
    return cap(entries, types, options_map['conversion_currency'],
               *trailing_accounts)

beancount.ops.summarize.clamp(entries, begin_date, end_date, account_types, conversion_currency, account_earnings, account_opening, account_conversions)

Filter entries to include only those during a specified time period.

Firstly, this method will transfer all balances for the income and expense accounts occurring before the given period begin date to the 'account_earnings' account (earnings before the period, or "retained earnings") and summarize all of the transactions before that date against the 'account_opening' account (usually "opening balances"). The resulting income and expense accounts should have no transactions (since their balances have been transferred out and summarization of zero balances should not add any transactions).

Secondly, all the entries after the period end date will be truncated and a conversion entry will be added for the resulting transactions that reflect changes occurring between the beginning and end of the exercise period. The resulting balance of all account should be empty.

Parameters:
  • entries – A list of directive tuples.

  • begin_date – A datetime.date instance, the beginning of the period.

  • end_date – A datetime.date instance, one day beyond the end of the period.

  • account_types – An instance of AccountTypes.

  • conversion_currency – A string, the transfer currency to use for zero prices on the conversion entry.

  • account_earnings – A string, the name of the account to transfer previous earnings from the income statement accounts to the balance sheet.

  • account_opening – A string, the name of the account in equity to transfer previous balances from, in order to initialize account balances at the beginning of the period. This is typically called an opening balances account.

  • account_conversions – A string, the name of the equity account to book currency conversions against.

Returns:
  • A new list of entries is returned, and the index that points to the first original transaction after the beginning date of the period. This index can be used to generate the opening balances report, which is a balance sheet fed with only the summarized entries.

Source code in beancount/ops/summarize.py
def clamp(entries,
          begin_date, end_date,
          account_types,
          conversion_currency,
          account_earnings,
          account_opening,
          account_conversions):
    """Restrict a list of entries to those within a given time period.

    The work is done in four successive steps:

    1. The balances of the income and expenses accounts accumulated before
       'begin_date' are transferred to the 'account_earnings' account (earnings
       before the period, or "retained earnings").
    2. Everything before 'begin_date' is summarized against the
       'account_opening' account (usually "opening balances"). Since the
       income and expenses balances have been transferred out by then, those
       accounts should not receive any summarization transactions.
    3. The entries at and after 'end_date' are truncated.
    4. A conversion entry is inserted for the remaining transactions, so that
       the resulting balance of all accounts should be empty.

    Args:
      entries: A list of directive tuples.
      begin_date: A datetime.date instance, the beginning of the period.
      end_date: A datetime.date instance, one day beyond the end of the period.
      account_types: An instance of AccountTypes.
      conversion_currency: A string, the transfer currency to use for zero prices
        on the conversion entry.
      account_earnings: A string, the name of the account to transfer
        previous earnings from the income statement accounts to the balance
        sheet.
      account_opening: A string, the name of the account in equity
        to transfer previous balances from, in order to initialize account
        balances at the beginning of the period. This is typically called an
        opening balances account.
      account_conversions: A string, the name of the equity account to
        book currency conversions against.
    Returns:
      A pair of (new list of entries, index). The index is the one returned by
      summarize(): it points to the first original entry after the beginning
      date of the period and can be used to generate the opening balances
      report, which is a balance sheet fed with only the summarized entries.
    """
    def is_income_statement(account):
        # Predicate selecting the accounts whose balances get transferred.
        return is_income_statement_account(account, account_types)

    # Step 1: move pre-period income and expenses to equity.
    transferred = transfer_balances(entries, begin_date,
                                    is_income_statement, account_earnings)

    # Step 2: replace everything before the period by opening balances. This
    # runs after the transfer so that the income and expenses accounts have
    # nothing left to summarize.
    summarized, index = summarize(transferred, begin_date, account_opening)

    # Step 3: drop the entries at and after the end of the period.
    truncated = truncate(summarized, end_date)

    # Step 4: add the conversion entry.
    clamped = conversions(truncated, account_conversions, conversion_currency,
                          end_date)

    return clamped, index

beancount.ops.summarize.clamp_opt(entries, begin_date, end_date, options_map)

Clamp by getting all the parameters from an options map.

See clamp() for details.

Parameters:
  • entries – See clamp().

  • begin_date – See clamp().

  • end_date – See clamp().

  • options_map – A parser's option_map.

Returns:
  • Same as clamp().

Source code in beancount/ops/summarize.py
def clamp_opt(entries, begin_date, end_date, options_map):
    """Call clamp() with its account parameters read from an options map.

    See clamp() for details.

    Args:
      entries: See clamp().
      begin_date: See clamp().
      end_date: See clamp().
      options_map: A parser's option_map.
    Returns:
      Same as clamp().
    """
    types = options.get_account_types(options_map)

    # Only the first two of the "previous" accounts and the second of the
    # "current" accounts are needed here.
    earnings, opening, _ = options.get_previous_accounts(options_map)
    _, conversions_account = options.get_current_accounts(options_map)

    currency = options_map['conversion_currency']

    return clamp(entries, begin_date, end_date,
                 account_types=types,
                 conversion_currency=currency,
                 account_earnings=earnings,
                 account_opening=opening,
                 account_conversions=conversions_account)

beancount.ops.summarize.clear(entries, date, account_types, account_earnings)

Transfer income and expenses balances at the given date to the equity accounts.

This method inserts entries to zero out balances on income and expenses accounts by transferring them to an equity account.

Parameters:
  • entries – A list of directive tuples.

  • date – A datetime.date instance, one day beyond the end of the period. This date can be optionally left to None in order to close at the end of the list of entries.

  • account_types – An instance of AccountTypes.

  • account_earnings – A string, the name of the account to transfer previous earnings from the income statement accounts to the balance sheet.

Returns:
  • A new list of entries is returned, and the index that points to one before the last original transaction before the transfers.

Source code in beancount/ops/summarize.py
def clear(entries,
          date,
          account_types,
          account_earnings):
    """Transfer income and expenses balances at the given date to an equity account.

    This function inserts entries that zero out the balances of the income and
    expenses accounts by transferring them to 'account_earnings'.

    Args:
      entries: A list of directive tuples.
      date: A datetime.date instance, one day beyond the end of the period. This
        date can be optionally left to None in order to close at the end of the
        list of entries.
      account_types: An instance of AccountTypes.
      account_earnings: A string, the name of the account to transfer
        previous earnings from the income statement accounts to the balance
        sheet.
    Returns:
      A pair of (new list of entries, index), where the index is the length of
      the list of entries that was passed in, computed before any transfer
      entry is inserted.
    """
    # Record the size of the input before the transfer entries are added.
    original_count = len(entries)

    def is_income_statement(account):
        # Predicate selecting the accounts whose balances get transferred.
        return is_income_statement_account(account, account_types)

    cleared = transfer_balances(entries, date,
                                is_income_statement, account_earnings)
    return cleared, original_count

beancount.ops.summarize.clear_opt(entries, date, options_map)

Convenience function to clear() using an options map.

Source code in beancount/ops/summarize.py
def clear_opt(entries, date, options_map):
    """Call clear() with its parameters read from an options map.

    The earnings account is the first of the accounts returned by
    options.get_current_accounts().

    Args:
      entries: See clear().
      date: See clear().
      options_map: A parser's option_map.
    Returns:
      Same as clear().
    """
    types = options.get_account_types(options_map)
    earnings_account = options.get_current_accounts(options_map)[0]
    return clear(entries, date, types, earnings_account)

beancount.ops.summarize.close(entries, date, conversion_currency, account_conversions)

Truncate entries that occur after a particular date and ensure balance.

This method essentially removes entries after a date. It truncates the future. To do so, it will

  1. Remove all entries which occur after 'date', if given.

  2. Insert conversion transactions at the end of the list of entries to ensure that the total balance of all postings sums up to empty.

The result is a list of entries with a total balance of zero, with possibly non-zero balances for the income/expense accounts. To produce a final balance sheet, use transfer() to move the net income to the equity accounts.

Parameters:
  • entries – A list of directive tuples.

  • date – A datetime.date instance, one day beyond the end of the period. This date can be optionally left to None in order to close at the end of the list of entries.

  • conversion_currency – A string, the transfer currency to use for zero prices on the conversion entry.

  • account_conversions – A string, the name of the equity account to book currency conversions against.

Returns:
  • A new list of entries is returned, and the index that points to one beyond the last original transaction that was provided. Further entries may have been inserted to normalize conversions and ensure the total balance sums to zero.

Source code in beancount/ops/summarize.py
def close(entries,
          date,
          conversion_currency,
          account_conversions):
    """Truncate the entries at and after a date and ensure the result balances.

    This function truncates the future. It proceeds in two steps:

    1. Remove all the entries which occur at or after 'date', if given.

    2. Insert a conversion transaction to ensure that the total balance of all
       postings sums up to empty.

    The result is a list of entries with a total balance of zero, with possibly
    non-zero balances for the income/expense accounts. To produce a final
    balance sheet, the net income still has to be moved to the equity accounts
    (see clear()).

    Args:
      entries: A list of directive tuples.
      date: A datetime.date instance, one day beyond the end of the period. This
        date can be optionally left to None in order to close at the end of the
        list of entries.
      conversion_currency: A string, the transfer currency to use for zero prices
        on the conversion entry.
      account_conversions: A string, the name of the equity account to
        book currency conversions against.
    Returns:
      A pair of (new list of entries, index), where the index is the length of
      the truncated list, i.e., one beyond the last original entry that was
      kept. A conversion entry may have been inserted in the returned list to
      ensure the total balance sums to zero.
    """
    # Without a date there is nothing to truncate.
    kept = entries if date is None else truncate(entries, date)

    # Length of the truncated list, measured before the conversion is inserted.
    index = len(kept)

    # Bring the total balance of all accounts to zero.
    closed = conversions(kept, account_conversions, conversion_currency, date)

    return closed, index

beancount.ops.summarize.close_opt(entries, date, options_map)

Convenience function to close() using an options map.

Source code in beancount/ops/summarize.py
def close_opt(entries, date, options_map):
    """Call close() with its parameters read from an options map.

    The conversions account is the second of the accounts returned by
    options.get_current_accounts().

    Args:
      entries: See close().
      date: See close().
      options_map: A parser's option_map.
    Returns:
      Same as close().
    """
    currency = options_map['conversion_currency']
    conversions_account = options.get_current_accounts(options_map)[1]
    return close(entries, date, currency, conversions_account)

beancount.ops.summarize.conversions(entries, conversion_account, conversion_currency, date=None)

Insert a conversion entry at date 'date' at the given account.

Parameters:
  • entries – A list of entries.

  • conversion_account – A string, the account to book against.

  • conversion_currency – A string, the transfer currency to use for zero prices on the conversion entry.

  • date – The date before which to insert the conversion entry. The new entry will be inserted as the last entry of the date just previous to this date.

Returns:
  • A modified list of entries.

Source code in beancount/ops/summarize.py
def conversions(entries, conversion_account, conversion_currency, date=None):
    """Insert a conversion entry booked against the given account.

    The balance of all the entries (up to 'date', if given) is computed and
    reduced to its cost. If that cost balance is not empty, a single
    transaction is inserted whose postings negate it, each at a price of zero
    units of 'conversion_currency'.

    Args:
      entries: A list of entries.
      conversion_account: A string, the account to book against.
      conversion_currency: A string, the transfer currency to use for zero prices
        on the conversion entry.
      date: The date before which to insert the conversion entry. The new
        entry will be inserted as the last entry of the date just previous
        to this date. If None, the entry is appended at the end and dated like
        the last entry.
    Returns:
      The list of entries that was passed in if no conversion is needed,
      otherwise a new list which includes the conversion entry.
    """
    balance = interpolate.compute_entries_balance(entries, date=date)

    # Nothing to insert if the balance at cost is already empty.
    cost_balance = balance.reduce(convert.get_cost)
    if cost_balance.is_empty():
        return entries

    # Find where to put the new entry and how to date it: it should be the
    # last transaction of the day before 'date'.
    if date is None:
        insert_at = len(entries)
        entry_date = entries[-1].date
    else:
        insert_at = bisect_key.bisect_left_with_key(
            entries, date, key=lambda entry: entry.date)
        entry_date = date - datetime.timedelta(days=1)

    meta = data.new_metadata('<conversions>', -1)
    narration = 'Conversion for {}'.format(balance)

    # Important note: the price is set to zero here to maintain the balance
    # invariant. (This is the only single place we cheat on the balance rule
    # in the entire system and this is necessary; see documentation on
    # Conversions.)
    postings = [
        data.Posting(conversion_account, negated.units, negated.cost,
                     amount.Amount(ZERO, conversion_currency), None, None)
        for negated in (-position
                        for position in cost_balance.get_positions())]
    conversion_entry = Transaction(meta, entry_date, flags.FLAG_CONVERSIONS,
                                   None, narration,
                                   data.EMPTY_SET, data.EMPTY_SET, postings)

    # Work on a copy so that the caller's list is left untouched.
    result = list(entries)
    result[insert_at:insert_at] = [conversion_entry]
    return result

beancount.ops.summarize.create_entries_from_balances(balances, date, source_account, direction, meta, flag, narration_template)

"Create a list of entries from a dict of balances.

This method creates a list of new entries to transfer the amounts in the 'balances' dict to/from another account specified in 'source_account'.

The balancing posting is created with the equivalent at cost. In other words, if you attempt to balance 10 HOOL {500 USD}, this will synthesize a posting with this position on one leg, and with 5000 USD on the 'source_account' leg.

Parameters:
  • balances – A dict of account name strings to Inventory instances.

  • date – A datetime.date object, the date at which to create the transaction.

  • source_account – A string, the name of the account to pull the balances from. This is the magician's hat to pull the rabbit from.

  • direction – If 'direction' is True, the new entries transfer TO the balances account from the source account; otherwise the new entries transfer FROM the balances into the source account.

  • meta – A dict to use as metadata for the transactions.

  • flag – A string, the flag to use for the transactions.

  • narration_template – A format string for creating the narration. It is formatted with 'account' and 'date' replacement variables.

Returns:
  • A list of newly synthesized Transaction entries.

Source code in beancount/ops/summarize.py
def create_entries_from_balances(balances, date, source_account, direction,
                                 meta, flag, narration_template):
    """Create a list of entries from a dict of balances.

    One transaction is synthesized for each account of 'balances' whose
    balance is not empty, in sorted order of account names. Each of those
    transactions transfers the account's balance to/from 'source_account'.

    The balancing posting is created with the equivalent at cost. In other
    words, if you attempt to balance 10 HOOL {500 USD}, this will synthesize a
    posting with this position on one leg, and with 5000 USD on the
    'source_account' leg.

    Args:
      balances: A dict of account name strings to Inventory instances.
      date: A datetime.date object, the date at which to create the transaction.
      source_account: A string, the name of the account to pull the balances
        from. This is the magician's hat to pull the rabbit from.
      direction: If 'direction' is True, the new entries transfer TO the
        balances account from the source account; otherwise the new entries
        transfer FROM the balances into the source account.
      meta: A dict to use as metadata for the transactions. The same object is
        attached to every transaction created.
      flag: A string, the flag to use for the transactions.
      narration_template: A format string for creating the narration. It is
        formatted with 'account' and 'date' replacement variables.
    Returns:
      A list of newly synthesized Transaction entries.
    """
    transfer_entries = []
    for account, balance in sorted(balances.items()):
        # Accounts without a balance don't get an entry.
        if balance.is_empty():
            continue

        narration = narration_template.format(account=account, date=date)

        # Flip the sign of the balance when transferring out of the account.
        signed_balance = balance if direction else -balance

        # Two legs per position: the position itself on the account, and its
        # negated cost on the source account.
        postings = []
        for position in signed_balance.get_positions():
            postings.extend([
                data.Posting(account, position.units, position.cost,
                             None, None, None),
                data.Posting(source_account, -convert.get_cost(position), None,
                             None, None, None),
            ])

        transfer_entries.append(
            Transaction(meta, date, flag, None, narration,
                        data.EMPTY_SET, data.EMPTY_SET, postings))

    return transfer_entries

beancount.ops.summarize.get_open_entries(entries, date)

Gather the list of active Open entries at date.

This returns the list of Open entries that have not been closed at the given date, in the same order they were observed in the document.

Parameters:
  • entries – A list of directives.

  • date – The date at which to look for an open entry. If not specified, will return the entries still open at the latest date.

Returns:
  • A list of Open directives.

Source code in beancount/ops/summarize.py
def get_open_entries(entries, date):
    """Collect the Open directives of the accounts still open at a date.

    The entries are scanned in order up to (and not including) the first one
    dated at or after 'date'. An Open directive is recorded for its account,
    and a Close directive forgets the account again. The result preserves the
    order in which the remaining Open directives appeared in the input.

    Args:
      entries: A list of directives.
      date: The date at which to look for an open entry. If None, the whole
        list is scanned and the entries still open at the end are returned.
    Returns:
      A list of Open directives.
    """
    # Maps an account name to the (position in 'entries', Open directive) pair
    # retained for it.
    active = {}
    for position, entry in enumerate(entries):
        if date is not None and entry.date >= date:
            break

        if isinstance(entry, Open):
            # Keep the first Open seen for an account, unless a later one
            # carries a strictly earlier date.
            known = active.get(entry.account)
            if known is None or entry.date < known[1].date:
                active[entry.account] = (position, entry)

        elif isinstance(entry, Close):
            # A Close without a matching Open is silently ignored.
            active.pop(entry.account, None)

    # Sorting the pairs restores the input order, as positions are unique.
    return [entry for _, entry in sorted(active.values())]

beancount.ops.summarize.open(entries, date, account_types, conversion_currency, account_earnings, account_opening, account_conversions)

Summarize entries before a date and transfer income/expenses to equity.

This method essentially prepares a list of directives to contain only transactions that occur after a particular date. It truncates the past. To do so, it will

  1. Insert conversion transactions at the given open date, then

  2. Insert transactions at that date to move accumulated balances from before that date from the income and expenses accounts to an equity account, and finally

  3. It removes all the transactions previous to the date and replaces them by opening balances entries to bring the balances to the same amount.

The result is a list of entries for which the income and expense accounts are beginning with a balance of zero, and all other accounts begin with a transaction that brings their balance to the expected amount. All the past has been summarized at that point.

An index is returned to the first transaction past the balance opening transactions, so you can keep just those in order to render a balance sheet for only the opening balances.

Parameters:
  • entries – A list of directive tuples.

  • date – A datetime.date instance, the date at which to do this.

  • account_types – An instance of AccountTypes.

  • conversion_currency – A string, the transfer currency to use for zero prices on the conversion entry.

  • account_earnings – A string, the name of the account to transfer previous earnings from the income statement accounts to the balance sheet.

  • account_opening – A string, the name of the account in equity to transfer previous balances from, in order to initialize account balances at the beginning of the period. This is typically called an opening balances account.

  • account_conversions – A string, the name of the equity account to book currency conversions against.

Returns:
  • A new list of entries is returned, and the index that points to the first original transaction after the beginning date of the period. This index can be used to generate the opening balances report, which is a balance sheet fed with only the summarized entries.

Source code in beancount/ops/summarize.py
def open(entries,
         date,
         account_types,
         conversion_currency,
         account_earnings,
         account_opening,
         account_conversions):
    """Summarize the entries before a date and transfer income/expenses to equity.

    This function prepares a list of directives to contain only transactions
    that occur after a particular date. It truncates the past, in three steps:

    1. Insert a conversion transaction at the given open date, then

    2. insert transactions at that date to move the balances accumulated before
       that date from the income and expenses accounts to an equity account,
       and finally

    3. remove all the transactions previous to the date and replace them by
       opening balances entries to bring the balances to the same amount.

    The result is a list of entries for which the income and expense accounts
    are beginning with a balance of zero, and all other accounts begin with a
    transaction that brings their balance to the expected amount. All the past
    has been summarized at that point.

    Args:
      entries: A list of directive tuples.
      date: A datetime.date instance, the date at which to do this.
      account_types: An instance of AccountTypes.
      conversion_currency: A string, the transfer currency to use for zero prices
        on the conversion entry.
      account_earnings: A string, the name of the account to transfer
        previous earnings from the income statement accounts to the balance
        sheet.
      account_opening: A string, the name of the account in equity
        to transfer previous balances from, in order to initialize account
        balances at the beginning of the period. This is typically called an
        opening balances account.
      account_conversions: A string, the name of the equity account to
        book currency conversions against.
    Returns:
      A pair of (new list of entries, index). The index is the one returned by
      summarize(): it points to the first entry past the opening balances
      entries, so the entries before it can be used on their own to render a
      balance sheet of only the opening balances.
    """
    # Step 1: conversion entry.
    converted = conversions(entries, account_conversions, conversion_currency,
                            date)

    # Step 2: income and expenses accumulated before the date go to equity.
    # The index returned by clear() is not needed here.
    cleared, _ = clear(converted, date, account_types, account_earnings)

    # Step 3: summarize the past. This runs after the transfer so that the
    # income and expenses accounts have nothing left to summarize.
    opened, index = summarize(cleared, date, account_opening)

    return opened, index

beancount.ops.summarize.open_opt(entries, date, options_map)

Convenience function to open() using an options map.

Source code in beancount/ops/summarize.py
def open_opt(entries, date, options_map):
    """Call open() with its parameters read from an options map.

    The accounts returned by options.get_previous_accounts() are passed on, in
    order, as the trailing account arguments of open().

    Args:
      entries: See open().
      date: See open().
      options_map: A parser's option_map.
    Returns:
      Same as open().
    """
    types = options.get_account_types(options_map)
    accounts = options.get_previous_accounts(options_map)
    return open(entries, date, types, options_map['conversion_currency'],
                *accounts)

beancount.ops.summarize.summarize(entries, date, account_opening)

Summarize all entries before a date by replacing them with summarization entries.

This function replaces the transactions up to (and not including) the given date with opening balance transactions, one for each account. It returns new entries, all of the transactions before the given date having been replaced by a few summarization entries, one for each account.

Notes:
  • Open entries are preserved for active accounts.

  • The last relevant price entry for each (base, quote) pair is preserved.

  • All other entries before the cutoff date are culled.

Parameters:
  • entries – A list of directives.

  • date – A datetime.date instance, the cutoff date before which to summarize.

  • account_opening – A string, the name of the source account to book summarization entries against.

Returns:
  • The function returns a list of new entries and the integer index at which the entries on or after the cutoff date begin.

Source code in beancount/ops/summarize.py
def summarize(entries, date, account_opening):
    """Summarize all entries before a date by replacing them with summarization entries.

    The entries before the given date are replaced by opening balance
    transactions, one for each account with a non-empty balance, booked
    against 'account_opening' and dated the day before 'date'.

    Notes:
    - Open entries are preserved for active accounts.
    - The last relevant price entry for each (base, quote) pair is preserved.
    - All other entries before the cutoff date are culled.

    Args:
      entries: A list of directives.
      date: A datetime.date instance, the cutoff date before which to summarize.
      account_opening: A string, the name of the source account to book summarization
        entries against.
    Returns:
      A pair of (new list of entries, index), where the index is the number of
      entries that replace the past, i.e., the position at which the entries
      on or after the cutoff date begin.
    """
    balances, cutoff_index = balance_by_account(entries, date)

    # The opening balances are dated the day before the cutoff, so that they
    # come before any of the entries which are kept.
    day_before = date - datetime.timedelta(days=1)
    opening_entries = create_entries_from_balances(
        balances, day_before, account_opening, True,
        data.new_metadata('<summarize>', 0), flags.FLAG_SUMMARIZE,
        "Opening balance for '{account}' (Summarization)")

    # Keep the last price entry for each commodity from before the date.
    last_prices = prices.get_last_price_entries(entries, date)

    # Keep the Open directives of the accounts still active at the date.
    active_opens = get_open_entries(entries, date)

    # Everything standing in for the past, in entry order.
    head = active_opens + last_prices + opening_entries
    head.sort(key=data.entry_sortkey)

    # The entries from the cutoff on are preserved as they are.
    tail = entries[cutoff_index:]

    return head + tail, len(head)

beancount.ops.summarize.transfer_balances(entries, date, account_pred, transfer_account)

Synthesize transactions to transfer balances from some accounts at a given date.

For all accounts that match the 'account_pred' predicate, create new entries to transfer the balance at the given date from the account to the transfer account. This is used to transfer balances from income and expenses from a previous period to a "retained earnings" account. This is accomplished by creating new entries.

Note that inserting transfers breaks any following balance checks that are in the transferred accounts. For this reason, all balance assertion entries following the cutoff date for those accounts are removed from the list in output.

Parameters:
  • entries – A list of directives.

  • date – A datetime.date instance, the date at which to make the transfer.

  • account_pred – A predicate function that, given an account string, returns true if the account is meant to be transferred.

  • transfer_account – A string, the name of the source account to be used on the transfer entries to receive balances at the given date.

Returns:
  • A new list of entries, with the new transfer entries added in.

Source code in beancount/ops/summarize.py
def transfer_balances(entries, date, account_pred, transfer_account):
    """Synthesize transactions to transfer balances from some accounts at a given date.

    For every account accepted by the 'account_pred' predicate, a new entry is
    created which transfers the balance of the account at the given date to
    the transfer account. This is used to transfer balances from income and
    expenses from a previous period to a "retained earnings" account.

    Note that inserting transfers breaks any following balance checks that are
    in the transferred accounts. For this reason, all balance assertion entries
    following the cutoff date for those accounts are removed from the list in
    output.

    Args:
      entries: A list of directives.
      date: A datetime.date instance, the date at which to make the transfer.
        If not set, the transfer entries are dated like the last entry.
      account_pred: A predicate function that, given an account string, returns
        true if the account is meant to be transferred.
      transfer_account: A string, the name of the source account to be used on
        the transfer entries to receive balances at the given date.
    Returns:
      A new list of entries, with the new transfer entries added in. If
      'entries' is empty, it is returned as is.
    """
    # Nothing to transfer without entries.
    if not entries:
        return entries

    balances, index = balance_by_account(entries, date)

    # Restrict the balances to the accounts selected by the predicate.
    selected = {}
    for account, balance in balances.items():
        if account_pred(account):
            selected[account] = balance

    # The transfers happen at the end of the previous day.
    transfer_date = (date - datetime.timedelta(days=1)
                     if date else entries[-1].date)

    transfer_entries = create_entries_from_balances(
        selected, transfer_date, transfer_account, False,
        data.new_metadata('<transfer_balances>', 0), flags.FLAG_TRANSFER,
        "Transfer balance for '{account}' (Transfer balance)")

    # Balance assertions on a transferred account would break after the
    # transfer, so those are dropped from the remaining entries.
    remaining = [entry
                 for entry in entries[index:]
                 if (not isinstance(entry, data.Balance) or
                     entry.account not in selected)]

    return entries[:index] + transfer_entries + remaining

beancount.ops.summarize.truncate(entries, date)

Filter out all the entries at and after date. Returns a new list of entries.

Parameters:
  • entries – A sorted list of directives.

  • date – A datetime.date instance.

Returns:
  • A truncated list of directives.

Source code in beancount/ops/summarize.py
def truncate(entries, date):
    """Keep only the directives dated strictly before `date`.

    Args:
      entries: A sorted list of directives; the search bisects on each
        directive's `date` attribute.
      date: A datetime.date instance marking the cut-off. Directives dated
        on or after it are dropped.
    Returns:
      A new list holding the leading directives that precede `date`.
    """
    # Locate the leftmost position whose date is not before the cut-off.
    cutoff = bisect_key.bisect_left_with_key(
        entries, date, key=lambda directive: directive.date)
    return entries[:cutoff]

beancount.ops.validation

Validation checks.

These checks are intended to be run after all the plugins have transformed the list of entries, just before serving them or generating reports from them. The idea is to ensure a reasonable set of invariants and generate errors if those invariants are violated. They are not sanity checks--user data is subject to constraints which are hopefully detected here and which will result in errors trickled up to the user.

beancount.ops.validation.ValidationError (tuple)

ValidationError(source, message, entry)

beancount.ops.validation.ValidationError.__getnewargs__(self) special

Return self as a plain tuple. Used by copy and pickle.

Source code in beancount/ops/validation.py
def __getnewargs__(self):
    """Return self as a plain tuple.  Used by copy and pickle.

    NOTE(review): `_tuple` is not defined in this block; presumably it is the
    builtin `tuple` captured by the namedtuple factory -- confirm.
    """
    return _tuple(self)

beancount.ops.validation.ValidationError.__new__(_cls, source, message, entry) special staticmethod

Create new instance of ValidationError(source, message, entry)

beancount.ops.validation.ValidationError.__replace__(/, self, **kwds) special

Return a new ValidationError object replacing specified fields with new values

Source code in beancount/ops/validation.py
def _replace(self, /, **kwds):
    """Return a new object replacing specified fields with new values.

    Args:
      **kwds: Replacement values keyed by field name.
    Returns:
      The result of `self._make(...)` built from the updated field values.
    Raises:
      TypeError: If a keyword remains in `kwds` after every field name has
        been popped from it.

    NOTE(review): `_map` and `field_names` come from an enclosing scope that
    is not visible here; presumably `_map` is the builtin `map` and
    `field_names` is the tuple of field names -- confirm.
    """
    # For each field, pop its override out of kwds, using the matching item of
    # self as the fallback value (assuming `_map` behaves like builtin map).
    result = self._make(_map(kwds.pop, field_names, self))
    # Whatever is still left in kwds was not consumed by any field name.
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.ops.validation.ValidationError.__repr__(self) special

Return a nicely formatted representation string

Source code in beancount/ops/validation.py
def __repr__(self):
    """Return a nicely formatted representation string.

    The result is the class name followed by `repr_fmt` %-formatted with the
    instance itself.

    NOTE(review): `repr_fmt` is defined outside this block; presumably it is a
    format string with one placeholder per field -- confirm.
    """
    return self.__class__.__name__ + repr_fmt % self

beancount.ops.validation.validate(entries, options_map, log_timings=None, extra_validations=None)

Perform all the standard checks on parsed contents.

Parameters:
  • entries – A list of directives.

  • options_map – An options map.

  • log_timings – An optional function to use for logging the time of individual operations.

  • extra_validations – A list of extra validation functions to run after loading this list of entries.

Returns:
  • A list of new errors, if any were found.

Source code in beancount/ops/validation.py
def validate(entries, options_map, log_timings=None, extra_validations=None):
    """Perform all the standard checks on parsed contents.

    Args:
      entries: A list of directives.
      options_map: An options map; it is passed through unchanged to every
        validation function.
      log_timings: An optional function to use for logging the time of individual
        operations.
      extra_validations: An optional iterable of extra validation functions to
        run after the standard ones. Each is called with
        (entries, options_map) and must return a list of errors.
    Returns:
      A list of new errors, if any were found.
    """
    # Work on a private copy of the registry. Binding the module-level
    # VALIDATIONS object directly and applying `+=` would extend it in place
    # (when it is a list), so the extra validations of one call would leak
    # into every subsequent call.
    validation_tests = list(VALIDATIONS)
    if extra_validations:
        validation_tests.extend(extra_validations)

    # Run the validation routines in order, timing each one individually.
    errors = []
    for validation_function in validation_tests:
        with misc_utils.log_time('function: {}'.format(validation_function.__name__),
                                 log_timings, indent=2):
            new_errors = validation_function(entries, options_map)
        errors.extend(new_errors)

    return errors

beancount.ops.validation.validate_active_accounts(entries, unused_options_map)

Check that all references to accounts occur on active accounts.

We basically check that references to accounts from all directives other than Open and Close occur at dates within the open-close interval of that account. This should be good for all of the directive types where we can extract an account name.

Note that this is more strict a check than comparing the dates: we actually check that no references to account are made on the same day before the open directive appears for that account. This is a nice property to have, and is supported by our custom sorting routine that will sort open entries before transaction entries, given the same date.

Parameters:
  • entries – A list of directives.

  • unused_options_map – An options map.

Returns:
  • A list of new errors, if any were found.

Source code in beancount/ops/validation.py
def validate_active_accounts(entries, unused_options_map):
    """Report references to accounts that are not active at that point.

    The entries are walked in the order given. Open directives activate an
    account and Close directives deactivate it; every other directive has
    each of its accounts (as returned by getters.get_entry_accounts) checked
    against the set of currently active accounts.

    Because the check follows list order rather than comparing dates, a
    reference that appears before the Open directive on the same day is
    flagged as well. This relies on the entries being sorted so that Open
    directives precede other directives of the same date.

    Args:
      entries: A list of directives.
      unused_options_map: An options map (not used).
    Returns:
      A list of ValidationError instances, one per offending account
      reference.
    """
    currently_open = set()
    ever_opened = set()
    offending = []
    for entry in entries:
        if isinstance(entry, data.Open):
            ever_opened.add(entry.account)
            currently_open.add(entry.account)
            continue

        if isinstance(entry, data.Close):
            currently_open.discard(entry.account)
            continue

        for account in getters.get_entry_accounts(entry):
            if account in currently_open:
                continue

            # Directive types listed in ALLOW_AFTER_CLOSE (documents and
            # notes) are tolerated on accounts that have been opened before,
            # even when they are no longer active.
            if account in ever_opened and isinstance(entry, ALLOW_AFTER_CLOSE):
                continue

            # The wording of the message is decided only after the full pass.
            offending.append((account, entry))

    # Build the messages now that every Open directive is known: an account
    # opened anywhere in the list is "inactive", any other one is "unknown".
    errors = []
    for account, entry in offending:
        template = ("Invalid reference to inactive account '{}'"
                    if account in ever_opened
                    else "Invalid reference to unknown account '{}'")
        errors.append(ValidationError(entry.meta, template.format(account), entry))

    return errors

beancount.ops.validation.validate_check_transaction_balances(entries, options_map)

Check again that all transaction postings balance, as users may have transformed transactions.

Parameters:
  • entries – A list of directives.

  • options_map – An options map.

Returns:
  • A list of new errors, if any were found.

Source code in beancount/ops/validation.py
def validate_check_transaction_balances(entries, options_map):
    """Verify once more that the postings of every transaction balance.

    Users may have transformed the transactions, so the balance is checked
    again on the final stream of entries.

    Args:
      entries: A list of directives.
      options_map: An options map, handed to interpolate.infer_tolerances().
    Returns:
      A list of new errors, one for each transaction whose residual is not
      small with respect to the inferred tolerances.
    """
    # Note: this is a bit slow; the check could be restricted to the original
    # transactions by using the hash function in the loader.
    errors = []
    for entry in entries:
        if not isinstance(entry, Transaction):
            continue

        # IMPORTANT: This validation is _crucial_ and cannot be skipped. It
        # is the place where unbalanced transactions are actually detected
        # and reported. It _must_ run after the user routines: unbalanced
        # input is legal because a user plugin may "fix it up", and only the
        # final transaction objects on the stream (after plugin processing)
        # are required to balance. See {9e6c14b51a59}.
        residual = interpolate.compute_residual(entry.postings)
        tolerances = interpolate.infer_tolerances(entry.postings, options_map)
        if residual.is_small(tolerances):
            continue

        message = "Transaction does not balance: {}".format(residual)
        errors.append(ValidationError(entry.meta, message, entry))

    return errors

beancount.ops.validation.validate_currency_constraints(entries, options_map)

Check the currency constraints from account open declarations.

Open directives admit an optional list of currencies that specify the only types of commodities that the running inventory for this account may contain. This function checks that all postings are only made in those commodities.

Parameters:
  • entries – A list of directives.

  • options_map – An options map.

Returns:
  • A list of new errors, if any were found.

Source code in beancount/ops/validation.py
def validate_currency_constraints(entries, options_map):
    """Enforce the currency lists declared on Open directives.

    An Open directive may carry a list of currencies restricting which
    commodities the account may hold. Every posting made to such an account
    must have its units denominated in one of the listed currencies.

    Args:
      entries: A list of directives.
      options_map: An options map (not used by this check).
    Returns:
      A list of new errors, one per posting whose units currency is not
      permitted on its account.
    """
    # Index the Open directives that declare a currency constraint, by account.
    constrained = {}
    for entry in entries:
        if isinstance(entry, Open) and entry.currencies:
            constrained[entry.account] = entry

    errors = []
    for entry in entries:
        if not isinstance(entry, Transaction):
            continue

        for posting in entry.postings:
            # Accounts without a declared constraint accept any currency.
            open_entry = constrained.get(posting.account)
            if open_entry is None:
                continue
            allowed = open_entry.currencies
            if not allowed:
                continue

            if posting.units.currency in allowed:
                continue

            message = "Invalid currency {} for account '{}'".format(
                posting.units.currency, posting.account)
            errors.append(ValidationError(entry.meta, message, entry))

    return errors

beancount.ops.validation.validate_data_types(entries, options_map)

Check that all the data types of the attributes of entries are as expected.

Users are provided with a means to filter the list of entries. They're able to write code that manipulates those tuple objects without any type constraints. With discipline, this mostly works, but I know better: check, just to make sure. This routine checks all the data types and assumptions on entries.

Parameters:
  • entries – A list of directives.

  • options_map – An options map.

Returns:
  • A list of new errors, if any were found.

Source code in beancount/ops/validation.py
def validate_data_types(entries, options_map):
    """Verify that the attributes of every entry have the expected data types.

    Users can filter and rewrite the list of entries with arbitrary code that
    manipulates the tuple objects without any type constraints. This routine
    double-checks the data types and assumptions on each entry by delegating
    to data.sanity_check_types().

    Args:
      entries: A list of directives.
      options_map: An options map; its
        "allow_deprecated_none_for_tags_and_links" value is forwarded to the
        type check.
    Returns:
      A list of new errors, one for each entry that failed the type check.
    """
    errors = []
    for entry in entries:
        try:
            # The option is looked up per entry, so an empty list of entries
            # never touches the options map.
            allow_none = options_map["allow_deprecated_none_for_tags_and_links"]
            data.sanity_check_types(entry, allow_none)
        except AssertionError as failure:
            # A failed type check surfaces as an AssertionError.
            message = "Invalid data types: {}".format(failure)
            errors.append(ValidationError(entry.meta, message, entry))
    return errors

beancount.ops.validation.validate_documents_paths(entries, options_map)

Check that all filenames in resolved Document entries are absolute filenames.

The processing of document entries is assumed to result in absolute paths. Relative paths are resolved at the parsing stage and at this point we want to make sure we don't have to do any further processing on them.

Parameters:
  • entries – A list of directives.

  • options_map – An options map.

Returns:
  • A list of new errors, if any were found.

Source code in beancount/ops/validation.py
def validate_documents_paths(entries, options_map):
    """Flag Document entries whose filename is not an absolute path.

    Document processing is expected to leave absolute paths only: relative
    paths are resolved at the parsing stage, so none should remain by the
    time this check runs.

    Args:
      entries: A list of directives.
      options_map: An options map (not used by this check).
    Returns:
      A list of new errors, one per Document entry with a relative filename.
    """
    errors = []
    for entry in entries:
        if not isinstance(entry, Document):
            continue
        if path.isabs(entry.filename):
            continue
        errors.append(
            ValidationError(entry.meta, "Invalid relative path for entry", entry))
    return errors

beancount.ops.validation.validate_duplicate_balances(entries, unused_options_map)

Check that balance entries occur only once per day.

Because we do not support time, and the declaration order of entries is meant to be kept irrelevant, two balance entries with different amounts should not occur in the file. We do allow two identical balance assertions, however, because this may occur during import.

Parameters:
  • entries – A list of directives.

  • unused_options_map – An options map.

Returns:
  • A list of new errors, if any were found.

Source code in beancount/ops/validation.py
def validate_duplicate_balances(entries, unused_options_map):
    """Detect conflicting balance assertions made on the same day.

    Time of day is not supported and the declaration order of entries is
    meant to be irrelevant, so two balance assertions for the same account,
    currency and date must not disagree on the amount. Identical duplicates
    are accepted because they may legitimately show up during import.

    Args:
      entries: A list of directives.
      unused_options_map: An options map (not used).
    Returns:
      A list of new errors, one per balance assertion whose amount differs
      from the first assertion seen for the same key.
    """
    errors = []

    # First Balance entry seen for each (account, currency, date) key.
    first_seen = {}
    for entry in entries:
        if not isinstance(entry, data.Balance):
            continue

        key = (entry.account, entry.amount.currency, entry.date)
        if key not in first_seen:
            first_seen[key] = entry
            continue

        # Later assertions are always compared against the first one seen.
        if entry.amount != first_seen[key].amount:
            errors.append(
                ValidationError(
                    entry.meta,
                    "Duplicate balance assertion with different amounts",
                    entry))

    return errors

beancount.ops.validation.validate_duplicate_commodities(entries, unused_options_map)

Check that commodity entries are unique for each commodity.

Parameters:
  • entries – A list of directives.

  • unused_options_map – An options map.

Returns:
  • A list of new errors, if any were found.

Source code in beancount/ops/validation.py
def validate_duplicate_commodities(entries, unused_options_map):
    """Ensure that each commodity is declared by at most one Commodity entry.

    Args:
      entries: A list of directives.
      unused_options_map: An options map (not used).
    Returns:
      A list of new errors, one for every Commodity directive after the first
      for a given currency.
    """
    errors = []

    # First Commodity entry seen for each currency name.
    first_seen = {}
    for entry in entries:
        if not isinstance(entry, data.Commodity):
            continue

        key = entry.currency
        if key not in first_seen:
            first_seen[key] = entry
            continue

        # The stored entry is tested for truthiness before reporting.
        if first_seen[key]:
            errors.append(
                ValidationError(
                    entry.meta,
                    "Duplicate commodity directives for '{}'".format(key),
                    entry))

    return errors

beancount.ops.validation.validate_open_close(entries, unused_options_map)

Check constraints on open and close directives themselves.

This method checks three kinds of constraints:

  1. An open or a close directive may only show up once for each account. If a duplicate is detected, an error is generated.

  2. Close directives may only appear if an open directive has been seen previously (chronologically).

  3. The date of a close directive must not be earlier than the date of its corresponding open directive (closing on the opening day is accepted).

Parameters:
  • entries – A list of directives.

  • unused_options_map – An options map.

Returns:
  • A list of new errors, if any were found.

Source code in beancount/ops/validation.py
def validate_open_close(entries, unused_options_map):
    """Validate the Open and Close directives against each other.

    Three kinds of constraints are checked:

    1. An account may have at most one Open and at most one Close directive;
       every additional one is reported as a duplicate.

    2. A Close directive is only valid if an Open directive for the same
       account has been seen earlier in the list.

    3. A Close directive may not be dated before its corresponding Open
       directive (closing on the opening day is accepted).

    Args:
      entries: A list of directives.
      unused_options_map: An options map (not used).
    Returns:
      A list of new errors, if any were found.
    """
    errors = []
    opened = {}
    closed = {}
    for entry in entries:

        if isinstance(entry, Open):
            if entry.account not in opened:
                opened[entry.account] = entry
            else:
                errors.append(
                    ValidationError(
                        entry.meta,
                        "Duplicate open directive for {}".format(entry.account),
                        entry))

        elif isinstance(entry, Close):
            # A repeated close is reported and otherwise ignored; the first
            # Close directive remains the one on record.
            if entry.account in closed:
                errors.append(
                    ValidationError(
                        entry.meta,
                        "Duplicate close directive for {}".format(entry.account),
                        entry))
                continue

            open_entry = opened.get(entry.account)
            if open_entry is None:
                errors.append(
                    ValidationError(
                        entry.meta,
                        "Unopened account {} is being closed".format(entry.account),
                        entry))
            elif entry.date < open_entry.date:
                errors.append(
                    ValidationError(
                        entry.meta,
                        "Internal error: closing date for {} "
                        "appears before opening date".format(entry.account),
                        entry))

            # The first Close is recorded even when it was itself in error.
            closed[entry.account] = entry

    return errors