beancount.scripts
Implementation of the various scripts available from bin.
This is structured this way because we want all the significant codes under a single directory, for analysis, grepping and unit testing.
beancount.scripts.deps
Check the installation dependencies and report the version numbers of each.
This is meant to be used as an error diagnostic tool.
beancount.scripts.deps.check_cdecimal()
Check that Python 3.3 or above is installed.
Returns: |
|
---|
Source code in beancount/scripts/deps.py
def check_cdecimal():
"""Check that Python 3.3 or above is installed.
Returns:
A triple of (package-name, version-number, sufficient) as per
check_dependencies().
"""
# Note: this code mirrors and should be kept in-sync with that at the top of
# beancount.core.number.
# Try the built-in installation.
import decimal
if is_fast_decimal(decimal):
return ('cdecimal', '{} (built-in)'.format(decimal.__version__), True)
# Try an explicitly installed version.
try:
import cdecimal
if is_fast_decimal(cdecimal):
return ('cdecimal', getattr(cdecimal, '__version__', 'OKAY'), True)
except ImportError:
pass
# Not found.
return ('cdecimal', None, False)
beancount.scripts.deps.check_dependencies()
Check the runtime dependencies and report their version numbers.
Returns: |
|
---|
Source code in beancount/scripts/deps.py
def check_dependencies():
"""Check the runtime dependencies and report their version numbers.
Returns:
A list of pairs of (package-name, version-number, sufficient) whereby if a
package has not been installed, its 'version-number' will be set to None.
Otherwise, it will be a string with the version number in it. 'sufficient'
will be True if the version if sufficient for this installation of
Beancount.
"""
return [
# Check for a complete installation of Python itself.
check_python(),
check_cdecimal(),
# Modules we really do need installed.
check_import('dateutil'),
check_import('ply', module_name='ply.yacc', min_version='3.4'),
# Optionally required to upload data to Google Drive.
# TODO(blais, 2023-11-18): oauth2client is deprecated.
check_import('googleapiclient'),
check_import('oauth2client'),
check_import('httplib2'),
# Optionally required to support various price source fetchers.
check_import('requests', min_version='2.0'),
# Optionally required to support imports (identify, extract, file) code.
check_python_magic(),
]
beancount.scripts.deps.check_import(package_name, min_version=None, module_name=None)
Check that a particular module name is installed.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/deps.py
def check_import(package_name, min_version=None, module_name=None):
"""Check that a particular module name is installed.
Args:
package_name: A string, the name of the package and module to be
imported to verify this works. This should have a __version__
attribute on it.
min_version: If not None, a string, the minimum version number
we require.
module_name: The name of the module to import if it differs from the
package name.
Returns:
A triple of (package-name, version-number, sufficient) as per
check_dependencies().
"""
if module_name is None:
module_name = package_name
try:
__import__(module_name)
module = sys.modules[module_name]
if min_version is not None:
version = module.__version__
assert isinstance(version, str)
is_sufficient = (parse_version(version) >= parse_version(min_version)
if min_version else True)
else:
version, is_sufficient = None, True
except ImportError:
version, is_sufficient = None, False
return (package_name, version, is_sufficient)
beancount.scripts.deps.check_python()
Check that Python 3.3 or above is installed.
Returns: |
|
---|
Source code in beancount/scripts/deps.py
def check_python():
"""Check that Python 3.3 or above is installed.
Returns:
A triple of (package-name, version-number, sufficient) as per
check_dependencies().
"""
return ('python3',
'.'.join(map(str, sys.version_info[:3])),
sys.version_info[:2] >= (3, 3))
beancount.scripts.deps.check_python_magic()
Check that a recent-enough version of python-magic is installed.
python-magic is an interface to libmagic, which is used by the 'file' tool and UNIX to identify file types. Note that there are two Python wrappers which provide the 'magic' import: python-magic and filemagic. The former is what we need, which appears to be more recently maintained.
Returns: |
|
---|
Source code in beancount/scripts/deps.py
def check_python_magic():
"""Check that a recent-enough version of python-magic is installed.
python-magic is an interface to libmagic, which is used by the 'file' tool
and UNIX to identify file types. Note that there are two Python wrappers
which provide the 'magic' import: python-magic and filemagic. The former is
what we need, which appears to be more recently maintained.
Returns:
A triple of (package-name, version-number, sufficient) as per
check_dependencies().
"""
try:
import magic
# Check that python-magic and not filemagic is installed.
if not hasattr(magic, 'from_file'):
# 'filemagic' is installed; install python-magic.
raise ImportError
return ('python-magic', 'OK', True)
except (ImportError, OSError):
return ('python-magic', None, False)
beancount.scripts.deps.is_fast_decimal(decimal_module)
Return true if a fast C decimal implementation is installed.
Source code in beancount/scripts/deps.py
def is_fast_decimal(decimal_module):
"Return true if a fast C decimal implementation is installed."
return isinstance(decimal_module.Decimal().sqrt, types.BuiltinFunctionType)
beancount.scripts.deps.list_dependencies(file=<_io.TextIOWrapper name='<stderr>' mode='w' encoding='utf-8'>)
Check the dependencies and produce a listing on the given file.
Parameters: |
|
---|
Source code in beancount/scripts/deps.py
def list_dependencies(file=sys.stderr):
"""Check the dependencies and produce a listing on the given file.
Args:
file: A file object to write the output to.
"""
print("Dependencies:")
for package, version, sufficient in check_dependencies():
print(" {:16}: {} {}".format(
package,
version or 'NOT INSTALLED',
"(INSUFFICIENT)" if version and not sufficient else ""),
file=file)
beancount.scripts.deps.parse_version(version_str)
Parse the version string into a comparable tuple.
Source code in beancount/scripts/deps.py
def parse_version(version_str: str) -> str:
"""Parse the version string into a comparable tuple."""
return [int(v) for v in version_str.split('.')]
beancount.scripts.directories
Check that document directories mirror a list of accounts correctly.
beancount.scripts.directories.ValidateDirectoryError (Exception)
A directory validation error.
beancount.scripts.directories.validate_directories(entries, document_dirs)
Validate a directory hierarchy against a ledger's account names.
Read a ledger's list of account names and check that all the capitalized subdirectory names under the given roots match the account names.
Parameters: |
|
---|
Source code in beancount/scripts/directories.py
def validate_directories(entries, document_dirs):
"""Validate a directory hierarchy against a ledger's account names.
Read a ledger's list of account names and check that all the capitalized
subdirectory names under the given roots match the account names.
Args:
entries: A list of directives.
document_dirs: A list of string, the directory roots to walk and validate.
"""
# Get the list of accounts declared in the ledge.
accounts = getters.get_accounts(entries)
# For each of the roots, validate the hierarchy of directories.
for document_dir in document_dirs:
errors = validate_directory(accounts, document_dir)
for error in errors:
print("ERROR: {}".format(error))
beancount.scripts.directories.validate_directory(accounts, document_dir)
Check a directory hierarchy against a list of valid accounts.
Walk the directory hierarchy, and for all directories with names matching that of accounts (with ":" replaced with "/"), check that they refer to an account name declared in the given list.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/directories.py
def validate_directory(accounts, document_dir):
"""Check a directory hierarchy against a list of valid accounts.
Walk the directory hierarchy, and for all directories with names matching
that of accounts (with ":" replaced with "/"), check that they refer to an
account name declared in the given list.
Args:
account: A set or dict of account names.
document_dir: A string, the root directory to walk and validate.
Returns:
An errors for each invalid directory name found.
"""
# Generate all parent accounts in the account_set we're checking against, so
# that parent directories with no corresponding account don't warn.
accounts_with_parents = set(accounts)
for account_ in accounts:
while True:
parent = account.parent(account_)
if not parent:
break
if parent in accounts_with_parents:
break
accounts_with_parents.add(parent)
account_ = parent
errors = []
for directory, account_name, _, _ in account.walk(document_dir):
if account_name not in accounts_with_parents:
errors.append(ValidateDirectoryError(
"Invalid directory '{}': no corresponding account '{}'".format(
directory, account_name)))
return errors
beancount.scripts.doctor
Debugging tool for those finding bugs in Beancount.
This tool is able to dump lexer/parser state, and will provide other services in the name of debugging.
beancount.scripts.doctor.FileLocation (ParamType)
beancount.scripts.doctor.FileLocation.convert(self, value, param, ctx)
Convert the value to the correct type. This is not called if
the value is None
(the missing value).
This must accept string values from the command line, as well as values that are already the correct type. It may also convert other compatible types.
The param
and ctx
arguments may be None
in certain
situations, such as when converting prompt input.
If the value cannot be converted, call :meth:fail
with a
descriptive message.
:param value: The value to convert.
:param param: The parameter that is using this type to convert
its value. May be None
.
:param ctx: The current context that arrived at this value. May
be None
.
Source code in beancount/scripts/doctor.py
def convert(self, value, param, ctx):
match = re.match(r"(?:(.+):)?(\d+)$", value)
if not match:
self.fail("{!r} is not a valid location".format(value), param, ctx)
filename, lineno = match.groups()
if filename:
filename = os.path.abspath(filename)
return filename, int(lineno)
beancount.scripts.doctor.FileRegion (ParamType)
beancount.scripts.doctor.FileRegion.convert(self, value, param, ctx)
Convert the value to the correct type. This is not called if
the value is None
(the missing value).
This must accept string values from the command line, as well as values that are already the correct type. It may also convert other compatible types.
The param
and ctx
arguments may be None
in certain
situations, such as when converting prompt input.
If the value cannot be converted, call :meth:fail
with a
descriptive message.
:param value: The value to convert.
:param param: The parameter that is using this type to convert
its value. May be None
.
:param ctx: The current context that arrived at this value. May
be None
.
Source code in beancount/scripts/doctor.py
def convert(self, value, param, ctx):
match = re.match(r"(?:(.+):)?(\d+):(\d+)$", value)
if not match:
self.fail("{!r} is not a valid region".format(value), param, ctx)
filename, start_lineno, end_lineno = match.groups()
if filename:
filename = os.path.abspath(filename)
return filename, int(start_lineno), int(end_lineno)
beancount.scripts.doctor.Group (Group)
beancount.scripts.doctor.Group.command(self, *args, *, alias=None, **kwargs)
A shortcut decorator for declaring and attaching a command to
the group. This takes the same arguments as :func:command
and
immediately registers the created command with this group by
calling :meth:add_command
.
To customize the command class used, set the
:attr:command_class
attribute.
.. versionchanged:: 8.1 This decorator can be applied without parentheses.
.. versionchanged:: 8.0
Added the :attr:command_class
attribute.
Source code in beancount/scripts/doctor.py
def command(self, *args, alias=None, **kwargs):
wrap = click.Group.command(self, *args, **kwargs)
def decorator(f):
cmd = wrap(f)
if alias:
self.aliases[alias] = cmd.name
return cmd
return decorator
beancount.scripts.doctor.Group.get_command(self, ctx, cmd_name)
Given a context and a command name, this returns a
:class:Command
object if it exists or returns None
.
Source code in beancount/scripts/doctor.py
def get_command(self, ctx, cmd_name):
# aliases
name = self.aliases.get(cmd_name, cmd_name)
# allow to use '_' or '-' in command names.
name = name.replace('_', '-')
return click.Group.get_command(self, ctx, name)
beancount.scripts.doctor.RenderError (tuple)
RenderError(source, message, entry)
beancount.scripts.doctor.RenderError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/scripts/doctor.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.scripts.doctor.RenderError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of RenderError(source, message, entry)
beancount.scripts.doctor.RenderError.__replace__(/, self, **kwds)
special
Return a new RenderError object replacing specified fields with new values
Source code in beancount/scripts/doctor.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.scripts.doctor.RenderError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/scripts/doctor.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.scripts.doctor.find_linked_entries(entries, links, follow_links)
Find all linked entries.
Note that there is an option here: You can either just look at the links on the closest entry, or you can include the links of the linked transactions as well. Whichever one you want depends on how you use your links. Best would be to query the user (in Emacs) when there are many links present.
Source code in beancount/scripts/doctor.py
def find_linked_entries(entries, links, follow_links: bool):
"""Find all linked entries.
Note that there is an option here: You can either just look at the links
on the closest entry, or you can include the links of the linked
transactions as well. Whichever one you want depends on how you use your
links. Best would be to query the user (in Emacs) when there are many
links present.
"""
linked_entries = []
if not follow_links:
linked_entries = [entry
for entry in entries
if (isinstance(entry, data.Transaction) and
entry.links and
entry.links & links)]
else:
links = set(links)
linked_entries = []
while True:
num_linked = len(linked_entries)
linked_entries = [entry
for entry in entries
if (isinstance(entry, data.Transaction) and
entry.links and
entry.links & links)]
if len(linked_entries) == num_linked:
break
for entry in linked_entries:
if entry.links:
links.update(entry.links)
return linked_entries
beancount.scripts.doctor.find_tagged_entries(entries, tag)
Find all entries with the given tag.
Source code in beancount/scripts/doctor.py
def find_tagged_entries(entries, tag):
"""Find all entries with the given tag."""
return [entry
for entry in entries
if (isinstance(entry, data.Transaction) and
entry.tags and
tag in entry.tags)]
beancount.scripts.doctor.render_mini_balances(entries, options_map, conversion=None, price_map=None)
Render a treeified list of the balances for the given transactions.
Parameters: |
|
---|
Source code in beancount/scripts/doctor.py
def render_mini_balances(entries, options_map, conversion=None, price_map=None):
"""Render a treeified list of the balances for the given transactions.
Args:
entries: A list of selected transactions to render.
options_map: The parsed options.
conversion: Conversion method string, None, 'value' or 'cost'.
price_map: A price map from the original entries. If this isn't provided,
the inventories are rendered directly. If it is, their contents are
converted to market value.
"""
# Render linked entries (in date order) as errors (for Emacs).
errors = [RenderError(entry.meta, '', entry)
for entry in entries]
printer.print_errors(errors)
# Print out balances.
real_root = realization.realize(entries)
dformat = options_map['dcontext'].build(alignment=Align.DOT, reserved=2)
# TODO(blais): I always want to be able to convert at cost. We need
# arguments capability.
#
# TODO(blais): Ideally this conversion inserts a new transactions to
# 'Unrealized' to account for the difference between cost and market value.
# Insert one and update the realization. Add an update() method to the
# realization, given a transaction.
acctypes = options.get_account_types(options_map)
if conversion == 'value':
assert price_map is not None
# Warning: Mutate the inventories in-place, converting them to market
# value.
balance_diff = inventory.Inventory()
for real_account in realization.iter_children(real_root):
balance_cost = real_account.balance.reduce(convert.get_cost)
balance_value = real_account.balance.reduce(convert.get_value, price_map)
real_account.balance = balance_value
balance_diff.add_inventory(balance_cost)
balance_diff.add_inventory(-balance_value)
if not balance_diff.is_empty():
account_unrealized = account.join(acctypes.income,
options_map["account_unrealized_gains"])
unrealized = realization.get_or_create(real_root, account_unrealized)
unrealized.balance.add_inventory(balance_diff)
elif conversion == 'cost':
for real_account in realization.iter_children(real_root):
real_account.balance = real_account.balance.reduce(convert.get_cost)
realization.dump_balances(real_root, dformat, file=sys.stdout)
# Print out net income change.
net_income = inventory.Inventory()
for real_node in realization.iter_children(real_root):
if account_types.is_income_statement_account(real_node.account, acctypes):
net_income.add_inventory(real_node.balance)
print()
print('Net Income: {}'.format(-net_income))
beancount.scripts.doctor.resolve_region_to_entries(entries, filename, region)
Resolve a filename and region to a list of entries.
Source code in beancount/scripts/doctor.py
def resolve_region_to_entries(
entries: List[data.Entries],
filename: str,
region: Tuple[str, int, int]
) -> List[data.Entries]:
"""Resolve a filename and region to a list of entries."""
search_filename, first_lineno, last_lineno = region
if search_filename is None:
search_filename = filename
# Find all the entries in the region. (To be clear, this isn't like the
# 'linked' command, none of the links are followed.)
region_entries = [
entry
for entry in data.filter_txns(entries)
if (entry.meta['filename'] == search_filename and
first_lineno <= entry.meta['lineno'] <= last_lineno)]
return region_entries
beancount.scripts.example
beancount.scripts.example.LiberalDate (ParamType)
beancount.scripts.example.LiberalDate.convert(self, value, param, ctx)
Convert the value to the correct type. This is not called if
the value is None
(the missing value).
This must accept string values from the command line, as well as values that are already the correct type. It may also convert other compatible types.
The param
and ctx
arguments may be None
in certain
situations, such as when converting prompt input.
If the value cannot be converted, call :meth:fail
with a
descriptive message.
:param value: The value to convert.
:param param: The parameter that is using this type to convert
its value. May be None
.
:param ctx: The current context that arrived at this value. May
be None
.
Source code in beancount/scripts/example.py
def convert(self, value, param, ctx):
try:
date_utils.parse_date_liberally(value)
except ValueError:
self.fail("{!r} is not a valid date".format(value), param, ctx)
beancount.scripts.example.check_non_negative(entries, account, currency)
Check that the balance of the given account never goes negative.
Parameters: |
|
---|
Exceptions: |
|
---|
Source code in beancount/scripts/example.py
def check_non_negative(entries, account, currency):
"""Check that the balance of the given account never goes negative.
Args:
entries: A list of directives.
account: An account string, the account to check the balance for.
currency: A string, the currency to check minimums for.
Raises:
AssertionError: if the balance goes negative.
"""
previous_date = None
for txn_posting, balances in postings_for(data.sorted(entries), [account], before=True):
balance = balances[account]
date = txn_posting.txn.date
if date != previous_date:
assert all(pos.units.number >= ZERO for pos in balance.get_positions()), (
"Negative balance: {} at: {}".format(balance, txn_posting.txn.date))
previous_date = date
beancount.scripts.example.compute_trip_dates(date_begin, date_end)
Generate dates at reasonable intervals for trips during the given time period.
Parameters: |
|
---|
Yields: Pairs of dates for the trips within the period.
Source code in beancount/scripts/example.py
def compute_trip_dates(date_begin, date_end):
"""Generate dates at reasonable intervals for trips during the given time period.
Args:
date_begin: The start date.
date_end: The end date.
Yields:
Pairs of dates for the trips within the period.
"""
# Min and max number of days remaining at home.
days_at_home = (4*30, 13*30)
# Length of trip.
days_trip = (8, 22)
# Number of days to ensure no trip at the beginning and the end.
days_buffer = 21
date_begin += datetime.timedelta(days=days_buffer)
date_end -= datetime.timedelta(days=days_buffer)
date = date_begin
while 1:
duration_at_home = datetime.timedelta(days=random.randint(*days_at_home))
duration_trip = datetime.timedelta(days=random.randint(*days_trip))
date_trip_begin = date + duration_at_home
date_trip_end = date_trip_begin + duration_trip
if date_trip_end >= date_end:
break
yield (date_trip_begin, date_trip_end)
date = date_trip_end
beancount.scripts.example.contextualize_file(contents, employer)
Replace generic strings in the generated file with realistic strings.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def contextualize_file(contents, employer):
"""Replace generic strings in the generated file with realistic strings.
Args:
contents: A string, the generic file contents.
Returns:
A string, the contextualized version.
"""
replacements = {
'CC': 'US',
'Bank1': 'BofA',
'Bank1_Institution': 'Bank of America',
'Bank1_Address': '123 America Street, LargeTown, USA',
'Bank1_Phone': '+1.012.345.6789',
'CreditCard1': 'Chase:Slate',
'CreditCard2': 'Amex:BlueCash',
'Employer1': employer,
'Retirement': 'Vanguard',
'Retirement_Institution': 'Vanguard Group',
'Retirement_Address': "P.O. Box 1110, Valley Forge, PA 19482-1110",
'Retirement_Phone': "+1.800.523.1188",
'Investment': 'ETrade',
# Commodities
'CCY': 'USD',
'VACHR': 'VACHR',
'DEFCCY': 'IRAUSD',
'MFUND1': 'VBMPX',
'MFUND2': 'RGAGX',
'STK1': 'ITOT',
'STK2': 'VEA',
'STK3': 'VHT',
'STK4': 'GLD',
}
new_contents = replace(contents, replacements)
return new_contents, replacements
beancount.scripts.example.date_iter(date_begin, date_end)
Generate a sequence of dates.
Parameters: |
|
---|
Yields: Instances of datetime.date.
Source code in beancount/scripts/example.py
def date_iter(date_begin, date_end):
"""Generate a sequence of dates.
Args:
date_begin: The start date.
date_end: The end date.
Yields:
Instances of datetime.date.
"""
assert date_begin <= date_end
date = date_begin
one_day = datetime.timedelta(days=1)
while date < date_end:
date += one_day
yield date
beancount.scripts.example.date_random_seq(date_begin, date_end, days_min, days_max)
Generate a sequence of dates with some random increase in days.
Parameters: |
|
---|
Yields: Instances of datetime.date.
Source code in beancount/scripts/example.py
def date_random_seq(date_begin, date_end, days_min, days_max):
"""Generate a sequence of dates with some random increase in days.
Args:
date_begin: The start date.
date_end: The end date.
days_min: The minimum number of days to advance on each iteration.
days_max: The maximum number of days to advance on each iteration.
Yields:
Instances of datetime.date.
"""
assert days_min > 0
assert days_min <= days_max
date = date_begin
while date < date_end:
nb_days_forward = random.randint(days_min, days_max)
date += datetime.timedelta(days=nb_days_forward)
if date >= date_end:
break
yield date
beancount.scripts.example.delay_dates(date_iter, delay_days_min, delay_days_max)
Delay the dates from the given iterator by some uniformly drawn number of days.
Parameters: |
|
---|
Yields: datetime.date instances.
Source code in beancount/scripts/example.py
def delay_dates(date_iter, delay_days_min, delay_days_max):
"""Delay the dates from the given iterator by some uniformly drawn number of days.
Args:
date_iter: An iterator of datetime.date instances.
delay_days_min: The minimum amount of advance days for the transaction.
delay_days_max: The maximum amount of advance days for the transaction.
Yields:
datetime.date instances.
"""
dates = list(date_iter)
last_date = dates[-1]
last_date = last_date.date() if isinstance(last_date, datetime.datetime) else last_date
for dtime in dates:
date = dtime.date() if isinstance(dtime, datetime.datetime) else dtime
date += datetime.timedelta(days=random.randint(delay_days_min, delay_days_max))
if date >= last_date:
break
yield date
beancount.scripts.example.generate_balance_checks(entries, account, date_iter)
Generate balance check entries to the given frequency.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_balance_checks(entries, account, date_iter):
"""Generate balance check entries to the given frequency.
Args:
entries: A list of directives that contain all the transactions for the
accounts.
account: The name of the account for which to generate.
date_iter: Iterator of dates. We generate balance checks at these dates.
Returns:
A list of balance check entries.
"""
balance_checks = []
date_iter = iter(date_iter)
next_date = next(date_iter)
with misc_utils.swallow(StopIteration):
for txn_posting, balance in postings_for(entries, [account], before=True):
while txn_posting.txn.date >= next_date:
amount = balance[account].get_currency_units('CCY').number
balance_checks.extend(parse(f"""
{next_date} balance {account} {amount} CCY
"""))
next_date = next(date_iter)
return balance_checks
beancount.scripts.example.generate_banking(entries, date_begin, date_end, amount_initial)
Generate a checking account opening.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_banking(entries, date_begin, date_end, amount_initial):
"""Generate a checking account opening.
Args:
entries: A list of entries which affect this account.
date_begin: A date instance, the beginning date.
date_end: A date instance, the end date.
amount_initial: A Decimal instance, the amount to initialize the checking
account with.
Returns:
A list of directives.
"""
amount_initial_neg = -amount_initial
new_entries = parse(f"""
{date_begin} open Assets:CC:Bank1
institution: "Bank1_Institution"
address: "Bank1_Address"
phone: "Bank1_Phone"
{date_begin} open Assets:CC:Bank1:Checking CCY
account: "00234-48574897"
;; {date_begin} open Assets:CC:Bank1:Savings CCY
{date_begin} * "Opening Balance for checking account"
Assets:CC:Bank1:Checking {amount_initial} CCY
Equity:Opening-Balances {amount_initial_neg} CCY
""")
date_balance = date_begin + datetime.timedelta(days=1)
account = 'Assets:CC:Bank1:Checking'
for txn_posting, balances in postings_for(data.sorted(entries + new_entries),
[account], before=True):
if txn_posting.txn.date >= date_balance:
break
amount_balance = balances[account].get_currency_units('CCY').number
bal_entries = parse(f"""
{date_balance} balance Assets:CC:Bank1:Checking {amount_balance} CCY
""")
return new_entries + bal_entries
beancount.scripts.example.generate_banking_expenses(date_begin, date_end, account, rent_amount)
Generate expenses paid out of a checking account, typically living expenses.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_banking_expenses(date_begin, date_end, account, rent_amount):
"""Generate expenses paid out of a checking account, typically living expenses.
Args:
date_begin: The start date.
date_end: The end date.
account: The checking account to generate expenses to.
rent_amount: The amount of rent.
Returns:
A list of directives.
"""
fee_expenses = generate_periodic_expenses(
rrule.rrule(rrule.MONTHLY, bymonthday=4, dtstart=date_begin, until=date_end),
"BANK FEES", "Monthly bank fee",
account, 'Expenses:Financial:Fees',
lambda: D('4.00'))
rent_expenses = generate_periodic_expenses(
delay_dates(rrule.rrule(rrule.MONTHLY, dtstart=date_begin, until=date_end), 2, 5),
"RiverBank Properties", "Paying the rent",
account, 'Expenses:Home:Rent',
lambda: random.normalvariate(float(rent_amount), 0))
electricity_expenses = generate_periodic_expenses(
delay_dates(rrule.rrule(rrule.MONTHLY, dtstart=date_begin, until=date_end), 7, 8),
"EDISON POWER", "",
account, 'Expenses:Home:Electricity',
lambda: random.normalvariate(65, 0))
internet_expenses = generate_periodic_expenses(
delay_dates(rrule.rrule(rrule.MONTHLY, dtstart=date_begin, until=date_end), 20, 22),
"Wine-Tarner Cable", "",
account, 'Expenses:Home:Internet',
lambda: random.normalvariate(80, 0.10))
phone_expenses = generate_periodic_expenses(
delay_dates(rrule.rrule(rrule.MONTHLY, dtstart=date_begin, until=date_end), 17, 19),
"Verizon Wireless", "",
account, 'Expenses:Home:Phone',
lambda: random.normalvariate(60, 10))
return data.sorted(fee_expenses +
rent_expenses +
electricity_expenses +
internet_expenses +
phone_expenses)
beancount.scripts.example.generate_clearing_entries(date_iter, payee, narration, entries, account_clear, account_from)
Generate entries to clear the value of an account.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_clearing_entries(date_iter,
payee, narration,
entries, account_clear, account_from):
"""Generate entries to clear the value of an account.
Args:
date_iter: An iterator of datetime.date instances.
payee: A string, the payee name to use on the transactions.
narration: A string, the narration to use on the transactions.
entries: A list of entries.
account_clear: The account to clear.
account_from: The source account to clear 'account_clear' from.
Returns:
A list of directives.
"""
new_entries = []
# The next date we're looking for.
date_iter = iter(date_iter)
next_date = next(date_iter, None)
if not next_date:
return new_entries
# Iterate over all the postings of the account to clear.
for txn_posting, balances in postings_for(entries, [account_clear]):
balance_clear = balances[account_clear]
# Check if we need to clear.
if next_date <= txn_posting.txn.date:
pos_amount = balance_clear.get_currency_units('CCY')
neg_amount = -pos_amount
new_entries.extend(parse(f"""
{next_date} * "{payee}" "{narration}"
{account_clear} {neg_amount.number:.2f} CCY
{account_from} {pos_amount.number:.2f} CCY
"""))
balance_clear.add_amount(neg_amount)
# Advance to the next date we're looking for.
next_date = next(date_iter, None)
if not next_date:
break
return new_entries
beancount.scripts.example.generate_commodity_entries(date_birth)
Create a list of Commodity entries for all the currencies we're using.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_commodity_entries(date_birth):
"""Create a list of Commodity entries for all the currencies we're using.
Args:
date_birth: A datetime.date instance, the date of birth of the user.
Returns:
A list of Commodity entries for all the commodities in use.
"""
return parse(f"""
1792-01-01 commodity USD
name: "US Dollar"
export: "CASH"
{date_birth} commodity VACHR
name: "Employer Vacation Hours"
export: "IGNORE"
{date_birth} commodity IRAUSD
name: "US 401k and IRA Contributions"
export: "IGNORE"
2009-05-01 commodity RGAGX
name: "American Funds The Growth Fund of America Class R-6"
export: "MUTF:RGAGX"
price: "USD:google/MUTF:RGAGX"
1995-09-18 commodity VBMPX
name: "Vanguard Total Bond Market Index Fund Institutional Plus Shares"
export: "MUTF:VBMPX"
price: "USD:google/MUTF:VBMPX"
2004-01-20 commodity ITOT
name: "iShares Core S&P Total U.S. Stock Market ETF"
export: "NYSEARCA:ITOT"
price: "USD:google/NYSEARCA:ITOT"
2007-07-20 commodity VEA
name: "Vanguard FTSE Developed Markets ETF"
export: "NYSEARCA:VEA"
price: "USD:google/NYSEARCA:VEA"
2004-01-26 commodity VHT
name: "Vanguard Health Care ETF"
export: "NYSEARCA:VHT"
price: "USD:google/NYSEARCA:VHT"
2004-11-01 commodity GLD
name: "SPDR Gold Trust (ETF)"
export: "NYSEARCA:GLD"
price: "USD:google/NYSEARCA:GLD"
1900-01-01 commodity VMMXX
export: "MUTF:VMMXX (MONEY:USD)"
""")
beancount.scripts.example.generate_employment_income(employer_name, employer_address, annual_salary, account_deposit, account_retirement, date_begin, date_end)
Generate bi-weekly entries for payroll salary income.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_employment_income(employer_name,
employer_address,
annual_salary,
account_deposit,
account_retirement,
date_begin,
date_end):
"""Generate bi-weekly entries for payroll salary income.
Args:
employer_name: A string, the human-readable name of the employer.
employer_address: A string, the address of the employer.
annual_salary: A Decimal, the annual salary of the employee.
account_deposit: An account string, the account to deposit the salary to.
account_retirement: An account string, the account to deposit retirement
contributions to.
date_begin: The start date.
date_end: The end date.
Returns:
A list of directives, including open directives for the account.
"""
preamble = parse(f"""
{date_begin} event "employer" "{employer_name}, {employer_address}"
{date_begin} open Income:CC:Employer1:Salary CCY
;{date_begin} open Income:CC:Employer1:AnnualBonus CCY
{date_begin} open Income:CC:Employer1:GroupTermLife CCY
{date_begin} open Income:CC:Employer1:Vacation VACHR
{date_begin} open Assets:CC:Employer1:Vacation VACHR
{date_begin} open Expenses:Vacation VACHR
{date_begin} open Expenses:Health:Life:GroupTermLife
{date_begin} open Expenses:Health:Medical:Insurance
{date_begin} open Expenses:Health:Dental:Insurance
{date_begin} open Expenses:Health:Vision:Insurance
;{date_begin} open Expenses:Vacation:Employer
""")
date_prev = None
contrib_retirement = ZERO
contrib_socsec = ZERO
biweekly_pay = annual_salary / 26
gross = biweekly_pay
medicare = gross * D('0.0231')
federal = gross * D('0.2303')
state = gross * D('0.0791')
city = gross * D('0.0379')
sdi = D('1.12')
lifeinsurance = D('24.32')
dental = D('2.90')
medical = D('27.38')
vision = D('42.30')
fixed = (medicare + federal + state + city + sdi +
dental + medical + vision)
# Calculate vacation hours per-pay.
with decimal.localcontext() as ctx:
ctx.prec = 4
vacation_hrs = (ANNUAL_VACATION_DAYS * D('8')) / D('26')
transactions = []
for dtime in misc_utils.skipiter(
rrule.rrule(rrule.WEEKLY, byweekday=rrule.TH,
dtstart=date_begin, until=date_end), 2):
date = dtime.date()
year = date.year
if not date_prev or date_prev.year != date.year:
contrib_retirement = RETIREMENT_LIMITS.get(date.year, RETIREMENT_LIMITS[None])
contrib_socsec = D('7000')
date_prev = date
retirement_uncapped = math.ceil((gross * D('0.25')) / 100) * 100
retirement = min(contrib_retirement, retirement_uncapped)
contrib_retirement -= retirement
socsec_uncapped = gross * D('0.0610')
socsec = min(contrib_socsec, socsec_uncapped)
contrib_socsec -= socsec
with decimal.localcontext() as ctx:
ctx.prec = 6
deposit = (gross - retirement - fixed - socsec)
retirement_neg = -retirement
gross_neg = -gross
lifeinsurance_neg = -lifeinsurance
vacation_hrs_neg = -vacation_hrs
transactions.extend(parse(f"""
{date} * "{employer_name}" "Payroll"
{account_deposit} {deposit:.2f} CCY
""" + ("" if retirement == ZERO else f"""\
{account_retirement} {retirement:.2f} CCY
Assets:CC:Federal:PreTax401k {retirement_neg:.2f} DEFCCY
Expenses:Taxes:Y{year}:CC:Federal:PreTax401k {retirement:.2f} DEFCCY
""") + f"""\
Income:CC:Employer1:Salary {gross_neg:.2f} CCY
Income:CC:Employer1:GroupTermLife {lifeinsurance_neg:.2f} CCY
Expenses:Health:Life:GroupTermLife {lifeinsurance:.2f} CCY
Expenses:Health:Dental:Insurance {dental} CCY
Expenses:Health:Medical:Insurance {medical} CCY
Expenses:Health:Vision:Insurance {vision} CCY
Expenses:Taxes:Y{year}:CC:Medicare {medicare:.2f} CCY
Expenses:Taxes:Y{year}:CC:Federal {federal:.2f} CCY
Expenses:Taxes:Y{year}:CC:State {state:.2f} CCY
Expenses:Taxes:Y{year}:CC:CityNYC {city:.2f} CCY
Expenses:Taxes:Y{year}:CC:SDI {sdi:.2f} CCY
Expenses:Taxes:Y{year}:CC:SocSec {socsec:.2f} CCY
Assets:CC:Employer1:Vacation {vacation_hrs:.2f} VACHR
Income:CC:Employer1:Vacation {vacation_hrs_neg:.2f} VACHR
"""))
return preamble + transactions
beancount.scripts.example.generate_expense_accounts(date_birth)
Generate directives for expense accounts.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_expense_accounts(date_birth):
"""Generate directives for expense accounts.
Args:
date_birth: Birth date of the character.
Returns:
A list of directives.
"""
return parse(f"""
{date_birth} open Expenses:Food:Groceries
{date_birth} open Expenses:Food:Restaurant
{date_birth} open Expenses:Food:Coffee
{date_birth} open Expenses:Food:Alcohol
{date_birth} open Expenses:Transport:Tram
{date_birth} open Expenses:Home:Rent
{date_birth} open Expenses:Home:Electricity
{date_birth} open Expenses:Home:Internet
{date_birth} open Expenses:Home:Phone
{date_birth} open Expenses:Financial:Fees
{date_birth} open Expenses:Financial:Commissions
""")
beancount.scripts.example.generate_open_entries(date, accounts, currency=None)
Generate a list of Open entries for the given accounts:
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_open_entries(date, accounts, currency=None):
"""Generate a list of Open entries for the given accounts:
Args:
date: A datetime.date instance for the open entries.
accounts: A list of account strings.
currency: An optional currency constraint.
Returns:
A list of Open directives.
"""
assert isinstance(accounts, (list, tuple))
return parse(''.join(
'{date} open {account} {currency}\n'.format(date=date,
account=account,
currency=currency or '')
for account in accounts))
beancount.scripts.example.generate_outgoing_transfers(entries, account, account_out, transfer_minimum, transfer_threshold, transfer_increment)
Generate transfers of accumulated funds out of an account.
This monitors the balance of an account and when it is beyond a threshold, generate out transfers form that account to another account.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_outgoing_transfers(entries,
account,
account_out,
transfer_minimum,
transfer_threshold,
transfer_increment):
"""Generate transfers of accumulated funds out of an account.
This monitors the balance of an account and when it is beyond a threshold,
generate out transfers form that account to another account.
Args:
entries: A list of existing entries that affect this account so far.
The generated entries will also affect this account.
account: An account string, the account to monitor.
account_out: An account string, the savings account to make transfers to.
transfer_minimum: The minimum amount of funds to always leave in this account
after a transfer.
transfer_threshold: The minimum amount of funds to be able to transfer out without
breaking the minimum.
transfer_increment: A Decimal, the increment to round transfers to.
Returns:
A list of new directives, the transfers to add to the given account.
"""
last_date = entries[-1].date
# Reverse the balance amounts taking into account the minimum balance for
# all time in the future.
amounts = [(balances[account].get_currency_units('CCY').number, txn_posting)
for txn_posting, balances in postings_for(entries, [account])]
reversed_amounts = []
last_amount, _ = amounts[-1]
for current_amount, _ in reversed(amounts):
if current_amount < last_amount:
reversed_amounts.append(current_amount)
last_amount = current_amount
else:
reversed_amounts.append(last_amount)
capped_amounts = reversed(reversed_amounts)
# Create transfers outward where the future allows it.
new_entries = []
offset_amount = ZERO
for current_amount, (_, txn_posting) in zip(capped_amounts, amounts):
if txn_posting.txn.date >= last_date:
break
adjusted_amount = current_amount - offset_amount
if adjusted_amount > (transfer_minimum + transfer_threshold):
amount_transfer = round_to(adjusted_amount - transfer_minimum,
transfer_increment)
date = txn_posting.txn.date + datetime.timedelta(days=1)
amount_transfer_neg = -amount_transfer
new_entries.extend(parse(f"""
{date} * "Transfering accumulated savings to other account"
{account} {amount_transfer_neg:2f} CCY
{account_out} {amount_transfer:2f} CCY
"""))
offset_amount += amount_transfer
return new_entries
beancount.scripts.example.generate_periodic_expenses(date_iter, payee, narration, account_from, account_to, amount_generator)
Generate periodic expense transactions.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_periodic_expenses(date_iter,
payee, narration,
account_from, account_to,
amount_generator):
"""Generate periodic expense transactions.
Args:
date_iter: An iterator for dates or datetimes.
payee: A string, the payee name to use on the transactions, or a set of such strings
to randomly choose from
narration: A string, the narration to use on the transactions.
account_from: An account string the debited account.
account_to: An account string the credited account.
amount_generator: A callable object to generate variates.
Returns:
A list of directives.
"""
new_entries = []
for dtime in date_iter:
date = dtime.date() if isinstance(dtime, datetime.datetime) else dtime
amount = D(amount_generator())
txn_payee = (payee
if isinstance(payee, str)
else random.choice(payee))
txn_narration = (narration
if isinstance(narration, str)
else random.choice(narration))
amount_neg = -amount
new_entries.extend(parse(f"""
{date} * "{txn_payee}" "{txn_narration}"
{account_from} {amount_neg:.2f} CCY
{account_to} {amount:.2f} CCY
"""))
return new_entries
beancount.scripts.example.generate_prices(date_begin, date_end, currencies, cost_currency)
Generate weekly or monthly price entries for the given currencies.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_prices(date_begin, date_end, currencies, cost_currency):
"""Generate weekly or monthly price entries for the given currencies.
Args:
date_begin: The start date.
date_end: The end date.
currencies: A list of currency strings to generate prices for.
cost_currency: A string, the cost currency.
Returns:
A list of Price directives.
"""
digits = D('0.01')
entries = []
counter = itertools.count()
for currency in currencies:
start_price = random.uniform(30, 200)
growth = random.uniform(0.02, 0.13) # %/year
mu = growth * (7 / 365)
sigma = random.uniform(0.005, 0.02) # Vol
for dtime, price_float in zip(rrule.rrule(rrule.WEEKLY, byweekday=rrule.FR,
dtstart=date_begin, until=date_end),
price_series(start_price, mu, sigma)):
price = D(price_float).quantize(digits)
meta = data.new_metadata(generate_prices.__name__, next(counter))
entry = data.Price(meta, dtime.date(), currency,
amount.Amount(price, cost_currency))
entries.append(entry)
return entries
beancount.scripts.example.generate_regular_credit_expenses(date_birth, date_begin, date_end, account_credit, account_checking)
Generate expenses paid out of a credit card account, including payments to the credit card.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_regular_credit_expenses(date_birth, date_begin, date_end,
account_credit,
account_checking):
"""Generate expenses paid out of a credit card account, including payments to the
credit card.
Args:
date_birth: The user's birth date.
date_begin: The start date.
date_end: The end date.
account_credit: The credit card account to generate expenses against.
account_checking: The checking account to generate payments from.
Returns:
A list of directives.
"""
restaurant_expenses = generate_periodic_expenses(
date_random_seq(date_begin, date_end, 1, 5),
RESTAURANT_NAMES, RESTAURANT_NARRATIONS,
account_credit, 'Expenses:Food:Restaurant',
lambda: min(random.lognormvariate(math.log(30), math.log(1.5)),
random.randint(200, 220)))
groceries_expenses = generate_periodic_expenses(
date_random_seq(date_begin, date_end, 5, 20),
GROCERIES_NAMES, "Buying groceries",
account_credit, 'Expenses:Food:Groceries',
lambda: min(random.lognormvariate(math.log(80), math.log(1.3)),
random.randint(250, 300)))
subway_expenses = generate_periodic_expenses(
date_random_seq(date_begin, date_end, 27, 33),
"Metro Transport Authority", "Tram tickets",
account_credit, 'Expenses:Transport:Tram',
lambda: D('120.00'))
credit_expenses = data.sorted(restaurant_expenses +
groceries_expenses +
subway_expenses)
# Entries to open accounts.
credit_preamble = generate_open_entries(date_birth, [account_credit], 'CCY')
return data.sorted(credit_preamble + credit_expenses)
beancount.scripts.example.generate_retirement_employer_match(entries, account_invest, account_income)
Generate employer matching contributions into a retirement account.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_retirement_employer_match(entries, account_invest, account_income):
"""Generate employer matching contributions into a retirement account.
Args:
entries: A list of directives that cover the retirement account.
account_invest: The name of the retirement cash account.
account_income: The name of the income account.
Returns:
A list of new entries generated for employer contributions.
"""
match_frac = D('0.50')
new_entries = parse(f"""
{entries[0].date} open {account_income} CCY
""")
for txn_posting, balances in postings_for(entries, [account_invest]):
amount = txn_posting.posting.units.number * match_frac
amount_neg = -amount
date = txn_posting.txn.date + ONE_DAY
new_entries.extend(parse(f"""
{date} * "Employer match for contribution"
{account_invest} {amount:.2f} CCY
{account_income} {amount_neg:.2f} CCY
"""))
return new_entries
beancount.scripts.example.generate_retirement_investments(entries, account, commodities_items, price_map)
Invest money deposited to the given retirement account.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_retirement_investments(entries, account, commodities_items, price_map):
"""Invest money deposited to the given retirement account.
Args:
entries: A list of directives
account: The root account for all retirement investment sub-accounts.
commodities_items: A list of (commodity, fraction to be invested in) items.
price_map: A dict of prices, as per beancount.core.prices.build_price_map().
Returns:
A list of new directives for the given investments. This also generates account
opening directives for the desired investment commodities.
"""
open_entries = []
account_cash = join(account, 'Cash')
date_origin = entries[0].date
open_entries.extend(parse(f"""
{date_origin} open {account} CCY
institution: "Retirement_Institution"
address: "Retirement_Address"
phone: "Retirement_Phone"
{date_origin} open {account_cash} CCY
number: "882882"
"""))
for currency, _ in commodities_items:
open_entries.extend(parse(f"""
{date_origin} open {account}:{currency} {currency}
number: "882882"
"""))
new_entries = []
for txn_posting, balances in postings_for(entries, [account_cash]):
balance = balances[account_cash]
amount_to_invest = balance.get_currency_units('CCY').number
# Find the date the following Monday, the date to invest.
txn_date = txn_posting.txn.date
while txn_date.weekday() != calendar.MONDAY:
txn_date += ONE_DAY
amount_invested = ZERO
for commodity, fraction in commodities_items:
amount_fraction = amount_to_invest * D(fraction)
# Find the price at that date.
_, price = prices.get_price(price_map, (commodity, 'CCY'), txn_date)
units = (amount_fraction / price).quantize(D('0.001'))
amount_cash = (units * price).quantize(D('0.01'))
amount_cash_neg = -amount_cash
new_entries.extend(parse(f"""
{txn_date} * "Investing {fraction:.0%} of cash in {commodity}"
{account}:{commodity} {units:.3f} {commodity} {{{price:.2f} CCY}}
{account}:Cash {amount_cash_neg:.2f} CCY
"""))
balance.add_amount(amount.Amount(-amount_cash, 'CCY'))
return data.sorted(open_entries + new_entries)
beancount.scripts.example.generate_tax_accounts(year, date_max)
Generate accounts and contribution directives for a particular tax year.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_tax_accounts(year, date_max):
"""Generate accounts and contribution directives for a particular tax year.
Args:
year: An integer, the year we're to generate this for.
date_max: The maximum date to produce an entry for.
Returns:
A list of directives.
"""
date_year = datetime.date(year, 1, 1)
date_filing = (datetime.date(year + 1, 3, 20) +
datetime.timedelta(days=random.randint(0, 5)))
date_federal = (date_filing + datetime.timedelta(days=random.randint(0, 4)))
date_state = (date_filing + datetime.timedelta(days=random.randint(0, 4)))
quantum = D('0.01')
amount_federal = D(max(random.normalvariate(500, 120), 12)).quantize(quantum)
amount_federal_neg = -amount_federal
amount_state = D(max(random.normalvariate(300, 100), 10)).quantize(quantum)
amount_state_neg = -amount_state
amount_payable = -(amount_federal + amount_state)
amount_limit = RETIREMENT_LIMITS.get(year, RETIREMENT_LIMITS[None])
amount_limit_neg = -amount_limit
entries = parse(f"""
;; Open tax accounts for that year.
{date_year} open Expenses:Taxes:Y{year}:CC:Federal:PreTax401k DEFCCY
{date_year} open Expenses:Taxes:Y{year}:CC:Medicare CCY
{date_year} open Expenses:Taxes:Y{year}:CC:Federal CCY
{date_year} open Expenses:Taxes:Y{year}:CC:CityNYC CCY
{date_year} open Expenses:Taxes:Y{year}:CC:SDI CCY
{date_year} open Expenses:Taxes:Y{year}:CC:State CCY
{date_year} open Expenses:Taxes:Y{year}:CC:SocSec CCY
;; Check that the tax amounts have been fully used.
{date_year} balance Assets:CC:Federal:PreTax401k 0 DEFCCY
{date_year} * "Allowed contributions for one year"
Income:CC:Federal:PreTax401k {amount_limit_neg} DEFCCY
Assets:CC:Federal:PreTax401k {amount_limit} DEFCCY
{date_filing} * "Filing taxes for {year}"
Expenses:Taxes:Y{year}:CC:Federal {amount_federal:.2f} CCY
Expenses:Taxes:Y{year}:CC:State {amount_state:.2f} CCY
Liabilities:AccountsPayable {amount_payable:.2f} CCY
{date_federal} * "FEDERAL TAXPYMT"
Assets:CC:Bank1:Checking {amount_federal_neg:.2f} CCY
Liabilities:AccountsPayable {amount_federal:.2f} CCY
{date_state} * "STATE TAX & FINANC PYMT"
Assets:CC:Bank1:Checking {amount_state_neg:.2f} CCY
Liabilities:AccountsPayable {amount_state:.2f} CCY
""")
return [entry for entry in entries if entry.date < date_max]
beancount.scripts.example.generate_tax_preamble(date_birth)
Generate tax declarations not specific to any particular year.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_tax_preamble(date_birth):
"""Generate tax declarations not specific to any particular year.
Args:
date_birth: A date instance, the birth date of the character.
Returns:
A list of directives.
"""
return parse(f"""
;; Tax accounts not specific to a year.
{date_birth} open Income:CC:Federal:PreTax401k DEFCCY
{date_birth} open Assets:CC:Federal:PreTax401k DEFCCY
""")
beancount.scripts.example.generate_taxable_investment(date_begin, date_end, entries, price_map, stocks)
Generate opening directives and transactions for an investment account.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_taxable_investment(date_begin, date_end, entries, price_map, stocks):
"""Generate opening directives and transactions for an investment account.
Args:
date_begin: A date instance, the beginning date.
date_end: A date instance, the end date.
entries: A list of entries that contains at least the transfers to the investment
account's cash account.
price_map: A dict of prices, as per beancount.core.prices.build_price_map().
stocks: A list of strings, the list of commodities to invest in.
Returns:
A list of directives.
"""
account = 'Assets:CC:Investment'
income = 'Income:CC:Investment'
account_cash = join(account, 'Cash')
account_gains = '{income}:PnL'.format(income=income)
dividends = 'Dividend'
accounts_stocks = ['Assets:CC:Investment:{}'.format(commodity)
for commodity in stocks]
open_entries = parse(f"""
{date_begin} open {account}:Cash CCY
{date_begin} open {account_gains} CCY
""")
for stock in stocks:
open_entries.extend(parse(f"""
{date_begin} open {account}:{stock} {stock}
{date_begin} open {income}:{stock}:{dividends} CCY
"""))
# Figure out dates at which dividends should be distributed, near the end of
# each quarter.
days_to = datetime.timedelta(days=3*90-10)
dividend_dates = []
for quarter_begin in iter_quarters(date_begin, date_end):
end_of_quarter = quarter_begin + days_to
if not (date_begin < end_of_quarter < date_end):
continue
dividend_dates.append(end_of_quarter)
# Iterate over all the dates, but merging in the postings for the cash
# account.
min_amount = D('1000.00')
round_amount = D('100.00')
commission = D('8.95')
round_units = D('1')
frac_invest = D('1.00')
frac_dividend = D('0.004')
p_daily_buy = 1./15 # days
p_daily_sell = 1./90 # days
stocks_inventory = inventory.Inventory()
new_entries = []
dividend_date_iter = iter(dividend_dates)
next_dividend_date = next(dividend_date_iter, None)
for date, balances in iter_dates_with_balance(date_begin, date_end,
entries, [account_cash]):
# Check if we should insert a dividend. Note that we could not factor
# this out because we want to explicitly reinvest the cash dividends and
# we also want the dividends to be proportional to the amount of
# invested stock, so one feeds on the other and vice-versa.
if next_dividend_date and date > next_dividend_date:
# Compute the total balances for the stock accounts in order to
# create a realistic dividend.
total = inventory.Inventory()
for account_stock in accounts_stocks:
total.add_inventory(balances[account_stock])
# Create an entry offering dividends of 1% of the portfolio.
portfolio_cost = total.reduce(convert.get_cost).get_currency_units('CCY').number
amount_cash = (frac_dividend * portfolio_cost).quantize(D('0.01'))
amount_cash_neg = -amount_cash
stock = random.choice(stocks)
cash_dividend = parse(f"""
{next_dividend_date} * "Dividends on portfolio"
{account}:Cash {amount_cash:.2f} CCY
{income}:{stock}:{dividends} {amount_cash_neg:.2f} CCY
""")[0]
new_entries.append(cash_dividend)
# Advance the next dividend date.
next_dividend_date = next(dividend_date_iter, None)
# If the balance is high, buy with high probability.
balance = balances[account_cash]
total_cash = balance.get_currency_units('CCY').number
assert total_cash >= ZERO, ('Cash balance is negative: {}'.format(total_cash))
invest_cash = total_cash * frac_invest - commission
if invest_cash > min_amount:
if random.random() < p_daily_buy:
commodities = random.sample(stocks, random.randint(1, len(stocks)))
lot_amount = round_to(invest_cash / len(commodities), round_amount)
invested_amount = ZERO
for stock in commodities:
# Find the price at that date.
_, price = prices.get_price(price_map, (stock, 'CCY'), date)
units = round_to((lot_amount / price), round_units)
if units <= ZERO:
continue
amount_cash = -(units * price + commission)
# logging.info('Buying %s %s @ %s CCY = %s CCY',
# units, stock, price, units * price)
buy = parse(f"""
{date} * "Buy shares of {stock}"
{account}:Cash {amount_cash:.2f} CCY
{account}:{stock} {units:.0f} {stock} {{{price:.2f} CCY}}
Expenses:Financial:Commissions {commission:.2f} CCY
""")[0]
new_entries.append(buy)
account_stock = ':'.join([account, stock])
balances[account_cash].add_position(buy.postings[0])
balances[account_stock].add_position(buy.postings[1])
stocks_inventory.add_position(buy.postings[1])
# Don't sell on days you buy.
continue
# Otherwise, sell with low probability.
if not stocks_inventory.is_empty() and random.random() < p_daily_sell:
# Choose the lot with the highest gain or highest loss.
gains = []
for position in stocks_inventory.get_positions():
base_quote = (position.units.currency, position.cost.currency)
_, price = prices.get_price(price_map, base_quote, date)
if price == position.cost.number:
continue # Skip lots without movement.
market_value = position.units.number * price
book_value = convert.get_cost(position).number
gain = market_value - book_value
gains.append((gain, market_value, price, position))
if not gains:
continue
# Sell either biggest winner or biggest loser.
biggest = bool(random.random() < 0.5)
lot_tuple = sorted(gains)[0 if biggest else -1]
gain, market_value, price, sell_position = lot_tuple
#logging.info('Selling {} for {}'.format(sell_position, market_value))
sell_position = -sell_position
stock = sell_position.units.currency
amount_cash = market_value - commission
amount_gain = -gain
sell = parse(f"""
{date} * "Sell shares of {stock}"
{account}:{stock} {sell_position} @ {price:.2f} CCY
{account}:Cash {amount_cash:.2f} CCY
Expenses:Financial:Commissions {commission:.2f} CCY
{account_gains} {amount_gain:.2f} CCY
""")[0]
new_entries.append(sell)
balances[account_cash].add_position(sell.postings[1])
stocks_inventory.add_position(sell.postings[0])
continue
return open_entries + new_entries
beancount.scripts.example.generate_trip_entries(date_begin, date_end, tag, config, trip_city, home_city, account_credit)
Generate more dense expenses for a trip.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_trip_entries(date_begin, date_end,
tag, config,
trip_city, home_city,
account_credit):
"""Generate more dense expenses for a trip.
Args:
date_begin: A datetime.date instance, the beginning of the trip.
date_end: A datetime.date instance, the end of the trip.
tag: A string, the name of the tag.
config: A list of (payee name, account name, (mu, 3sigma)), where
mu is the mean of the prices to generate and 3sigma is 3 times
the standard deviation.
trip_city: A string, the capitalized name of the destination city.
home_city: A string, the name of the home city.
account_credit: A string, the name of the credit card account to pay
the expenses from.
Returns:
A list of entries for the trip, all tagged with the given tag.
"""
p_day_generate = 0.3
new_entries = []
for date in date_iter(date_begin, date_end):
for payee, account_expense, (mu, sigma3) in config:
if random.random() < p_day_generate:
amount = random.normalvariate(mu, sigma3 / 3.)
amount_neg = -amount
new_entries.extend(parse(f"""
{date} * "{payee}" "" #{tag}
{account_credit} {amount_neg:.2f} CCY
{account_expense} {amount:.2f} CCY
"""))
# Consume the vacation days.
vacation_hrs = (date_end - date_begin).days * 8 # hrs/day
new_entries.extend(parse(f"""
{date_end} * "Consume vacation days"
Assets:CC:Employer1:Vacation -{vacation_hrs:.2f} VACHR
Expenses:Vacation {vacation_hrs:.2f} VACHR
"""))
# Generate events for the trip.
new_entries.extend(parse(f"""
{date_begin} event "location" "{trip_city}"
{date_end} event "location" "{home_city}"
"""))
return new_entries
beancount.scripts.example.get_minimum_balance(entries, account, currency)
Compute the minimum balance of the given account according to the entries history.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def get_minimum_balance(entries, account, currency):
"""Compute the minimum balance of the given account according to the entries history.
Args:
entries: A list of directives.
account: An account string.
currency: A currency string, for which we want to compute the minimum.
Returns:
A Decimal number, the minimum amount throughout the history of this account.
"""
min_amount = ZERO
for _, balances in postings_for(entries, [account]):
balance = balances[account]
current = balance.get_currency_units(currency).number
if current < min_amount:
min_amount = current
return min_amount
beancount.scripts.example.iter_dates_with_balance(date_begin, date_end, entries, accounts)
Iterate over dates, including the balances of the postings iterator.
Parameters: |
|
---|
Yields: Pairs of (data, balances) objects, with date: A datetime.date instance balances: An Inventory object, representing the current balance. You can modify the inventory object to feed back changes in the balance.
Source code in beancount/scripts/example.py
def iter_dates_with_balance(date_begin, date_end, entries, accounts):
"""Iterate over dates, including the balances of the postings iterator.
Args:
postings_iter: An iterator of postings as per postings_for().
date_begin: The start date.
date_end: The end date.
Yields:
Pairs of (data, balances) objects, with
date: A datetime.date instance
balances: An Inventory object, representing the current balance.
You can modify the inventory object to feed back changes in the
balance.
"""
balances = collections.defaultdict(inventory.Inventory)
merged_txn_postings = iter(merge_postings(entries, accounts))
txn_posting = next(merged_txn_postings, None)
for date in date_iter(date_begin, date_end):
while txn_posting and txn_posting.txn.date == date:
posting = txn_posting.posting
balances[posting.account].add_position(posting)
txn_posting = next(merged_txn_postings, None)
yield date, balances
beancount.scripts.example.iter_quarters(date_begin, date_end)
Iterate over all quarters between begin and end dates.
Parameters: |
|
---|
Yields: Instances of datetime.date at the beginning of the quarters. This will include the quarter of the beginning date and of the end date.
Source code in beancount/scripts/example.py
def iter_quarters(date_begin, date_end):
"""Iterate over all quarters between begin and end dates.
Args:
date_begin: The start date.
date_end: The end date.
Yields:
Instances of datetime.date at the beginning of the quarters. This will
include the quarter of the beginning date and of the end date.
"""
quarter = (date_begin.year, (date_begin.month-1)//3)
quarter_last = (date_end.year, (date_end.month-1)//3)
assert quarter <= quarter_last
while True:
year, trimester = quarter
yield datetime.date(year, trimester*3 + 1, 1)
if quarter == quarter_last:
break
trimester = (trimester + 1) % 4
if trimester == 0:
year += 1
quarter = (year, trimester)
beancount.scripts.example.merge_postings(entries, accounts)
Merge all the postings from the given account names.
Parameters: |
|
---|
Yields: A list of TxnPosting's for all the accounts, in sorted order.
Source code in beancount/scripts/example.py
def merge_postings(entries, accounts):
"""Merge all the postings from the given account names.
Args:
entries: A list of directives.
accounts: A list of account strings to get the balances for.
Yields:
A list of TxnPosting's for all the accounts, in sorted order.
"""
real_root = realization.realize(entries)
merged_postings = []
for account in accounts:
real_account = realization.get(real_root, account)
if real_account is None:
continue
merged_postings.extend(txn_posting
for txn_posting in real_account.txn_postings
if isinstance(txn_posting, data.TxnPosting))
merged_postings.sort(key=lambda txn_posting: txn_posting.txn.date)
return merged_postings
beancount.scripts.example.parse(input_string)
Parse some input string and assert no errors.
This parse function does not just create the object, it also triggers local interpolation to fill in the missing amounts.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def parse(input_string):
"""Parse some input string and assert no errors.
This parse function does not just create the object, it also triggers local
interpolation to fill in the missing amounts.
Args:
input_string: Beancount input text.
Returns:
A list of directive objects.
"""
entries, errors, options_map = parser.parse_string(textwrap.dedent(input_string))
if errors:
printer.print_errors(errors, file=sys.stderr)
raise ValueError("Parsed text has errors")
# Interpolation.
entries, unused_balance_errors = booking.book(entries, options_map)
return data.sorted(entries)
beancount.scripts.example.postings_for(entries, accounts, before=False)
Realize the entries and get the list of postings for the given accounts.
All the non-Posting directives are already filtered out.
Parameters: |
|
---|
Yields: Tuples of: posting: An instance of TxnPosting balances: A dict of Inventory balances for the given accounts after applying the posting. These inventory objects can be mutated to adjust the balance due to generated transactions to be applied later.
Source code in beancount/scripts/example.py
def postings_for(entries, accounts, before=False):
"""Realize the entries and get the list of postings for the given accounts.
All the non-Posting directives are already filtered out.
Args:
entries: A list of directives.
accounts: A list of account strings to get the balances for.
before: A boolean, if true, yield the balance before the position is applied.
The default is to yield the balance after applying the position.
Yields:
Tuples of:
posting: An instance of TxnPosting
balances: A dict of Inventory balances for the given accounts _after_
applying the posting. These inventory objects can be mutated to adjust
the balance due to generated transactions to be applied later.
"""
assert isinstance(accounts, list)
merged_txn_postings = merge_postings(entries, accounts)
balances = collections.defaultdict(inventory.Inventory)
for txn_posting in merged_txn_postings:
if before:
yield txn_posting, balances
posting = txn_posting.posting
balances[posting.account].add_position(posting)
if not before:
yield txn_posting, balances
beancount.scripts.example.price_series(start, mu, sigma)
Generate a price series based on a simple stochastic model.
Parameters: |
|
---|
Yields: Floats, at each step.
Source code in beancount/scripts/example.py
def price_series(start, mu, sigma):
"""Generate a price series based on a simple stochastic model.
Args:
start: The beginning value.
mu: The per-step drift, in units of value.
sigma: Volatility of the changes.
Yields:
Floats, at each step.
"""
value = start
while 1:
yield value
value += random.normalvariate(mu, sigma) * value
beancount.scripts.example.replace(string, replacements, strip=False)
Apply word-boundaried regular expression replacements to an indented string.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def replace(string, replacements, strip=False):
"""Apply word-boundaried regular expression replacements to an indented string.
Args:
string: Some input template string.
replacements: A dict of regexp to replacement value.
strip: A boolean, true if we should strip the input.
Returns:
The input string with the replacements applied to it, with the indentation removed.
"""
output = textwrap.dedent(string)
if strip:
output = output.strip()
for from_, to_ in replacements.items():
if not isinstance(to_, str) and not callable(to_):
to_ = str(to_)
output = re.sub(r'\b{}\b'.format(from_), to_, output)
return output
beancount.scripts.example.validate_output(contents, positive_accounts, currency)
Check that the output file validates.
Parameters: |
|
---|
Exceptions: |
|
---|
Source code in beancount/scripts/example.py
def validate_output(contents, positive_accounts, currency):
"""Check that the output file validates.
Args:
contents: A string, the output file.
positive_accounts: A list of strings, account names to check for
non-negative balances.
currency: A string, the currency to check minimums for.
Raises:
AssertionError: If the output does not validate.
"""
loaded_entries, _, _ = loader.load_string(
contents,
log_errors=sys.stderr,
extra_validations=validation.HARDCORE_VALIDATIONS)
# Sanity checks: Check that the checking balance never goes below zero.
for account in positive_accounts:
check_non_negative(loaded_entries, account, currency)
beancount.scripts.example.write_example_file(date_birth, date_begin, date_end, reformat, file)
Generate the example file.
Parameters: |
|
---|
Source code in beancount/scripts/example.py
def write_example_file(date_birth, date_begin, date_end, reformat, file):
"""Generate the example file.
Args:
date_birth: A datetime.date instance, the birth date of our character.
date_begin: A datetime.date instance, the beginning date at which to generate
transactions.
date_end: A datetime.date instance, the end date at which to generate
transactions.
reformat: A boolean, true if we should apply global reformatting to this file.
file: A file object, where to write out the output.
"""
# The following code entirely writes out the output to generic names, such
# as "Employer1", "Bank1", and "CCY" (for principal currency). Those names
# are purposely chosen to be unique, and only near the very end do we make
# renamings to more specific and realistic names.
# Name of the checking account.
account_opening = 'Equity:Opening-Balances'
account_payable = 'Liabilities:AccountsPayable'
account_checking = 'Assets:CC:Bank1:Checking'
account_credit = 'Liabilities:CC:CreditCard1'
account_retirement = 'Assets:CC:Retirement'
account_investing = 'Assets:CC:Investment:Cash'
# Commodities.
commodity_entries = generate_commodity_entries(date_birth)
# Estimate the rent.
rent_amount = round_to(ANNUAL_SALARY / RENT_DIVISOR, RENT_INCREMENT)
# Get a random employer.
employer_name, employer_address = random.choice(EMPLOYERS)
logging.info("Generating Salary Employment Income")
income_entries = generate_employment_income(employer_name, employer_address,
ANNUAL_SALARY,
account_checking,
join(account_retirement, 'Cash'),
date_begin, date_end)
logging.info("Generating Expenses from Banking Accounts")
banking_expenses = generate_banking_expenses(date_begin, date_end,
account_checking, rent_amount)
logging.info("Generating Regular Expenses via Credit Card")
credit_regular_entries = generate_regular_credit_expenses(
date_birth, date_begin, date_end, account_credit, account_checking)
logging.info("Generating Credit Card Expenses for Trips")
trip_entries = []
destinations = sorted(TRIP_DESTINATIONS.items())
destinations.extend(destinations)
random.shuffle(destinations)
for (date_trip_begin, date_trip_end), (destination_name, config) in zip(
compute_trip_dates(date_begin, date_end), destinations):
# Compute a suitable tag.
tag = 'trip-{}-{}'.format(destination_name.lower().replace(' ', '-'),
date_trip_begin.year)
#logging.info("%s -- %s %s", tag, date_trip_begin, date_trip_end)
# Remove regular entries during this trip.
credit_regular_entries = [entry
for entry in credit_regular_entries
if not(date_trip_begin <= entry.date < date_trip_end)]
# Generate entries for the trip.
this_trip_entries = generate_trip_entries(
date_trip_begin, date_trip_end,
tag, config,
destination_name.replace('-', ' ').title(), HOME_NAME,
account_credit)
trip_entries.extend(this_trip_entries)
logging.info("Generating Credit Card Payment Entries")
credit_payments = generate_clearing_entries(
delay_dates(rrule.rrule(rrule.MONTHLY,
dtstart=date_begin, until=date_end, bymonthday=7), 0, 4),
"CreditCard1", "Paying off credit card",
credit_regular_entries,
account_credit, account_checking)
credit_entries = credit_regular_entries + trip_entries + credit_payments
logging.info("Generating Tax Filings and Payments")
tax_preamble = generate_tax_preamble(date_birth)
# Figure out all the years we need tax accounts for.
years = set()
for account_name in getters.get_accounts(income_entries):
match = re.match(r'Expenses:Taxes:Y(\d\d\d\d)', account_name)
if match:
years.add(int(match.group(1)))
taxes = [(year, generate_tax_accounts(year, date_end)) for year in sorted(years)]
tax_entries = tax_preamble + functools.reduce(operator.add,
(entries
for _, entries in taxes))
logging.info("Generating Opening of Banking Accounts")
# Open banking accounts and gift the checking account with a balance that
# will offset all the amounts to ensure a positive balance throughout its
# lifetime.
entries_for_banking = data.sorted(income_entries +
banking_expenses +
credit_entries +
tax_entries)
minimum = get_minimum_balance(entries_for_banking,
account_checking, 'CCY')
banking_entries = generate_banking(entries_for_banking,
date_begin, date_end,
max(-minimum, ZERO))
logging.info("Generating Transfers to Investment Account")
banking_transfers = generate_outgoing_transfers(
data.sorted(income_entries +
banking_entries +
banking_expenses +
credit_entries +
tax_entries),
account_checking,
account_investing,
transfer_minimum=D('200'),
transfer_threshold=D('3000'),
transfer_increment=D('500'))
logging.info("Generating Prices")
# Generate price entries for investment currencies and create a price map to
# use for later for generating investment transactions.
funds_allocation = {'MFUND1': 0.40, 'MFUND2': 0.60}
stocks = ['STK1', 'STK2', 'STK3', 'STK4']
price_entries = generate_prices(date_begin, date_end,
sorted(funds_allocation.keys()) + stocks, 'CCY')
price_map = prices.build_price_map(price_entries)
logging.info("Generating Employer Match Contribution")
account_match = 'Income:US:Employer1:Match401k'
retirement_match = generate_retirement_employer_match(income_entries,
join(account_retirement, 'Cash'),
account_match)
logging.info("Generating Retirement Investments")
retirement_entries = generate_retirement_investments(
income_entries + retirement_match, account_retirement,
sorted(funds_allocation.items()),
price_map)
logging.info("Generating Taxes Investments")
investment_entries = generate_taxable_investment(date_begin, date_end,
banking_transfers, price_map,
stocks)
logging.info("Generating Expense Accounts")
expense_accounts_entries = generate_expense_accounts(date_birth)
logging.info("Generating Equity Accounts")
equity_entries = generate_open_entries(date_birth, [account_opening,
account_payable])
logging.info("Generating Balance Checks")
credit_checks = generate_balance_checks(credit_entries, account_credit,
date_random_seq(date_begin, date_end, 20, 30))
banking_checks = generate_balance_checks(data.sorted(income_entries +
banking_entries +
banking_expenses +
banking_transfers +
credit_entries +
tax_entries),
account_checking,
date_random_seq(date_begin, date_end, 20, 30))
logging.info("Outputting and Formatting Entries")
dcontext = display_context.DisplayContext()
default_int_digits = 8
for currency, precision in {'USD': 2,
'CAD': 2,
'VACHR':0,
'IRAUSD': 2,
'VBMPX': 3,
'RGAGX': 3,
'ITOT': 0,
'VEA': 0,
'VHT': 0,
'GLD': 0}.items():
int_digits = default_int_digits
if precision > 0:
int_digits += 1 + precision
dcontext.update(D('{{:0{}.{}f}}'.format(int_digits, precision).format(0)), currency)
output = io.StringIO()
def output_section(title, entries):
output.write('\n\n\n{}\n\n'.format(title))
printer.print_entries(data.sorted(entries), dcontext, file=output)
output.write(FILE_PREAMBLE.format(**locals()))
output_section('* Commodities', commodity_entries)
output_section('* Equity Accounts', equity_entries)
output_section('* Banking', data.sorted(banking_entries +
banking_expenses +
banking_transfers +
banking_checks))
output_section('* Credit-Cards', data.sorted(credit_entries +
credit_checks))
output_section('* Taxable Investments', investment_entries)
output_section('* Retirement Investments', data.sorted(retirement_entries +
retirement_match))
output_section('* Sources of Income', income_entries)
output_section('* Taxes', tax_preamble)
for year, entries in taxes:
output_section('** Tax Year {}'.format(year), entries)
output_section('* Expenses', expense_accounts_entries)
output_section('* Prices', price_entries)
output_section('* Cash', [])
logging.info("Contextualizing to Realistic Names")
contents, replacements = contextualize_file(output.getvalue(), employer_name)
if reformat:
contents = format.align_beancount(contents)
logging.info("Writing contents")
file.write(contents)
logging.info("Validating Results")
validate_output(contents,
[replace(account, replacements)
for account in [account_checking]],
replace('CCY', replacements))
beancount.scripts.format
beancount.scripts.format.align_beancount(contents, prefix_width=None, num_width=None, currency_column=None)
Reformat Beancount input to align all the numbers at the same column.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/format.py
def align_beancount(contents, prefix_width=None, num_width=None, currency_column=None):
"""Reformat Beancount input to align all the numbers at the same column.
Args:
contents: A string, Beancount input syntax to reformat.
prefix_width: An integer, the width in characters to render the account
name to. If this is not specified, a good value is selected
automatically from the contents of the file.
num_width: An integer, the width to render each number. If this is not
specified, a good value is selected automatically from the contents of
the file.
currency_column: An integer, the column at which to align the currencies.
If given, this overrides the other options.
Returns:
A string, reformatted Beancount input with all the number aligned.
No other changes than whitespace changes should be present between that
return value and the input contents.
"""
# Find all lines that have a number in them and calculate the maximum length
# of the stripped prefix and the number.
match_pairs = []
for line in contents.splitlines():
match = regex.match(
rf'(^\d[^";]*?|\s+{account.ACCOUNT_RE})\s+'
rf'({PARENTHESIZED_BINARY_OP_RE}|{NUMBER_RE})\s+'
rf'((?:{amount.CURRENCY_RE})\b.*)',
line,
)
if match:
prefix, number, rest = match.groups()
match_pairs.append((prefix, number, rest))
else:
match_pairs.append((line, None, None))
# Normalize whitespace before lines that has some indent and an account
# name.
norm_match_pairs = normalize_indent_whitespace(match_pairs)
if currency_column:
output = io.StringIO()
for prefix, number, rest in norm_match_pairs:
if number is None:
output.write(prefix)
else:
num_of_spaces = currency_column - len(prefix) - len(number) - 4
spaces = ' ' * num_of_spaces
output.write(prefix + spaces + ' ' + number + ' ' + rest)
output.write('\n')
return output.getvalue()
# Compute the maximum widths.
filtered_pairs = [(prefix, number)
for prefix, number, _ in match_pairs
if number is not None]
if filtered_pairs:
max_prefix_width = max(len(prefix) for prefix, _ in filtered_pairs)
max_num_width = max(len(number) for _, number in filtered_pairs)
else:
max_prefix_width = 0
max_num_width = 0
# Use user-supplied overrides, if available
if prefix_width:
max_prefix_width = prefix_width
if num_width:
max_num_width = num_width
# Create a format that will admit the maximum width of all prefixes equally.
line_format = '{{:<{prefix_width}}} {{:>{num_width}}} {{}}'.format(
prefix_width=max_prefix_width,
num_width=max_num_width)
# Process each line to an output buffer.
output = io.StringIO()
for prefix, number, rest in norm_match_pairs:
if number is None:
output.write(prefix)
else:
output.write(line_format.format(prefix.rstrip(), number, rest))
output.write('\n')
formatted_contents = output.getvalue()
# Ensure that the file before and after have only whitespace differences.
# This is a sanity check, to make really sure we never change anything but whitespace,
# so it's safe.
# open('/tmp/before', 'w').write(regex.sub(r'[ \t]+', ' ', contents))
# open('/tmp/after', 'w').write(regex.sub(r'[ \t]+', ' ', formatted_contents))
old_stripped = regex.sub(r'[ \t\n]+', ' ', contents.rstrip())
new_stripped = regex.sub(r'[ \t\n]+', ' ', formatted_contents.rstrip())
assert (old_stripped == new_stripped), (old_stripped, new_stripped)
return formatted_contents
beancount.scripts.format.compute_most_frequent(iterable)
Compute the frequencies of the given elements and return the most frequent.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/format.py
def compute_most_frequent(iterable):
"""Compute the frequencies of the given elements and return the most frequent.
Args:
iterable: A collection of hashable elements.
Returns:
The most frequent element. If there are no elements in the iterable,
return None.
"""
frequencies = collections.Counter(iterable)
if not frequencies:
return None
counts = sorted((count, element)
for element, count in frequencies.items())
# Note: In case of a tie, this chooses the longest width.
# We could eventually make this an option.
return counts[-1][1]
beancount.scripts.format.normalize_indent_whitespace(match_pairs)
Normalize whitespace before lines that has some indent and an account name.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/format.py
def normalize_indent_whitespace(match_pairs):
"""Normalize whitespace before lines that has some indent and an account name.
Args:
match_pairs: A list of (prefix, number, rest) tuples.
Returns:
Another list of (prefix, number, rest) tuples, where prefix may have been
adjusted with a different whitespace prefix.
"""
# Compute most frequent account name prefix.
match_posting = regex.compile(r'([ \t]+)({}.*)'.format(account.ACCOUNT_RE)).match
width = compute_most_frequent(
len(match.group(1))
for match in (match_posting(prefix)
for prefix, _, _ in match_pairs)
if match is not None)
norm_format = ' ' * (width or 0) + '{}'
# Make the necessary adjustments.
adjusted_pairs = []
for tup in match_pairs:
prefix, number, rest = tup
match = match_posting(prefix)
if match is not None:
tup = (norm_format.format(match.group(2)), number, rest)
adjusted_pairs.append(tup)
return adjusted_pairs