beancount.scripts
Implementation of the various scripts available from bin.
This is structured this way because we want all the significant codes under a single directory, for analysis, grepping and unit testing.
beancount.scripts.bake
Bake a Beancount input file's web files to a directory hierarchy.
You provide a Beancount filename, an output directory, and this script runs a server and a scraper that puts all the files in the directory, and if your output name has an archive suffix, we automatically the fetched directory contents to the archive and delete them.
beancount.scripts.bake.archive(command_template, directory, archive, quiet=False)
Archive the directory to the given tar/gz archive filename.
Parameters: |
|
---|
Exceptions: |
|
---|
Source code in beancount/scripts/bake.py
def archive(command_template, directory, archive, quiet=False):
"""Archive the directory to the given tar/gz archive filename.
Args:
command_template: A string, the command template to format with in order
to compute the command to run.
directory: A string, the name of the directory to archive.
archive: A string, the name of the file to output.
quiet: A boolean, True to suppress output.
Raises:
IOError: if the directory does not exist or if the archive name already
exists.
"""
directory = path.abspath(directory)
archive = path.abspath(archive)
if not path.exists(directory):
raise IOError("Directory to archive '{}' does not exist".format(
directory))
if path.exists(archive):
raise IOError("Output archive name '{}' already exists".format(
archive))
command = command_template.format(directory=directory,
dirname=path.dirname(directory),
basename=path.basename(directory),
archive=archive)
pipe = subprocess.Popen(shlex.split(command),
shell=False,
cwd=path.dirname(directory),
stdout=subprocess.PIPE if quiet else None,
stderr=subprocess.PIPE if quiet else None)
_, _ = pipe.communicate()
if pipe.returncode != 0:
raise OSError("Archive failure")
beancount.scripts.bake.archive_zip(directory, archive)
Archive the directory to the given tar/gz archive filename.
Parameters: |
|
---|
Source code in beancount/scripts/bake.py
def archive_zip(directory, archive):
"""Archive the directory to the given tar/gz archive filename.
Args:
directory: A string, the name of the directory to archive.
archive: A string, the name of the file to output.
"""
# Figure out optimal level of compression among the supported ones in this
# installation.
for spec, compression in [
('lzma', zipfile.ZIP_LZMA),
('bz2', zipfile.ZIP_BZIP2),
('zlib', zipfile.ZIP_DEFLATED)]:
if importlib.util.find_spec(spec):
zip_compression = compression
break
else:
# Default is no compression.
zip_compression = zipfile.ZIP_STORED
with file_utils.chdir(directory), zipfile.ZipFile(
archive, 'w', compression=zip_compression) as archfile:
for root, dirs, files in os.walk(directory):
for filename in files:
relpath = path.relpath(path.join(root, filename), directory)
archfile.write(relpath)
beancount.scripts.bake.bake_to_directory(webargs, output_dir, render_all_pages=True)
Serve and bake a Beancount's web to a directory.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/bake.py
def bake_to_directory(webargs, output_dir, render_all_pages=True):
"""Serve and bake a Beancount's web to a directory.
Args:
webargs: An argparse parsed options object with the web app arguments.
output_dir: A directory name. We don't check here whether it exists or not.
quiet: A boolean, True to suppress web server fetch log.
render_all_pages: If true, fetch the full set of pages, not just the subset that
is palatable.
Returns:
True on success, False otherwise.
"""
callback = functools.partial(save_scraped_document, output_dir)
if render_all_pages:
ignore_regexps = None
else:
regexps = [
# Skip the context pages, too slow.
r'/context/',
# Skip the link pages, too slow.
r'/link/',
# Skip the component pages... too many.
r'/view/component/',
# Skip served documents.
r'/.*/doc/',
# Skip monthly pages.
r'/view/year/\d\d\d\d/month/',
]
ignore_regexps = '({})'.format('|'.join(regexps))
processed_urls, skipped_urls = web.scrape_webapp(webargs, callback, ignore_regexps)
beancount.scripts.bake.normalize_filename(url)
Convert URL paths to filenames. Add .html extension if needed.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/bake.py
def normalize_filename(url):
"""Convert URL paths to filenames. Add .html extension if needed.
Args:
url: A string, the url to convert.
Returns:
A string, possibly with an extension appended.
"""
if url.endswith('/'):
return path.join(url, 'index.html')
elif BINARY_MATCH(url):
return url
else:
return url if url.endswith('.html') else (url + '.html')
beancount.scripts.bake.relativize_links(html, current_url)
Make all the links in the contents string relative to an URL.
Parameters: |
|
---|
Source code in beancount/scripts/bake.py
def relativize_links(html, current_url):
"""Make all the links in the contents string relative to an URL.
Args:
html: An lxml document node.
current_url: A string, the URL of the current page, a path to.
a file or a directory. If the path represents a directory, the
path ends with a /.
"""
current_dir = path.dirname(current_url)
for element, attribute, link, pos in lxml.html.iterlinks(html):
if path.isabs(link):
relative_link = path.relpath(normalize_filename(link), current_dir)
element.set(attribute, relative_link)
beancount.scripts.bake.remove_links(html, targets)
Convert a list of anchors (<a>) from an HTML tree to spans (<span>).
Parameters: |
|
---|
Source code in beancount/scripts/bake.py
def remove_links(html, targets):
"""Convert a list of anchors (<a>) from an HTML tree to spans (<span>).
Args:
html: An lxml document node.
targets: A set of string, targets to be removed.
"""
for element, attribute, link, pos in lxml.html.iterlinks(html):
if link in targets:
del element.attrib[attribute]
element.tag = 'span'
element.set('class', 'removed-link')
beancount.scripts.bake.save_scraped_document(output_dir, url, response, contents, html_root, skipped_urls)
Callback function to process a document being scraped.
This converts the document to have relative links and writes out the file to the output directory.
Parameters: |
|
---|
Source code in beancount/scripts/bake.py
def save_scraped_document(output_dir, url, response, contents, html_root, skipped_urls):
"""Callback function to process a document being scraped.
This converts the document to have relative links and writes out the file to
the output directory.
Args:
output_dir: A string, the output directory to write.
url: A string, the originally requested URL.
response: An http response as per urlopen.
contents: Bytes, the content of a response.
html_root: An lxml root node for the document, optionally. If this is provided,
this avoid you having to reprocess it (for performance reasons).
skipped_urls: A set of the links from the file that were skipped.
"""
if response.status != 200:
logging.error("Invalid status: %s", response.status)
# Ignore directories.
if url.endswith('/'):
return
# Note that we're saving the file under the non-redirected URL, because this
# will have to be opened using files and there are no redirects that way.
if response.info().get_content_type() == 'text/html':
if html_root is None:
html_root = lxml.html.document_fromstring(contents)
remove_links(html_root, skipped_urls)
relativize_links(html_root, url)
contents = lxml.html.tostring(html_root, method="html")
# Compute output filename and write out the relativized contents.
output_filename = path.join(output_dir,
normalize_filename(url).lstrip('/'))
os.makedirs(path.dirname(output_filename), exist_ok=True)
with open(output_filename, 'wb') as outfile:
outfile.write(contents)
beancount.scripts.check
Parse, check and realize a beancount input file.
This also measures the time it takes to run all these steps.
beancount.scripts.deps
Check the installation dependencies and report the version numbers of each.
This is meant to be used as an error diagnostic tool.
beancount.scripts.deps.check_cdecimal()
Check that Python 3.3 or above is installed.
Returns: |
|
---|
Source code in beancount/scripts/deps.py
def check_cdecimal():
"""Check that Python 3.3 or above is installed.
Returns:
A triple of (package-name, version-number, sufficient) as per
check_dependencies().
"""
# Note: this code mirrors and should be kept in-sync with that at the top of
# beancount.core.number.
# Try the built-in installation.
import decimal
if is_fast_decimal(decimal):
return ('cdecimal', '{} (built-in)'.format(decimal.__version__), True)
# Try an explicitly installed version.
try:
import cdecimal
if is_fast_decimal(cdecimal):
return ('cdecimal', getattr(cdecimal, '__version__', 'OKAY'), True)
except ImportError:
pass
# Not found.
return ('cdecimal', None, False)
beancount.scripts.deps.check_dependencies()
Check the runtime dependencies and report their version numbers.
Returns: |
|
---|
Source code in beancount/scripts/deps.py
def check_dependencies():
"""Check the runtime dependencies and report their version numbers.
Returns:
A list of pairs of (package-name, version-number, sufficient) whereby if a
package has not been installed, its 'version-number' will be set to None.
Otherwise, it will be a string with the version number in it. 'sufficient'
will be True if the version if sufficient for this installation of
Beancount.
"""
return [
# Check for a complete installation of Python itself.
check_python(),
check_cdecimal(),
# Modules we really do need installed.
check_import('dateutil'),
check_import('bottle'),
check_import('ply', module_name='ply.yacc', min_version='3.4'),
check_import('lxml', module_name='lxml.etree', min_version='3'),
# Optionally required to upload data to Google Drive.
check_import('googleapiclient'),
check_import('oauth2client'),
check_import('httplib2'),
# Optionally required to support various price source fetchers.
check_import('requests', min_version='2.0'),
# Optionally required to support imports (identify, extract, file) code.
check_python_magic(),
check_import('beautifulsoup4', module_name='bs4', min_version='4'),
]
beancount.scripts.deps.check_import(package_name, min_version=None, module_name=None)
Check that a particular module name is installed.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/deps.py
def check_import(package_name, min_version=None, module_name=None):
"""Check that a particular module name is installed.
Args:
package_name: A string, the name of the package and module to be
imported to verify this works. This should have a __version__
attribute on it.
min_version: If not None, a string, the minimum version number
we require.
module_name: The name of the module to import if it differs from the
package name.
Returns:
A triple of (package-name, version-number, sufficient) as per
check_dependencies().
"""
if module_name is None:
module_name = package_name
try:
__import__(module_name)
module = sys.modules[module_name]
if min_version is not None:
version = module.__version__
assert isinstance(version, str)
is_sufficient = (parse_version(version) >= parse_version(min_version)
if min_version else True)
else:
version, is_sufficient = None, True
except ImportError:
version, is_sufficient = None, False
return (package_name, version, is_sufficient)
beancount.scripts.deps.check_python()
Check that Python 3.3 or above is installed.
Returns: |
|
---|
Source code in beancount/scripts/deps.py
def check_python():
"""Check that Python 3.3 or above is installed.
Returns:
A triple of (package-name, version-number, sufficient) as per
check_dependencies().
"""
return ('python3',
'.'.join(map(str, sys.version_info[:3])),
sys.version_info[:2] >= (3, 3))
beancount.scripts.deps.check_python_magic()
Check that a recent-enough version of python-magic is installed.
python-magic is an interface to libmagic, which is used by the 'file' tool and UNIX to identify file types. Note that there are two Python wrappers which provide the 'magic' import: python-magic and filemagic. The former is what we need, which appears to be more recently maintained.
Returns: |
|
---|
Source code in beancount/scripts/deps.py
def check_python_magic():
"""Check that a recent-enough version of python-magic is installed.
python-magic is an interface to libmagic, which is used by the 'file' tool
and UNIX to identify file types. Note that there are two Python wrappers
which provide the 'magic' import: python-magic and filemagic. The former is
what we need, which appears to be more recently maintained.
Returns:
A triple of (package-name, version-number, sufficient) as per
check_dependencies().
"""
try:
import magic
# Check that python-magic and not filemagic is installed.
if not hasattr(magic, 'from_file'):
# 'filemagic' is installed; install python-magic.
raise ImportError
return ('python-magic', 'OK', True)
except ImportError:
return ('python-magic', None, False)
beancount.scripts.deps.is_fast_decimal(decimal_module)
Return true if a fast C decimal implementation is installed.
Source code in beancount/scripts/deps.py
def is_fast_decimal(decimal_module):
"Return true if a fast C decimal implementation is installed."
return isinstance(decimal_module.Decimal().sqrt, types.BuiltinFunctionType)
beancount.scripts.deps.list_dependencies(file=<_io.TextIOWrapper name='<stderr>' mode='w' encoding='utf-8'>)
Check the dependencies and produce a listing on the given file.
Parameters: |
|
---|
Source code in beancount/scripts/deps.py
def list_dependencies(file=sys.stderr):
"""Check the dependencies and produce a listing on the given file.
Args:
file: A file object to write the output to.
"""
print("Dependencies:")
for package, version, sufficient in check_dependencies():
print(" {:16}: {} {}".format(
package,
version or 'NOT INSTALLED',
"(INSUFFICIENT)" if version and not sufficient else ""),
file=file)
beancount.scripts.deps.parse_version(version_str)
Parse the version string into a comparable tuple.
Source code in beancount/scripts/deps.py
def parse_version(version_str: str) -> str:
"""Parse the version string into a comparable tuple."""
return [int(v) for v in version_str.split('.')]
beancount.scripts.directories
Check that document directories mirror a list of accounts correctly.
beancount.scripts.directories.ValidateDirectoryError (Exception)
A directory validation error.
beancount.scripts.directories.validate_directories(entries, document_dirs)
Validate a directory hierarchy against a ledger's account names.
Read a ledger's list of account names and check that all the capitalized subdirectory names under the given roots match the account names.
Parameters: |
|
---|
Source code in beancount/scripts/directories.py
def validate_directories(entries, document_dirs):
"""Validate a directory hierarchy against a ledger's account names.
Read a ledger's list of account names and check that all the capitalized
subdirectory names under the given roots match the account names.
Args:
entries: A list of directives.
document_dirs: A list of string, the directory roots to walk and validate.
"""
# Get the list of accounts declared in the ledge.
accounts = getters.get_accounts(entries)
# For each of the roots, validate the hierarchy of directories.
for document_dir in document_dirs:
errors = validate_directory(accounts, document_dir)
for error in errors:
print("ERROR: {}".format(error))
beancount.scripts.directories.validate_directory(accounts, document_dir)
Check a directory hierarchy against a list of valid accounts.
Walk the directory hierarchy, and for all directories with names matching that of accounts (with ":" replaced with "/"), check that they refer to an account name declared in the given list.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/directories.py
def validate_directory(accounts, document_dir):
"""Check a directory hierarchy against a list of valid accounts.
Walk the directory hierarchy, and for all directories with names matching
that of accounts (with ":" replaced with "/"), check that they refer to an
account name declared in the given list.
Args:
account: A set or dict of account names.
document_dir: A string, the root directory to walk and validate.
Returns:
An errors for each invalid directory name found.
"""
# Generate all parent accounts in the account_set we're checking against, so
# that parent directories with no corresponding account don't warn.
accounts_with_parents = set(accounts)
for account_ in accounts:
while True:
parent = account.parent(account_)
if not parent:
break
if parent in accounts_with_parents:
break
accounts_with_parents.add(parent)
account_ = parent
errors = []
for directory, account_name, _, _ in account.walk(document_dir):
if account_name not in accounts_with_parents:
errors.append(ValidateDirectoryError(
"Invalid directory '{}': no corresponding account '{}'".format(
directory, account_name)))
return errors
beancount.scripts.doctor
Debugging tool for those finding bugs in Beancount.
This tool is able to dump lexer/parser state, and will provide other services in the name of debugging.
beancount.scripts.doctor.RenderError (tuple)
RenderError(source, message, entry)
beancount.scripts.doctor.RenderError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/scripts/doctor.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.scripts.doctor.RenderError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of RenderError(source, message, entry)
beancount.scripts.doctor.RenderError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/scripts/doctor.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.scripts.doctor.do_checkdeps(*unused_args)
Report on the runtime dependencies.
Parameters: |
|
---|
Source code in beancount/scripts/doctor.py
def do_deps(*unused_args):
"""Report on the runtime dependencies.
Args:
unused_args: Ignored.
"""
from beancount.scripts import deps
deps.list_dependencies(sys.stdout)
print('')
print('Use "pip3 install <package>" to install new packages.')
beancount.scripts.doctor.do_context(filename, args)
Describe the context that a particular transaction is applied to.
Parameters: |
|
---|
Source code in beancount/scripts/doctor.py
def do_context(filename, args):
"""Describe the context that a particular transaction is applied to.
Args:
filename: A string, which consists in the filename.
args: A tuple of the rest of arguments. We're expecting the first argument
to be a string which contains either a lineno integer or a filename:lineno
combination (which can be used if the location is not in the top-level file).
"""
from beancount.reports import context
from beancount import loader
# Check we have the required number of arguments.
if len(args) != 1:
raise SystemExit("Missing line number argument.")
# Load the input files.
entries, errors, options_map = loader.load_file(filename)
# Parse the arguments, get the line number.
match = re.match(r"(.+):(\d+)$", args[0])
if match:
search_filename = path.abspath(match.group(1))
lineno = int(match.group(2))
elif re.match(r"(\d+)$", args[0]):
# Note: Make sure to use the absolute filename used by the parser to
# resolve the file.
search_filename = options_map['filename']
lineno = int(args[0])
else:
raise SystemExit("Invalid format for location.")
str_context = context.render_file_context(entries, options_map,
search_filename, lineno)
sys.stdout.write(str_context)
beancount.scripts.doctor.do_deps(*unused_args)
Report on the runtime dependencies.
Parameters: |
|
---|
Source code in beancount/scripts/doctor.py
def do_deps(*unused_args):
"""Report on the runtime dependencies.
Args:
unused_args: Ignored.
"""
from beancount.scripts import deps
deps.list_dependencies(sys.stdout)
print('')
print('Use "pip3 install <package>" to install new packages.')
beancount.scripts.doctor.do_directories(filename, args)
Validate a directory hierarchy against a ledger's account names.
Read a ledger's list of account names and check that all the capitalized subdirectory names under the given roots match the account names.
Parameters: |
|
---|
Source code in beancount/scripts/doctor.py
def do_directories(filename, args):
"""Validate a directory hierarchy against a ledger's account names.
Read a ledger's list of account names and check that all the capitalized
subdirectory names under the given roots match the account names.
Args:
filename: A string, the Beancount input filename.
args: The rest of the arguments provided on the command-line, which in this
case will be interpreted as the names of root directories to validate against
the accounts in the given ledger.
"""
from beancount import loader
from beancount.scripts import directories
entries, _, __ = loader.load_file(filename)
directories.validate_directories(entries, args)
beancount.scripts.doctor.do_display_context(filename, args)
Print out the precision inferred from the parsed numbers in the input file.
Parameters: |
|
---|
Source code in beancount/scripts/doctor.py
def do_display_context(filename, args):
"""Print out the precision inferred from the parsed numbers in the input file.
Args:
filename: A string, which consists in the filename.
args: A tuple of the rest of arguments. We're expecting the first argument
to be an integer as a string.
"""
from beancount import loader
entries, errors, options_map = loader.load_file(filename)
dcontext = options_map['dcontext']
sys.stdout.write(str(dcontext))
beancount.scripts.doctor.do_dump_lexer(filename, unused_args)
Dump the lexer output for a Beancount syntax file.
Parameters: |
|
---|
Source code in beancount/scripts/doctor.py
def do_lex(filename, unused_args):
"""Dump the lexer output for a Beancount syntax file.
Args:
filename: A string, the Beancount input filename.
"""
from beancount.parser import lexer
for token, lineno, text, obj in lexer.lex_iter(filename):
sys.stdout.write('{:12} {:6d} {}\n'.format(
'(None)' if token is None else token, lineno, repr(text)))
beancount.scripts.doctor.do_lex(filename, unused_args)
Dump the lexer output for a Beancount syntax file.
Parameters: |
|
---|
Source code in beancount/scripts/doctor.py
def do_lex(filename, unused_args):
"""Dump the lexer output for a Beancount syntax file.
Args:
filename: A string, the Beancount input filename.
"""
from beancount.parser import lexer
for token, lineno, text, obj in lexer.lex_iter(filename):
sys.stdout.write('{:12} {:6d} {}\n'.format(
'(None)' if token is None else token, lineno, repr(text)))
beancount.scripts.doctor.do_linked(filename, args)
Print out a list of transactions linked to the one at the given line.
Parameters: |
|
---|
Source code in beancount/scripts/doctor.py
def do_linked(filename, args):
"""Print out a list of transactions linked to the one at the given line.
Args:
filename: A string, which consists in the filename.
args: A tuple of the rest of arguments. We're expecting the first argument
to be an integer as a string.
"""
from beancount.parser import options
from beancount.parser import printer
from beancount.core import account_types
from beancount.core import inventory
from beancount.core import data
from beancount.core import realization
from beancount import loader
# Parse the arguments, get the line number.
if len(args) != 1:
raise SystemExit("Missing line number argument.")
lineno = int(args[0])
# Load the input file.
entries, errors, options_map = loader.load_file(filename)
# Find the closest entry.
closest_entry = data.find_closest(entries, options_map['filename'], lineno)
# Find its links.
if closest_entry is None:
raise SystemExit("No entry could be found before {}:{}".format(filename, lineno))
links = (closest_entry.links
if isinstance(closest_entry, data.Transaction)
else data.EMPTY_SET)
if not links:
linked_entries = [closest_entry]
else:
# Find all linked entries.
#
# Note that there is an option here: You can either just look at the links
# on the closest entry, or you can include the links of the linked
# transactions as well. Whichever one you want depends on how you use your
# links. Best would be to query the user (in Emacs) when there are many
# links present.
follow_links = True
if not follow_links:
linked_entries = [entry
for entry in entries
if (isinstance(entry, data.Transaction) and
entry.links and
entry.links & links)]
else:
links = set(links)
linked_entries = []
while True:
num_linked = len(linked_entries)
linked_entries = [entry
for entry in entries
if (isinstance(entry, data.Transaction) and
entry.links and
entry.links & links)]
if len(linked_entries) == num_linked:
break
for entry in linked_entries:
if entry.links:
links.update(entry.links)
# Render linked entries (in date order) as errors (for Emacs).
errors = [RenderError(entry.meta, '', entry)
for entry in linked_entries]
printer.print_errors(errors)
# Print out balances.
real_root = realization.realize(linked_entries)
dformat = options_map['dcontext'].build(alignment=display_context.Align.DOT,
reserved=2)
realization.dump_balances(real_root, dformat, file=sys.stdout)
# Print out net income change.
acctypes = options.get_account_types(options_map)
net_income = inventory.Inventory()
for real_node in realization.iter_children(real_root):
if account_types.is_income_statement_account(real_node.account, acctypes):
net_income.add_inventory(real_node.balance)
print()
print('Net Income: {}'.format(-net_income))
beancount.scripts.doctor.do_list_options(*unused_args)
Print out a list of the available options.
Parameters: |
|
---|
Source code in beancount/scripts/doctor.py
def do_list_options(*unused_args):
"""Print out a list of the available options.
Args:
unused_args: Ignored.
"""
from beancount.parser import options
print(options.list_options())
beancount.scripts.doctor.do_missing_open(filename, args)
Print out Open directives that are missing for the given input file.
This can be useful during demos in order to quickly generate all the required Open directives without having to type them manually.
Parameters: |
|
---|
Source code in beancount/scripts/doctor.py
def do_missing_open(filename, args):
"""Print out Open directives that are missing for the given input file.
This can be useful during demos in order to quickly generate all the
required Open directives without having to type them manually.
Args:
filename: A string, which consists in the filename.
args: A tuple of the rest of arguments. We're expecting the first argument
to be an integer as a string.
"""
from beancount.parser import printer
from beancount.core import data
from beancount.core import getters
from beancount import loader
entries, errors, options_map = loader.load_file(filename)
# Get accounts usage and open directives.
first_use_map, _ = getters.get_accounts_use_map(entries)
open_close_map = getters.get_account_open_close(entries)
new_entries = []
for account, first_use_date in first_use_map.items():
if account not in open_close_map:
new_entries.append(
data.Open(data.new_metadata(filename, 0), first_use_date, account,
None, None))
dcontext = options_map['dcontext']
printer.print_entries(data.sorted(new_entries), dcontext)
beancount.scripts.doctor.do_parse(filename, unused_args)
Run the parser in debug mode.
Parameters: |
|
---|
Source code in beancount/scripts/doctor.py
def do_parse(filename, unused_args):
"""Run the parser in debug mode.
Args:
filename: A string, the Beancount input filename.
"""
from beancount.parser import parser
entries, errors, _ = parser.parse_file(filename, yydebug=1)
beancount.scripts.doctor.do_print_options(filename, *args)
Print out the actual options parsed from a file.
Parameters: |
|
---|
Source code in beancount/scripts/doctor.py
def do_print_options(filename, *args):
"""Print out the actual options parsed from a file.
Args:
unused_args: Ignored.
"""
from beancount import loader
_, __, options_map = loader.load_file(filename)
for key, value in sorted(options_map.items()):
print('{}: {}'.format(key, value))
beancount.scripts.doctor.do_roundtrip(filename, unused_args)
Round-trip test on arbitrary Ledger.
Read a Ledger's transactions, print them out, re-read them again and compare them. Both sets of parsed entries should be equal. Both printed files are output to disk, so you can also run diff on them yourself afterwards.
Parameters: |
|
---|
Source code in beancount/scripts/doctor.py
def do_roundtrip(filename, unused_args):
"""Round-trip test on arbitrary Ledger.
Read a Ledger's transactions, print them out, re-read them again and compare
them. Both sets of parsed entries should be equal. Both printed files are
output to disk, so you can also run diff on them yourself afterwards.
Args:
filename: A string, the Beancount input filename.
"""
from beancount.parser import printer
from beancount.core import compare
from beancount import loader
round1_filename = round2_filename = None
try:
logging.basicConfig(level=logging.INFO, format='%(levelname)-8s: %(message)s')
logging.info("Read the entries")
entries, errors, options_map = loader.load_file(filename)
printer.print_errors(errors, file=sys.stderr)
logging.info("Print them out to a file")
basename, extension = path.splitext(filename)
round1_filename = ''.join([basename, '.roundtrip1', extension])
with open(round1_filename, 'w') as outfile:
printer.print_entries(entries, file=outfile)
logging.info("Read the entries from that file")
# Note that we don't want to run any of the auto-generation here, but
# parsing now returns incomplete objects and we assume idempotence on a
# file that was output from the printer after having been processed, so
# it shouldn't add anything new. That is, a processed file printed and
# resolve when parsed again should contain the same entries, i.e.
# nothing new should be generated.
entries_roundtrip, errors, options_map = loader.load_file(round1_filename)
# Print out the list of errors from parsing the results.
if errors:
print(',----------------------------------------------------------------------')
printer.print_errors(errors, file=sys.stdout)
print('`----------------------------------------------------------------------')
logging.info("Print what you read to yet another file")
round2_filename = ''.join([basename, '.roundtrip2', extension])
with open(round2_filename, 'w') as outfile:
printer.print_entries(entries_roundtrip, file=outfile)
logging.info("Compare the original entries with the re-read ones")
same, missing1, missing2 = compare.compare_entries(entries, entries_roundtrip)
if same:
logging.info('Entries are the same. Congratulations.')
else:
logging.error('Entries differ!')
print()
print('\n\nMissing from original:')
for entry in entries:
print(entry)
print(compare.hash_entry(entry))
print(printer.format_entry(entry))
print()
print('\n\nMissing from round-trip:')
for entry in missing2:
print(entry)
print(compare.hash_entry(entry))
print(printer.format_entry(entry))
print()
finally:
for rfilename in (round1_filename, round2_filename):
if path.exists(rfilename):
os.remove(rfilename)
beancount.scripts.doctor.do_validate_html(directory, args)
Validate all the HTML files under a directory hierarchy.
Parameters: |
|
---|
Source code in beancount/scripts/doctor.py
def do_validate_html(directory, args):
"""Validate all the HTML files under a directory hierarchy.
Args:
directory: A string, the root directory whose contents to validate.
args: A tuple of the rest of arguments.
"""
from beancount.web import scrape
files, missing, empty = scrape.validate_local_links_in_dir(directory)
logging.info('%d files processed', len(files))
for target in missing:
logging.error('Missing %s', target)
for target in empty:
logging.error('Empty %s', target)
beancount.scripts.doctor.get_commands()
Return a list of available commands in this file.
Returns: |
|
---|
Source code in beancount/scripts/doctor.py
def get_commands():
"""Return a list of available commands in this file.
Returns:
A list of pairs of (command-name string, docstring).
"""
commands = []
for attr_name, attr_value in globals().items():
match = re.match('do_(.*)', attr_name)
if match:
commands.append((match.group(1),
misc_utils.first_paragraph(attr_value.__doc__)))
return commands
beancount.scripts.example
Generate a decently-sized example history, based on some rules.
This script is used to generate some meaningful input to Beancount, input that looks as realistic as possible for a moderately complex mock individual. This can also be used as an input generator for a stress test for performance evaluation.
beancount.scripts.example.check_non_negative(entries, account, currency)
Check that the balance of the given account never goes negative.
Parameters: |
|
---|
Exceptions: |
|
---|
Source code in beancount/scripts/example.py
def check_non_negative(entries, account, currency):
"""Check that the balance of the given account never goes negative.
Args:
entries: A list of directives.
account: An account string, the account to check the balance for.
currency: A string, the currency to check minimums for.
Raises:
AssertionError: if the balance goes negative.
"""
previous_date = None
for txn_posting, balances in postings_for(data.sorted(entries), [account], before=True):
balance = balances[account]
date = txn_posting.txn.date
if date != previous_date:
assert all(pos.units.number >= ZERO for pos in balance.get_positions()), (
"Negative balance: {} at: {}".format(balance, txn_posting.txn.date))
previous_date = date
beancount.scripts.example.compute_trip_dates(date_begin, date_end)
Generate dates at reasonable intervals for trips during the given time period.
Parameters: |
|
---|
Yields: Pairs of dates for the trips within the period.
Source code in beancount/scripts/example.py
def compute_trip_dates(date_begin, date_end):
"""Generate dates at reasonable intervals for trips during the given time period.
Args:
date_begin: The start date.
date_end: The end date.
Yields:
Pairs of dates for the trips within the period.
"""
# Min and max number of days remaining at home.
days_at_home = (4*30, 13*30)
# Length of trip.
days_trip = (8, 22)
# Number of days to ensure no trip at the beginning and the end.
days_buffer = 21
date_begin += datetime.timedelta(days=days_buffer)
date_end -= datetime.timedelta(days=days_buffer)
date = date_begin
while 1:
duration_at_home = datetime.timedelta(days=random.randint(*days_at_home))
duration_trip = datetime.timedelta(days=random.randint(*days_trip))
date_trip_begin = date + duration_at_home
date_trip_end = date_trip_begin + duration_trip
if date_trip_end >= date_end:
break
yield (date_trip_begin, date_trip_end)
date = date_trip_end
beancount.scripts.example.contextualize_file(contents, employer)
Replace generic strings in the generated file with realistic strings.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def contextualize_file(contents, employer):
"""Replace generic strings in the generated file with realistic strings.
Args:
contents: A string, the generic file contents.
Returns:
A string, the contextualized version.
"""
replacements = {
'CC': 'US',
'Bank1': 'BofA',
'Bank1_Institution': 'Bank of America',
'Bank1_Address': '123 America Street, LargeTown, USA',
'Bank1_Phone': '+1.012.345.6789',
'CreditCard1': 'Chase:Slate',
'CreditCard2': 'Amex:BlueCash',
'Employer1': employer,
'Retirement': 'Vanguard',
'Retirement_Institution': 'Vanguard Group',
'Retirement_Address': "P.O. Box 1110, Valley Forge, PA 19482-1110",
'Retirement_Phone': "+1.800.523.1188",
'Investment': 'ETrade',
# Commodities
'CCY': 'USD',
'VACHR': 'VACHR',
'DEFCCY': 'IRAUSD',
'MFUND1': 'VBMPX',
'MFUND2': 'RGAGX',
'STK1': 'ITOT',
'STK2': 'VEA',
'STK3': 'VHT',
'STK4': 'GLD',
}
new_contents = replace(contents, replacements)
return new_contents, replacements
beancount.scripts.example.date_iter(date_begin, date_end)
Generate a sequence of dates.
Parameters: |
|
---|
Yields: Instances of datetime.date.
Source code in beancount/scripts/example.py
def date_iter(date_begin, date_end):
"""Generate a sequence of dates.
Args:
date_begin: The start date.
date_end: The end date.
Yields:
Instances of datetime.date.
"""
assert date_begin <= date_end
date = date_begin
one_day = datetime.timedelta(days=1)
while date < date_end:
date += one_day
yield date
beancount.scripts.example.date_random_seq(date_begin, date_end, days_min, days_max)
Generate a sequence of dates with some random increase in days.
Parameters: |
|
---|
Yields: Instances of datetime.date.
Source code in beancount/scripts/example.py
def date_random_seq(date_begin, date_end, days_min, days_max):
"""Generate a sequence of dates with some random increase in days.
Args:
date_begin: The start date.
date_end: The end date.
days_min: The minimum number of days to advance on each iteration.
days_max: The maximum number of days to advance on each iteration.
Yields:
Instances of datetime.date.
"""
assert days_min > 0
assert days_min <= days_max
date = date_begin
while date < date_end:
nb_days_forward = random.randint(days_min, days_max)
date += datetime.timedelta(days=nb_days_forward)
if date >= date_end:
break
yield date
beancount.scripts.example.delay_dates(date_iter, delay_days_min, delay_days_max)
Delay the dates from the given iterator by some uniformly drawn number of days.
Parameters: |
|
---|
Yields: datetime.date instances.
Source code in beancount/scripts/example.py
def delay_dates(date_iter, delay_days_min, delay_days_max):
"""Delay the dates from the given iterator by some uniformly drawn number of days.
Args:
date_iter: An iterator of datetime.date instances.
delay_days_min: The minimum amount of advance days for the transaction.
delay_days_max: The maximum amount of advance days for the transaction.
Yields:
datetime.date instances.
"""
dates = list(date_iter)
last_date = dates[-1]
last_date = last_date.date() if isinstance(last_date, datetime.datetime) else last_date
for dtime in dates:
date = dtime.date() if isinstance(dtime, datetime.datetime) else dtime
date += datetime.timedelta(days=random.randint(delay_days_min, delay_days_max))
if date >= last_date:
break
yield date
beancount.scripts.example.generate_balance_checks(entries, account, date_iter)
Generate balance check entries to the given frequency.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_balance_checks(entries, account, date_iter):
"""Generate balance check entries to the given frequency.
Args:
entries: A list of directives that contain all the transactions for the
accounts.
account: The name of the account for which to generate.
date_iter: Iterator of dates. We generate balance checks at these dates.
Returns:
A list of balance check entries.
"""
balance_checks = []
date_iter = iter(date_iter)
next_date = next(date_iter)
with misc_utils.swallow(StopIteration):
for txn_posting, balance in postings_for(entries, [account], before=True):
while txn_posting.txn.date >= next_date:
amount = balance[account].get_currency_units('CCY').number
balance_checks.extend(parse("""
{next_date} balance {account} {amount} CCY
""", **locals()))
next_date = next(date_iter)
return balance_checks
beancount.scripts.example.generate_banking(entries, date_begin, date_end, amount_initial)
Generate a checking account opening.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_banking(entries, date_begin, date_end, amount_initial):
"""Generate a checking account opening.
Args:
entries: A list of entries which affect this account.
date_begin: A date instance, the beginning date.
date_end: A date instance, the end date.
amount_initial: A Decimal instance, the amount to initialize the checking
account with.
Returns:
A list of directives.
"""
amount_initial_neg = -amount_initial
new_entries = parse("""
{date_begin} open Assets:CC:Bank1
institution: "Bank1_Institution"
address: "Bank1_Address"
phone: "Bank1_Phone"
{date_begin} open Assets:CC:Bank1:Checking CCY
account: "00234-48574897"
;; {date_begin} open Assets:CC:Bank1:Savings CCY
{date_begin} * "Opening Balance for checking account"
Assets:CC:Bank1:Checking {amount_initial} CCY
Equity:Opening-Balances {amount_initial_neg} CCY
""", **locals())
date_balance = date_begin + datetime.timedelta(days=1)
account = 'Assets:CC:Bank1:Checking'
for txn_posting, balances in postings_for(data.sorted(entries + new_entries),
[account], before=True):
if txn_posting.txn.date >= date_balance:
break
amount_balance = balances[account].get_currency_units('CCY').number
bal_entries = parse("""
{date_balance} balance Assets:CC:Bank1:Checking {amount_balance} CCY
""", **locals())
return new_entries + bal_entries
beancount.scripts.example.generate_banking_expenses(date_begin, date_end, account, rent_amount)
Generate expenses paid out of a checking account, typically living expenses.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_banking_expenses(date_begin, date_end, account, rent_amount):
"""Generate expenses paid out of a checking account, typically living expenses.
Args:
date_begin: The start date.
date_end: The end date.
account: The checking account to generate expenses to.
rent_amount: The amount of rent.
Returns:
A list of directives.
"""
fee_expenses = generate_periodic_expenses(
rrule.rrule(rrule.MONTHLY, bymonthday=4, dtstart=date_begin, until=date_end),
"BANK FEES", "Monthly bank fee",
account, 'Expenses:Financial:Fees',
lambda: D('4.00'))
rent_expenses = generate_periodic_expenses(
delay_dates(rrule.rrule(rrule.MONTHLY, dtstart=date_begin, until=date_end), 2, 5),
"RiverBank Properties", "Paying the rent",
account, 'Expenses:Home:Rent',
lambda: random.normalvariate(float(rent_amount), 0))
electricity_expenses = generate_periodic_expenses(
delay_dates(rrule.rrule(rrule.MONTHLY, dtstart=date_begin, until=date_end), 7, 8),
"EDISON POWER", "",
account, 'Expenses:Home:Electricity',
lambda: random.normalvariate(65, 0))
internet_expenses = generate_periodic_expenses(
delay_dates(rrule.rrule(rrule.MONTHLY, dtstart=date_begin, until=date_end), 20, 22),
"Wine-Tarner Cable", "",
account, 'Expenses:Home:Internet',
lambda: random.normalvariate(80, 0.10))
phone_expenses = generate_periodic_expenses(
delay_dates(rrule.rrule(rrule.MONTHLY, dtstart=date_begin, until=date_end), 17, 19),
"Verizon Wireless", "",
account, 'Expenses:Home:Phone',
lambda: random.normalvariate(60, 10))
return data.sorted(fee_expenses +
rent_expenses +
electricity_expenses +
internet_expenses +
phone_expenses)
beancount.scripts.example.generate_clearing_entries(date_iter, payee, narration, entries, account_clear, account_from)
Generate entries to clear the value of an account.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_clearing_entries(date_iter,
payee, narration,
entries, account_clear, account_from):
"""Generate entries to clear the value of an account.
Args:
date_iter: An iterator of datetime.date instances.
payee: A string, the payee name to use on the transactions.
narration: A string, the narration to use on the transactions.
entries: A list of entries.
account_clear: The account to clear.
account_from: The source account to clear 'account_clear' from.
Returns:
A list of directives.
"""
# The next date we're looking for.
next_date = next(iter(date_iter))
# Iterate over all the postings of the account to clear.
new_entries = []
for txn_posting, balances in postings_for(entries, [account_clear]):
balance_clear = balances[account_clear]
# Check if we need to clear.
if next_date <= txn_posting.txn.date:
pos_amount = balance_clear.get_currency_units('CCY')
neg_amount = -pos_amount
new_entries.extend(parse("""
{next_date} * "{payee}" "{narration}"
{account_clear} {neg_amount.number:.2f} CCY
{account_from} {pos_amount.number:.2f} CCY
""", **locals()))
balance_clear.add_amount(neg_amount)
# Advance to the next date we're looking for.
try:
next_date = next(iter(date_iter))
except StopIteration:
break
return new_entries
beancount.scripts.example.generate_commodity_entries(date_birth)
Create a list of Commodity entries for all the currencies we're using.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_commodity_entries(date_birth):
"""Create a list of Commodity entries for all the currencies we're using.
Args:
date_birth: A datetime.date instance, the date of birth of the user.
Returns:
A list of Commodity entries for all the commodities in use.
"""
return parse("""
1792-01-01 commodity USD
name: "US Dollar"
export: "CASH"
{date_birth} commodity VACHR
name: "Employer Vacation Hours"
export: "IGNORE"
{date_birth} commodity IRAUSD
name: "US 401k and IRA Contributions"
export: "IGNORE"
2009-05-01 commodity RGAGX
name: "American Funds The Growth Fund of America Class R-6"
export: "MUTF:RGAGX"
price: "USD:google/MUTF:RGAGX"
1995-09-18 commodity VBMPX
name: "Vanguard Total Bond Market Index Fund Institutional Plus Shares"
export: "MUTF:VBMPX"
price: "USD:google/MUTF:VBMPX"
2004-01-20 commodity ITOT
name: "iShares Core S&P Total U.S. Stock Market ETF"
export: "NYSEARCA:ITOT"
price: "USD:google/NYSEARCA:ITOT"
2007-07-20 commodity VEA
name: "Vanguard FTSE Developed Markets ETF"
export: "NYSEARCA:VEA"
price: "USD:google/NYSEARCA:VEA"
2004-01-26 commodity VHT
name: "Vanguard Health Care ETF"
export: "NYSEARCA:VHT"
price: "USD:google/NYSEARCA:VHT"
2004-11-01 commodity GLD
name: "SPDR Gold Trust (ETF)"
export: "NYSEARCA:GLD"
price: "USD:google/NYSEARCA:GLD"
1900-01-01 commodity VMMXX
export: "MUTF:VMMXX (MONEY:USD)"
""", **locals())
beancount.scripts.example.generate_employment_income(employer_name, employer_address, annual_salary, account_deposit, account_retirement, date_begin, date_end)
Generate bi-weekly entries for payroll salary income.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_employment_income(employer_name,
employer_address,
annual_salary,
account_deposit,
account_retirement,
date_begin,
date_end):
"""Generate bi-weekly entries for payroll salary income.
Args:
employer_name: A string, the human-readable name of the employer.
employer_address: A string, the address of the employer.
annual_salary: A Decimal, the annual salary of the employee.
account_deposit: An account string, the account to deposit the salary to.
account_retirement: An account string, the account to deposit retirement
contributions to.
date_begin: The start date.
date_end: The end date.
Returns:
A list of directives, including open directives for the account.
"""
preamble = parse("""
{date_begin} event "employer" "{employer_name}, {employer_address}"
{date_begin} open Income:CC:Employer1:Salary CCY
;{date_begin} open Income:CC:Employer1:AnnualBonus CCY
{date_begin} open Income:CC:Employer1:GroupTermLife CCY
{date_begin} open Income:CC:Employer1:Vacation VACHR
{date_begin} open Assets:CC:Employer1:Vacation VACHR
{date_begin} open Expenses:Vacation VACHR
{date_begin} open Expenses:Health:Life:GroupTermLife
{date_begin} open Expenses:Health:Medical:Insurance
{date_begin} open Expenses:Health:Dental:Insurance
{date_begin} open Expenses:Health:Vision:Insurance
;{date_begin} open Expenses:Vacation:Employer
""", **locals())
date_prev = None
contrib_retirement = ZERO
contrib_socsec = ZERO
biweekly_pay = annual_salary / 26
gross = biweekly_pay
medicare = gross * D('0.0231')
federal = gross * D('0.2303')
state = gross * D('0.0791')
city = gross * D('0.0379')
sdi = D('1.12')
lifeinsurance = D('24.32')
dental = D('2.90')
medical = D('27.38')
vision = D('42.30')
fixed = (medicare + federal + state + city + sdi +
dental + medical + vision)
# Calculate vacation hours per-pay.
with decimal.localcontext() as ctx:
ctx.prec = 4
vacation_hrs = (ANNUAL_VACATION_DAYS * D('8')) / D('26')
transactions = []
for dtime in misc_utils.skipiter(
rrule.rrule(rrule.WEEKLY, byweekday=rrule.TH,
dtstart=date_begin, until=date_end), 2):
date = dtime.date()
year = date.year
if not date_prev or date_prev.year != date.year:
contrib_retirement = RETIREMENT_LIMITS.get(date.year, RETIREMENT_LIMITS[None])
contrib_socsec = D('7000')
date_prev = date
retirement_uncapped = math.ceil((gross * D('0.25')) / 100) * 100
retirement = min(contrib_retirement, retirement_uncapped)
contrib_retirement -= retirement
socsec_uncapped = gross * D('0.0610')
socsec = min(contrib_socsec, socsec_uncapped)
contrib_socsec -= socsec
with decimal.localcontext() as ctx:
ctx.prec = 6
deposit = (gross - retirement - fixed - socsec)
retirement_neg = -retirement
gross_neg = -gross
lifeinsurance_neg = -lifeinsurance
vacation_hrs_neg = -vacation_hrs
template = """
{date} * "{employer_name}" "Payroll"
{account_deposit} {deposit:.2f} CCY
{account_retirement} {retirement:.2f} CCY
Assets:CC:Federal:PreTax401k {retirement_neg:.2f} DEFCCY
Expenses:Taxes:Y{year}:CC:Federal:PreTax401k {retirement:.2f} DEFCCY
Income:CC:Employer1:Salary {gross_neg:.2f} CCY
Income:CC:Employer1:GroupTermLife {lifeinsurance_neg:.2f} CCY
Expenses:Health:Life:GroupTermLife {lifeinsurance:.2f} CCY
Expenses:Health:Dental:Insurance {dental} CCY
Expenses:Health:Medical:Insurance {medical} CCY
Expenses:Health:Vision:Insurance {vision} CCY
Expenses:Taxes:Y{year}:CC:Medicare {medicare:.2f} CCY
Expenses:Taxes:Y{year}:CC:Federal {federal:.2f} CCY
Expenses:Taxes:Y{year}:CC:State {state:.2f} CCY
Expenses:Taxes:Y{year}:CC:CityNYC {city:.2f} CCY
Expenses:Taxes:Y{year}:CC:SDI {sdi:.2f} CCY
Expenses:Taxes:Y{year}:CC:SocSec {socsec:.2f} CCY
Assets:CC:Employer1:Vacation {vacation_hrs:.2f} VACHR
Income:CC:Employer1:Vacation {vacation_hrs_neg:.2f} VACHR
"""
if retirement == ZERO:
# Remove retirement lines.
template = '\n'.join(line
for line in template.splitlines()
if not re.search(r'\bretirement\b', line))
transactions.extend(parse(template, **locals()))
return preamble + transactions
beancount.scripts.example.generate_expense_accounts(date_birth)
Generate directives for expense accounts.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_expense_accounts(date_birth):
"""Generate directives for expense accounts.
Args:
date_birth: Birth date of the character.
Returns:
A list of directives.
"""
return parse("""
{date_birth} open Expenses:Food:Groceries
{date_birth} open Expenses:Food:Restaurant
{date_birth} open Expenses:Food:Coffee
{date_birth} open Expenses:Food:Alcohol
{date_birth} open Expenses:Transport:Tram
{date_birth} open Expenses:Home:Rent
{date_birth} open Expenses:Home:Electricity
{date_birth} open Expenses:Home:Internet
{date_birth} open Expenses:Home:Phone
{date_birth} open Expenses:Financial:Fees
{date_birth} open Expenses:Financial:Commissions
""", **locals())
beancount.scripts.example.generate_open_entries(date, accounts, currency=None)
Generate a list of Open entries for the given accounts:
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_open_entries(date, accounts, currency=None):
"""Generate a list of Open entries for the given accounts:
Args:
date: A datetime.date instance for the open entries.
accounts: A list of account strings.
currency: An optional currency constraint.
Returns:
A list of Open directives.
"""
assert isinstance(accounts, (list, tuple))
return parse(''.join(
'{date} open {account} {currency}\n'.format(date=date,
account=account,
currency=currency or '')
for account in accounts))
beancount.scripts.example.generate_outgoing_transfers(entries, account, account_out, transfer_minimum, transfer_threshold, transfer_increment)
Generate transfers of accumulated funds out of an account.
This monitors the balance of an account and when it is beyond a threshold, generate out transfers form that account to another account.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_outgoing_transfers(entries,
account,
account_out,
transfer_minimum,
transfer_threshold,
transfer_increment):
"""Generate transfers of accumulated funds out of an account.
This monitors the balance of an account and when it is beyond a threshold,
generate out transfers form that account to another account.
Args:
entries: A list of existing entries that affect this account so far.
The generated entries will also affect this account.
account: An account string, the account to monitor.
account_out: An account string, the savings account to make transfers to.
transfer_minimum: The minimum amount of funds to always leave in this account
after a transfer.
transfer_threshold: The minimum amount of funds to be able to transfer out without
breaking the minimum.
transfer_increment: A Decimal, the increment to round transfers to.
Returns:
A list of new directives, the transfers to add to the given account.
"""
last_date = entries[-1].date
# Reverse the balance amounts taking into account the minimum balance for
# all time in the future.
amounts = [(balances[account].get_currency_units('CCY').number, txn_posting)
for txn_posting, balances in postings_for(entries, [account])]
reversed_amounts = []
last_amount, _ = amounts[-1]
for current_amount, _ in reversed(amounts):
if current_amount < last_amount:
reversed_amounts.append(current_amount)
last_amount = current_amount
else:
reversed_amounts.append(last_amount)
capped_amounts = reversed(reversed_amounts)
# Create transfers outward where the future allows it.
new_entries = []
offset_amount = ZERO
for current_amount, (_, txn_posting) in zip(capped_amounts, amounts):
if txn_posting.txn.date >= last_date:
break
adjusted_amount = current_amount - offset_amount
if adjusted_amount > (transfer_minimum + transfer_threshold):
amount_transfer = round_to(adjusted_amount - transfer_minimum,
transfer_increment)
date = txn_posting.txn.date + datetime.timedelta(days=1)
amount_transfer_neg = -amount_transfer
new_entries.extend(parse("""
{date} * "Transfering accumulated savings to other account"
{account} {amount_transfer_neg:2f} CCY
{account_out} {amount_transfer:2f} CCY
""", **locals()))
offset_amount += amount_transfer
return new_entries
beancount.scripts.example.generate_periodic_expenses(date_iter, payee, narration, account_from, account_to, amount_generator)
Generate periodic expense transactions.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_periodic_expenses(date_iter,
payee, narration,
account_from, account_to,
amount_generator):
"""Generate periodic expense transactions.
Args:
date_iter: An iterator for dates or datetimes.
payee: A string, the payee name to use on the transactions, or a set of such strings
to randomly choose from
narration: A string, the narration to use on the transactions.
account_from: An account string the debited account.
account_to: An account string the credited account.
amount_generator: A callable object to generate variates.
Returns:
A list of directives.
"""
new_entries = []
for dtime in date_iter:
date = dtime.date() if isinstance(dtime, datetime.datetime) else dtime
amount = D(amount_generator())
txn_payee = (payee
if isinstance(payee, str)
else random.choice(payee))
txn_narration = (narration
if isinstance(narration, str)
else random.choice(narration))
amount_neg = -amount
new_entries.extend(parse("""
{date} * "{txn_payee}" "{txn_narration}"
{account_from} {amount_neg:.2f} CCY
{account_to} {amount:.2f} CCY
""", **locals()))
return new_entries
beancount.scripts.example.generate_prices(date_begin, date_end, currencies, cost_currency)
Generate weekly or monthly price entries for the given currencies.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_prices(date_begin, date_end, currencies, cost_currency):
"""Generate weekly or monthly price entries for the given currencies.
Args:
date_begin: The start date.
date_end: The end date.
currencies: A list of currency strings to generate prices for.
cost_currency: A string, the cost currency.
Returns:
A list of Price directives.
"""
digits = D('0.01')
entries = []
counter = itertools.count()
for currency in currencies:
start_price = random.uniform(30, 200)
growth = random.uniform(0.02, 0.13) # %/year
mu = growth * (7 / 365)
sigma = random.uniform(0.005, 0.02) # Vol
for dtime, price_float in zip(rrule.rrule(rrule.WEEKLY, byweekday=rrule.FR,
dtstart=date_begin, until=date_end),
price_series(start_price, mu, sigma)):
price = D(price_float).quantize(digits)
meta = data.new_metadata(generate_prices.__name__, next(counter))
entry = data.Price(meta, dtime.date(), currency,
amount.Amount(price, cost_currency))
entries.append(entry)
return entries
beancount.scripts.example.generate_regular_credit_expenses(date_birth, date_begin, date_end, account_credit, account_checking)
Generate expenses paid out of a credit card account, including payments to the credit card.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_regular_credit_expenses(date_birth, date_begin, date_end,
account_credit,
account_checking):
"""Generate expenses paid out of a credit card account, including payments to the
credit card.
Args:
date_birth: The user's birth date.
date_begin: The start date.
date_end: The end date.
account_credit: The credit card account to generate expenses against.
account_checking: The checking account to generate payments from.
Returns:
A list of directives.
"""
restaurant_expenses = generate_periodic_expenses(
date_random_seq(date_begin, date_end, 1, 5),
RESTAURANT_NAMES, RESTAURANT_NARRATIONS,
account_credit, 'Expenses:Food:Restaurant',
lambda: min(random.lognormvariate(math.log(30), math.log(1.5)),
random.randint(200, 220)))
groceries_expenses = generate_periodic_expenses(
date_random_seq(date_begin, date_end, 5, 20),
GROCERIES_NAMES, "Buying groceries",
account_credit, 'Expenses:Food:Groceries',
lambda: min(random.lognormvariate(math.log(80), math.log(1.3)),
random.randint(250, 300)))
subway_expenses = generate_periodic_expenses(
date_random_seq(date_begin, date_end, 27, 33),
"Metro Transport Authority", "Tram tickets",
account_credit, 'Expenses:Transport:Tram',
lambda: D('120.00'))
credit_expenses = data.sorted(restaurant_expenses +
groceries_expenses +
subway_expenses)
# Entries to open accounts.
credit_preamble = generate_open_entries(date_birth, [account_credit], 'CCY')
return data.sorted(credit_preamble + credit_expenses)
beancount.scripts.example.generate_retirement_employer_match(entries, account_invest, account_income)
Generate employer matching contributions into a retirement account.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_retirement_employer_match(entries, account_invest, account_income):
"""Generate employer matching contributions into a retirement account.
Args:
entries: A list of directives that cover the retirement account.
account_invest: The name of the retirement cash account.
account_income: The name of the income account.
Returns:
A list of new entries generated for employer contributions.
"""
match_frac = D('0.50')
new_entries = parse("""
{date} open {account_income} CCY
""", date=entries[0].date, account_income=account_income)
for txn_posting, balances in postings_for(entries, [account_invest]):
amount = txn_posting.posting.units.number * match_frac
amount_neg = -amount
date = txn_posting.txn.date + ONE_DAY
new_entries.extend(parse("""
{date} * "Employer match for contribution"
{account_invest} {amount:.2f} CCY
{account_income} {amount_neg:.2f} CCY
""", **locals()))
return new_entries
beancount.scripts.example.generate_retirement_investments(entries, account, commodities_items, price_map)
Invest money deposited to the given retirement account.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_retirement_investments(entries, account, commodities_items, price_map):
"""Invest money deposited to the given retirement account.
Args:
entries: A list of directives
account: The root account for all retirement investment sub-accounts.
commodities_items: A list of (commodity, fraction to be invested in) items.
price_map: A dict of prices, as per beancount.core.prices.build_price_map().
Returns:
A list of new directives for the given investments. This also generates account
opening directives for the desired investment commodities.
"""
open_entries = []
account_cash = join(account, 'Cash')
date_origin = entries[0].date
open_entries.extend(parse("""
{date_origin} open {account} CCY
institution: "Retirement_Institution"
address: "Retirement_Address"
phone: "Retirement_Phone"
{date_origin} open {account_cash} CCY
number: "882882"
""", **locals()))
for currency, _ in commodities_items:
open_entries.extend(parse("""
{date_origin} open {account}:{currency} {currency}
number: "882882"
""", **locals()))
new_entries = []
for txn_posting, balances in postings_for(entries, [account_cash]):
balance = balances[account_cash]
amount_to_invest = balance.get_currency_units('CCY').number
# Find the date the following Monday, the date to invest.
txn_date = txn_posting.txn.date
while txn_date.weekday() != calendar.MONDAY:
txn_date += ONE_DAY
amount_invested = ZERO
for commodity, fraction in commodities_items:
amount_fraction = amount_to_invest * D(fraction)
# Find the price at that date.
_, price = prices.get_price(price_map, (commodity, 'CCY'), txn_date)
units = (amount_fraction / price).quantize(D('0.001'))
amount_cash = (units * price).quantize(D('0.01'))
amount_cash_neg = -amount_cash
new_entries.extend(parse("""
{txn_date} * "Investing {fraction:.0%} of cash in {commodity}"
{account}:{commodity} {units:.3f} {commodity} {{{price:.2f} CCY}}
{account}:Cash {amount_cash_neg:.2f} CCY
""", **locals()))
balance.add_amount(amount.Amount(-amount_cash, 'CCY'))
return data.sorted(open_entries + new_entries)
beancount.scripts.example.generate_tax_accounts(year, date_max)
Generate accounts and contribution directives for a particular tax year.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_tax_accounts(year, date_max):
"""Generate accounts and contribution directives for a particular tax year.
Args:
year: An integer, the year we're to generate this for.
date_max: The maximum date to produce an entry for.
Returns:
A list of directives.
"""
date_year = datetime.date(year, 1, 1)
date_filing = (datetime.date(year + 1, 3, 20) +
datetime.timedelta(days=random.randint(0, 5)))
date_federal = (date_filing + datetime.timedelta(days=random.randint(0, 4)))
date_state = (date_filing + datetime.timedelta(days=random.randint(0, 4)))
quantum = D('0.01')
amount_federal = D(max(random.normalvariate(500, 120), 12)).quantize(quantum)
amount_federal_neg = -amount_federal
amount_state = D(max(random.normalvariate(300, 100), 10)).quantize(quantum)
amount_state_neg = -amount_state
amount_payable = -(amount_federal + amount_state)
amount_limit = RETIREMENT_LIMITS.get(year, RETIREMENT_LIMITS[None])
amount_limit_neg = -amount_limit
entries = parse("""
;; Open tax accounts for that year.
{date_year} open Expenses:Taxes:Y{year}:CC:Federal:PreTax401k DEFCCY
{date_year} open Expenses:Taxes:Y{year}:CC:Medicare CCY
{date_year} open Expenses:Taxes:Y{year}:CC:Federal CCY
{date_year} open Expenses:Taxes:Y{year}:CC:CityNYC CCY
{date_year} open Expenses:Taxes:Y{year}:CC:SDI CCY
{date_year} open Expenses:Taxes:Y{year}:CC:State CCY
{date_year} open Expenses:Taxes:Y{year}:CC:SocSec CCY
;; Check that the tax amounts have been fully used.
{date_year} balance Assets:CC:Federal:PreTax401k 0 DEFCCY
{date_year} * "Allowed contributions for one year"
Income:CC:Federal:PreTax401k {amount_limit_neg} DEFCCY
Assets:CC:Federal:PreTax401k {amount_limit} DEFCCY
{date_filing} * "Filing taxes for {year}"
Expenses:Taxes:Y{year}:CC:Federal {amount_federal:.2f} CCY
Expenses:Taxes:Y{year}:CC:State {amount_state:.2f} CCY
Liabilities:AccountsPayable {amount_payable:.2f} CCY
{date_federal} * "FEDERAL TAXPYMT"
Assets:CC:Bank1:Checking {amount_federal_neg:.2f} CCY
Liabilities:AccountsPayable {amount_federal:.2f} CCY
{date_state} * "STATE TAX & FINANC PYMT"
Assets:CC:Bank1:Checking {amount_state_neg:.2f} CCY
Liabilities:AccountsPayable {amount_state:.2f} CCY
""", **locals())
return [entry for entry in entries if entry.date < date_max]
beancount.scripts.example.generate_tax_preamble(date_birth)
Generate tax declarations not specific to any particular year.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_tax_preamble(date_birth):
"""Generate tax declarations not specific to any particular year.
Args:
date_birth: A date instance, the birth date of the character.
Returns:
A list of directives.
"""
return parse("""
;; Tax accounts not specific to a year.
{date_birth} open Income:CC:Federal:PreTax401k DEFCCY
{date_birth} open Assets:CC:Federal:PreTax401k DEFCCY
""", **locals())
beancount.scripts.example.generate_taxable_investment(date_begin, date_end, entries, price_map, stocks)
Generate opening directives and transactions for an investment account.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_taxable_investment(date_begin, date_end, entries, price_map, stocks):
"""Generate opening directives and transactions for an investment account.
Args:
date_begin: A date instance, the beginning date.
date_end: A date instance, the end date.
entries: A list of entries that contains at least the transfers to the investment
account's cash account.
price_map: A dict of prices, as per beancount.core.prices.build_price_map().
stocks: A list of strings, the list of commodities to invest in.
Returns:
A list of directives.
"""
account = 'Assets:CC:Investment'
account_cash = join(account, 'Cash')
account_gains = 'Income:CC:Investment:Gains'
account_dividends = 'Income:CC:Investment:Dividends'
accounts_stocks = ['Assets:CC:Investment:{}'.format(commodity)
for commodity in stocks]
open_entries = parse("""
{date_begin} open {account}:Cash CCY
{date_begin} open {account_gains} CCY
{date_begin} open {account_dividends} CCY
""", **locals())
for stock in stocks:
open_entries.extend(parse("""
{date_begin} open {account}:{stock} {stock}
""", **locals()))
# Figure out dates at which dividends should be distributed, near the end of
# each quarter.
days_to = datetime.timedelta(days=3*90-10)
dividend_dates = []
for quarter_begin in iter_quarters(date_begin, date_end):
end_of_quarter = quarter_begin + days_to
if not (date_begin < end_of_quarter < date_end):
continue
dividend_dates.append(end_of_quarter)
# Iterate over all the dates, but merging in the postings for the cash
# account.
min_amount = D('1000.00')
round_amount = D('100.00')
commission = D('8.95')
round_units = D('1')
frac_invest = D('1.00')
frac_dividend = D('0.004')
p_daily_buy = 1./15 # days
p_daily_sell = 1./90 # days
stocks_inventory = inventory.Inventory()
new_entries = []
dividend_date_iter = iter(dividend_dates)
next_dividend_date = next(dividend_date_iter)
for date, balances in iter_dates_with_balance(date_begin, date_end,
entries, [account_cash]):
# Check if we should insert a dividend. Note that we could not factor
# this out because we want to explicitly reinvest the cash dividends and
# we also want the dividends to be proportional to the amount of
# invested stock, so one feeds on the other and vice-versa.
if next_dividend_date and date > next_dividend_date:
# Compute the total balances for the stock accounts in order to
# create a realistic dividend.
total = inventory.Inventory()
for account_stock in accounts_stocks:
total.add_inventory(balances[account_stock])
# Create an entry offering dividends of 1% of the portfolio.
portfolio_cost = total.reduce(convert.get_cost).get_currency_units('CCY').number
amount_cash = (frac_dividend * portfolio_cost).quantize(D('0.01'))
amount_cash_neg = -amount_cash
dividend = parse("""
{next_dividend_date} * "Dividends on portfolio"
{account}:Cash {amount_cash:.2f} CCY
{account_dividends} {amount_cash_neg:.2f} CCY
""", **locals())[0]
new_entries.append(dividend)
# Advance the next dividend date.
try:
next_dividend_date = next(dividend_date_iter)
except StopIteration:
next_dividend_date = None
# If the balance is high, buy with high probability.
balance = balances[account_cash]
total_cash = balance.get_currency_units('CCY').number
assert total_cash >= ZERO, ('Cash balance is negative: {}'.format(total_cash))
invest_cash = total_cash * frac_invest - commission
if invest_cash > min_amount:
if random.random() < p_daily_buy:
commodities = random.sample(stocks, random.randint(1, len(stocks)))
lot_amount = round_to(invest_cash / len(commodities), round_amount)
invested_amount = ZERO
for stock in commodities:
# Find the price at that date.
_, price = prices.get_price(price_map, (stock, 'CCY'), date)
units = round_to((lot_amount / price), round_units)
if units <= ZERO:
continue
amount_cash = -(units * price + commission)
# logging.info('Buying %s %s @ %s CCY = %s CCY',
# units, stock, price, units * price)
buy = parse("""
{date} * "Buy shares of {stock}"
{account}:Cash {amount_cash:.2f} CCY
{account}:{stock} {units:.0f} {stock} {{{price:.2f} CCY}}
Expenses:Financial:Commissions {commission:.2f} CCY
""", **locals())[0]
new_entries.append(buy)
account_stock = ':'.join([account, stock])
balances[account_cash].add_position(buy.postings[0])
balances[account_stock].add_position(buy.postings[1])
stocks_inventory.add_position(buy.postings[1])
# Don't sell on days you buy.
continue
# Otherwise, sell with low probability.
if not stocks_inventory.is_empty() and random.random() < p_daily_sell:
# Choose the lot with the highest gain or highest loss.
gains = []
for position in stocks_inventory.get_positions():
base_quote = (position.units.currency, position.cost.currency)
_, price = prices.get_price(price_map, base_quote, date)
if price == position.cost.number:
continue # Skip lots without movement.
market_value = position.units.number * price
book_value = convert.get_cost(position).number
gain = market_value - book_value
gains.append((gain, market_value, price, position))
if not gains:
continue
# Sell either biggest winner or biggest loser.
biggest = bool(random.random() < 0.5)
lot_tuple = sorted(gains)[0 if biggest else -1]
gain, market_value, price, sell_position = lot_tuple
#logging.info('Selling {} for {}'.format(sell_position, market_value))
sell_position = -sell_position
stock = sell_position.units.currency
amount_cash = market_value - commission
amount_gain = -gain
sell = parse("""
{date} * "Sell shares of {stock}"
{account}:{stock} {sell_position} @ {price:.2f} CCY
{account}:Cash {amount_cash:.2f} CCY
Expenses:Financial:Commissions {commission:.2f} CCY
{account_gains} {amount_gain:.2f} CCY
""", **locals())[0]
new_entries.append(sell)
balances[account_cash].add_position(sell.postings[1])
stocks_inventory.add_position(sell.postings[0])
continue
return open_entries + new_entries
beancount.scripts.example.generate_trip_entries(date_begin, date_end, tag, config, trip_city, home_city, account_credit)
Generate more dense expenses for a trip.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def generate_trip_entries(date_begin, date_end,
tag, config,
trip_city, home_city,
account_credit):
"""Generate more dense expenses for a trip.
Args:
date_begin: A datetime.date instance, the beginning of the trip.
date_end: A datetime.date instance, the end of the trip.
tag: A string, the name of the tag.
config: A list of (payee name, account name, (mu, 3sigma)), where
mu is the mean of the prices to generate and 3sigma is 3 times
the standard deviation.
trip_city: A string, the capitalized name of the destination city.
home_city: A string, the name of the home city.
account_credit: A string, the name of the credit card account to pay
the expenses from.
Returns:
A list of entries for the trip, all tagged with the given tag.
"""
p_day_generate = 0.3
new_entries = []
for date in date_iter(date_begin, date_end):
for payee, account_expense, (mu, sigma3) in config:
if random.random() < p_day_generate:
amount = random.normalvariate(mu, sigma3 / 3.)
amount_neg = -amount
new_entries.extend(parse("""
{date} * "{payee}" "" #{tag}
{account_credit} {amount_neg:.2f} CCY
{account_expense} {amount:.2f} CCY
""", **locals()))
# Consume the vacation days.
vacation_hrs = (date_end - date_begin).days * 8 # hrs/day
new_entries.extend(parse("""
{date_end} * "Consume vacation days"
Assets:CC:Employer1:Vacation -{vacation_hrs:.2f} VACHR
Expenses:Vacation {vacation_hrs:.2f} VACHR
""", **locals()))
# Generate events for the trip.
new_entries.extend(parse("""
{date_begin} event "location" "{trip_city}"
{date_end} event "location" "{home_city}"
""", **locals()))
return new_entries
beancount.scripts.example.get_minimum_balance(entries, account, currency)
Compute the minimum balance of the given account according to the entries history.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def get_minimum_balance(entries, account, currency):
"""Compute the minimum balance of the given account according to the entries history.
Args:
entries: A list of directives.
account: An account string.
currency: A currency string, for which we want to compute the minimum.
Returns:
A Decimal number, the minimum amount throughout the history of this account.
"""
min_amount = ZERO
for _, balances in postings_for(entries, [account]):
balance = balances[account]
current = balance.get_currency_units(currency).number
if current < min_amount:
min_amount = current
return min_amount
beancount.scripts.example.iter_dates_with_balance(date_begin, date_end, entries, accounts)
Iterate over dates, including the balances of the postings iterator.
Parameters: |
|
---|
Yields: Pairs of (data, balances) objects, with date: A datetime.date instance balances: An Inventory object, representing the current balance. You can modify the inventory object to feed back changes in the balance.
Source code in beancount/scripts/example.py
def iter_dates_with_balance(date_begin, date_end, entries, accounts):
"""Iterate over dates, including the balances of the postings iterator.
Args:
postings_iter: An iterator of postings as per postings_for().
date_begin: The start date.
date_end: The end date.
Yields:
Pairs of (data, balances) objects, with
date: A datetime.date instance
balances: An Inventory object, representing the current balance.
You can modify the inventory object to feed back changes in the
balance.
"""
balances = collections.defaultdict(inventory.Inventory)
merged_txn_postings = iter(merge_postings(entries, accounts))
txn_posting = next(merged_txn_postings, None)
for date in date_iter(date_begin, date_end):
while txn_posting and txn_posting.txn.date == date:
posting = txn_posting.posting
balances[posting.account].add_position(posting)
txn_posting = next(merged_txn_postings, None)
yield date, balances
beancount.scripts.example.iter_quarters(date_begin, date_end)
Iterate over all quarters between begin and end dates.
Parameters: |
|
---|
Yields: Instances of datetime.date at the beginning of the quarters. This will include the quarter of the beginning date and of the end date.
Source code in beancount/scripts/example.py
def iter_quarters(date_begin, date_end):
"""Iterate over all quarters between begin and end dates.
Args:
date_begin: The start date.
date_end: The end date.
Yields:
Instances of datetime.date at the beginning of the quarters. This will
include the quarter of the beginning date and of the end date.
"""
quarter = (date_begin.year, (date_begin.month-1)//3)
quarter_last = (date_end.year, (date_end.month-1)//3)
assert quarter <= quarter_last
while True:
year, trimester = quarter
yield datetime.date(year, trimester*3 + 1, 1)
if quarter == quarter_last:
break
trimester = (trimester + 1) % 4
if trimester == 0:
year += 1
quarter = (year, trimester)
beancount.scripts.example.merge_postings(entries, accounts)
Merge all the postings from the given account names.
Parameters: |
|
---|
Yields: A list of TxnPosting's for all the accounts, in sorted order.
Source code in beancount/scripts/example.py
def merge_postings(entries, accounts):
"""Merge all the postings from the given account names.
Args:
entries: A list of directives.
accounts: A list of account strings to get the balances for.
Yields:
A list of TxnPosting's for all the accounts, in sorted order.
"""
real_root = realization.realize(entries)
merged_postings = []
for account in accounts:
real_account = realization.get(real_root, account)
if real_account is None:
continue
merged_postings.extend(txn_posting
for txn_posting in real_account.txn_postings
if isinstance(txn_posting, data.TxnPosting))
merged_postings.sort(key=lambda txn_posting: txn_posting.txn.date)
return merged_postings
beancount.scripts.example.parse(input_string, **replacements)
Parse some input string and assert no errors.
This parse function does not just create the object, it also triggers local interpolation to fill in the missing amounts.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def parse(input_string, **replacements):
"""Parse some input string and assert no errors.
This parse function does not just create the object, it also triggers local
interpolation to fill in the missing amounts.
Args:
input_string: Beancount input text.
**replacements: A dict of keywords to replace to their values.
Returns:
A list of directive objects.
"""
if replacements:
class IgnoreFormatter(string.Formatter):
def check_unused_args(self, used_args, args, kwargs):
pass
formatter = IgnoreFormatter()
formatted_string = formatter.format(input_string, **replacements)
else:
formatted_string = input_string
entries, errors, options_map = parser.parse_string(textwrap.dedent(formatted_string))
if errors:
printer.print_errors(errors, file=sys.stderr)
raise ValueError("Parsed text has errors")
# Interpolation.
entries, unused_balance_errors = booking.book(entries, options_map)
return data.sorted(entries)
beancount.scripts.example.postings_for(entries, accounts, before=False)
Realize the entries and get the list of postings for the given accounts.
All the non-Posting directives are already filtered out.
Parameters: |
|
---|
Yields: Tuples of: posting: An instance of TxnPosting balances: A dict of Inventory balances for the given accounts after applying the posting. These inventory objects can be mutated to adjust the balance due to generated transactions to be applied later.
Source code in beancount/scripts/example.py
def postings_for(entries, accounts, before=False):
"""Realize the entries and get the list of postings for the given accounts.
All the non-Posting directives are already filtered out.
Args:
entries: A list of directives.
accounts: A list of account strings to get the balances for.
before: A boolean, if true, yield the balance before the position is applied.
The default is to yield the balance after applying the position.
Yields:
Tuples of:
posting: An instance of TxnPosting
balances: A dict of Inventory balances for the given accounts _after_
applying the posting. These inventory objects can be mutated to adjust
the balance due to generated transactions to be applied later.
"""
assert isinstance(accounts, list)
merged_txn_postings = merge_postings(entries, accounts)
balances = collections.defaultdict(inventory.Inventory)
for txn_posting in merged_txn_postings:
if before:
yield txn_posting, balances
posting = txn_posting.posting
balances[posting.account].add_position(posting)
if not before:
yield txn_posting, balances
beancount.scripts.example.price_series(start, mu, sigma)
Generate a price series based on a simple stochastic model.
Parameters: |
|
---|
Yields: Floats, at each step.
Source code in beancount/scripts/example.py
def price_series(start, mu, sigma):
"""Generate a price series based on a simple stochastic model.
Args:
start: The beginning value.
mu: The per-step drift, in units of value.
sigma: Volatility of the changes.
Yields:
Floats, at each step.
"""
value = start
while 1:
yield value
value += random.normalvariate(mu, sigma) * value
beancount.scripts.example.replace(string, replacements, strip=False)
Apply word-boundaried regular expression replacements to an indented string.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/example.py
def replace(string, replacements, strip=False):
"""Apply word-boundaried regular expression replacements to an indented string.
Args:
string: Some input template string.
replacements: A dict of regexp to replacement value.
strip: A boolean, true if we should strip the input.
Returns:
The input string with the replacements applied to it, with the indentation removed.
"""
output = textwrap.dedent(string)
if strip:
output = output.strip()
for from_, to_ in replacements.items():
if not isinstance(to_, str) and not callable(to_):
to_ = str(to_)
output = re.sub(r'\b{}\b'.format(from_), to_, output)
return output
beancount.scripts.example.validate_output(contents, positive_accounts, currency)
Check that the output file validates.
Parameters: |
|
---|
Exceptions: |
|
---|
Source code in beancount/scripts/example.py
def validate_output(contents, positive_accounts, currency):
"""Check that the output file validates.
Args:
contents: A string, the output file.
positive_accounts: A list of strings, account names to check for
non-negative balances.
currency: A string, the currency to check minimums for.
Raises:
AssertionError: If the output does not validate.
"""
loaded_entries, _, _ = loader.load_string(
contents,
log_errors=sys.stderr,
extra_validations=validation.HARDCORE_VALIDATIONS)
# Sanity checks: Check that the checking balance never goes below zero.
for account in positive_accounts:
check_non_negative(loaded_entries, account, currency)
beancount.scripts.example.write_example_file(date_birth, date_begin, date_end, reformat, file)
Generate the example file.
Parameters: |
|
---|
Source code in beancount/scripts/example.py
def write_example_file(date_birth, date_begin, date_end, reformat, file):
"""Generate the example file.
Args:
date_birth: A datetime.date instance, the birth date of our character.
date_begin: A datetime.date instance, the beginning date at which to generate
transactions.
date_end: A datetime.date instance, the end date at which to generate
transactions.
reformat: A boolean, true if we should apply global reformatting to this file.
file: A file object, where to write out the output.
"""
# The following code entirely writes out the output to generic names, such
# as "Employer1", "Bank1", and "CCY" (for principal currency). Those names
# are purposely chosen to be unique, and only near the very end do we make
# renamings to more specific and realistic names.
# Name of the checking account.
account_opening = 'Equity:Opening-Balances'
account_payable = 'Liabilities:AccountsPayable'
account_checking = 'Assets:CC:Bank1:Checking'
account_credit = 'Liabilities:CC:CreditCard1'
account_retirement = 'Assets:CC:Retirement'
account_investing = 'Assets:CC:Investment:Cash'
# Commodities.
commodity_entries = generate_commodity_entries(date_birth)
# Estimate the rent.
rent_amount = round_to(ANNUAL_SALARY / RENT_DIVISOR, RENT_INCREMENT)
# Get a random employer.
employer_name, employer_address = random.choice(EMPLOYERS)
logging.info("Generating Salary Employment Income")
income_entries = generate_employment_income(employer_name, employer_address,
ANNUAL_SALARY,
account_checking,
join(account_retirement, 'Cash'),
date_begin, date_end)
logging.info("Generating Expenses from Banking Accounts")
banking_expenses = generate_banking_expenses(date_begin, date_end,
account_checking, rent_amount)
logging.info("Generating Regular Expenses via Credit Card")
credit_regular_entries = generate_regular_credit_expenses(
date_birth, date_begin, date_end, account_credit, account_checking)
logging.info("Generating Credit Card Expenses for Trips")
trip_entries = []
destinations = sorted(TRIP_DESTINATIONS.items())
destinations.extend(destinations)
random.shuffle(destinations)
for (date_trip_begin, date_trip_end), (destination_name, config) in zip(
compute_trip_dates(date_begin, date_end), destinations):
# Compute a suitable tag.
tag = 'trip-{}-{}'.format(destination_name.lower().replace(' ', '-'),
date_trip_begin.year)
#logging.info("%s -- %s %s", tag, date_trip_begin, date_trip_end)
# Remove regular entries during this trip.
credit_regular_entries = [entry
for entry in credit_regular_entries
if not(date_trip_begin <= entry.date < date_trip_end)]
# Generate entries for the trip.
this_trip_entries = generate_trip_entries(
date_trip_begin, date_trip_end,
tag, config,
destination_name.replace('-', ' ').title(), HOME_NAME,
account_credit)
trip_entries.extend(this_trip_entries)
logging.info("Generating Credit Card Payment Entries")
credit_payments = generate_clearing_entries(
delay_dates(rrule.rrule(rrule.MONTHLY,
dtstart=date_begin, until=date_end, bymonthday=7), 0, 4),
"CreditCard1", "Paying off credit card",
credit_regular_entries,
account_credit, account_checking)
credit_entries = credit_regular_entries + trip_entries + credit_payments
logging.info("Generating Tax Filings and Payments")
tax_preamble = generate_tax_preamble(date_birth)
# Figure out all the years we need tax accounts for.
years = set()
for account_name in getters.get_accounts(income_entries):
match = re.match(r'Expenses:Taxes:Y(\d\d\d\d)', account_name)
if match:
years.add(int(match.group(1)))
taxes = [(year, generate_tax_accounts(year, date_end)) for year in sorted(years)]
tax_entries = tax_preamble + functools.reduce(operator.add,
(entries
for _, entries in taxes))
logging.info("Generating Opening of Banking Accounts")
# Open banking accounts and gift the checking account with a balance that
# will offset all the amounts to ensure a positive balance throughout its
# lifetime.
entries_for_banking = data.sorted(income_entries +
banking_expenses +
credit_entries +
tax_entries)
minimum = get_minimum_balance(entries_for_banking,
account_checking, 'CCY')
banking_entries = generate_banking(entries_for_banking,
date_begin, date_end,
max(-minimum, ZERO))
logging.info("Generating Transfers to Investment Account")
banking_transfers = generate_outgoing_transfers(
data.sorted(income_entries +
banking_entries +
banking_expenses +
credit_entries +
tax_entries),
account_checking,
account_investing,
transfer_minimum=D('200'),
transfer_threshold=D('3000'),
transfer_increment=D('500'))
logging.info("Generating Prices")
# Generate price entries for investment currencies and create a price map to
# use for later for generating investment transactions.
funds_allocation = {'MFUND1': 0.40, 'MFUND2': 0.60}
stocks = ['STK1', 'STK2', 'STK3', 'STK4']
price_entries = generate_prices(date_begin, date_end,
sorted(funds_allocation.keys()) + stocks, 'CCY')
price_map = prices.build_price_map(price_entries)
logging.info("Generating Employer Match Contribution")
account_match = 'Income:US:Employer1:Match401k'
retirement_match = generate_retirement_employer_match(income_entries,
join(account_retirement, 'Cash'),
account_match)
logging.info("Generating Retirement Investments")
retirement_entries = generate_retirement_investments(
income_entries + retirement_match, account_retirement,
sorted(funds_allocation.items()),
price_map)
logging.info("Generating Taxes Investments")
investment_entries = generate_taxable_investment(date_begin, date_end,
banking_transfers, price_map,
stocks)
logging.info("Generating Expense Accounts")
expense_accounts_entries = generate_expense_accounts(date_birth)
logging.info("Generating Equity Accounts")
equity_entries = generate_open_entries(date_birth, [account_opening,
account_payable])
logging.info("Generating Balance Checks")
credit_checks = generate_balance_checks(credit_entries, account_credit,
date_random_seq(date_begin, date_end, 20, 30))
banking_checks = generate_balance_checks(data.sorted(income_entries +
banking_entries +
banking_expenses +
banking_transfers +
credit_entries +
tax_entries),
account_checking,
date_random_seq(date_begin, date_end, 20, 30))
logging.info("Outputting and Formatting Entries")
dcontext = display_context.DisplayContext()
default_int_digits = 8
for currency, precision in {'USD': 2,
'CAD': 2,
'VACHR':0,
'IRAUSD': 2,
'VBMPX': 3,
'RGAGX': 3,
'ITOT': 0,
'VEA': 0,
'VHT': 0,
'GLD': 0}.items():
int_digits = default_int_digits
if precision > 0:
int_digits += 1 + precision
dcontext.update(D('{{:0{}.{}f}}'.format(int_digits, precision).format(0)), currency)
output = io.StringIO()
def output_section(title, entries):
output.write('\n\n\n{}\n\n'.format(title))
printer.print_entries(data.sorted(entries), dcontext, file=output)
output.write(FILE_PREAMBLE.format(**locals()))
output_section('* Commodities', commodity_entries)
output_section('* Equity Accounts', equity_entries)
output_section('* Banking', data.sorted(banking_entries +
banking_expenses +
banking_transfers +
banking_checks))
output_section('* Credit-Cards', data.sorted(credit_entries +
credit_checks))
output_section('* Taxable Investments', investment_entries)
output_section('* Retirement Investments', data.sorted(retirement_entries +
retirement_match))
output_section('* Sources of Income', income_entries)
output_section('* Taxes', tax_preamble)
for year, entries in taxes:
output_section('** Tax Year {}'.format(year), entries)
output_section('* Expenses', expense_accounts_entries)
output_section('* Prices', price_entries)
output_section('* Cash', [])
logging.info("Contextualizing to Realistic Names")
contents, replacements = contextualize_file(output.getvalue(), employer_name)
if reformat:
contents = format.align_beancount(contents)
logging.info("Writing contents")
file.write(contents)
logging.info("Validating Results")
validate_output(contents,
[replace(account, replacements)
for account in [account_checking]],
replace('CCY', replacements))
beancount.scripts.format
Align a beancount/ledger input file's numbers.
This reformats at beancount or ledger input file so that the amounts in the postings are all aligned to the same column. The currency should match.
Note: this does not parse the Beancount ledger. It simply uses regular expressions and text manipulations to do its work.
beancount.scripts.format.align_beancount(contents, prefix_width=None, num_width=None, currency_column=None)
Reformat Beancount input to align all the numbers at the same column.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/format.py
def align_beancount(contents, prefix_width=None, num_width=None, currency_column=None):
"""Reformat Beancount input to align all the numbers at the same column.
Args:
contents: A string, Beancount input syntax to reformat.
prefix_width: An integer, the width in characters to render the account
name to. If this is not specified, a good value is selected
automatically from the contents of the file.
num_width: An integer, the width to render each number. If this is not
specified, a good value is selected automatically from the contents of
the file.
currency_column: An integer, the column at which to align the currencies.
If given, this overrides the other options.
Returns:
A string, reformatted Beancount input with all the number aligned.
No other changes than whitespace changes should be present between that
return value and the input contents.
"""
# Find all lines that have a number in them and calculate the maximum length
# of the stripped prefix and the number.
match_pairs = []
for line in contents.splitlines():
match = re.match(
r'([^";]*?)\s+([-+]?\s*[\d,]+(?:\.\d*)?)\s+({}\b.*)'.format(amount.CURRENCY_RE),
line)
if match:
prefix, number, rest = match.groups()
match_pairs.append((prefix, number, rest))
else:
match_pairs.append((line, None, None))
# Normalize whitespace before lines that has some indent and an account
# name.
norm_match_pairs = normalize_indent_whitespace(match_pairs)
if currency_column:
output = io.StringIO()
for prefix, number, rest in norm_match_pairs:
if number is None:
output.write(prefix)
else:
num_of_spaces = currency_column - len(prefix) - len(number) - 4
spaces = ' ' * num_of_spaces
output.write(prefix + spaces + ' ' + number + ' ' + rest)
output.write('\n')
return output.getvalue()
# Compute the maximum widths.
filtered_pairs = [(prefix, number)
for prefix, number, _ in match_pairs
if number is not None]
if filtered_pairs:
max_prefix_width = max(len(prefix) for prefix, _ in filtered_pairs)
max_num_width = max(len(number) for _, number in filtered_pairs)
else:
max_prefix_width = 0
max_num_width = 0
# Use user-supplied overrides, if available
if prefix_width:
max_prefix_width = prefix_width
if num_width:
max_num_width = num_width
# Create a format that will admit the maximum width of all prefixes equally.
line_format = '{{:<{prefix_width}}} {{:>{num_width}}} {{}}'.format(
prefix_width=max_prefix_width,
num_width=max_num_width)
# Process each line to an output buffer.
output = io.StringIO()
for prefix, number, rest in norm_match_pairs:
if number is None:
output.write(prefix)
else:
output.write(line_format.format(prefix.rstrip(), number, rest))
output.write('\n')
formatted_contents = output.getvalue()
# Ensure that the file before and after have only whitespace differences.
# This is a sanity check, to make really sure we never change anything but whitespace,
# so it's safe.
# open('/tmp/before', 'w').write(re.sub(r'[ \t]+', ' ', contents))
# open('/tmp/after', 'w').write(re.sub(r'[ \t]+', ' ', formatted_contents))
old_stripped = re.sub(r'[ \t\n]+', ' ', contents.rstrip())
new_stripped = re.sub(r'[ \t\n]+', ' ', formatted_contents.rstrip())
assert (old_stripped == new_stripped), (old_stripped, new_stripped)
return formatted_contents
beancount.scripts.format.compute_most_frequent(iterable)
Compute the frequencies of the given elements and return the most frequent.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/format.py
def compute_most_frequent(iterable):
"""Compute the frequencies of the given elements and return the most frequent.
Args:
iterable: A collection of hashable elements.
Returns:
The most frequent element. If there are no elements in the iterable,
return None.
"""
frequencies = collections.Counter(iterable)
if not frequencies:
return None
counts = sorted((count, element)
for element, count in frequencies.items())
# Note: In case of a tie, this chooses the longest width.
# We could eventually make this an option.
return counts[-1][1]
beancount.scripts.format.normalize_indent_whitespace(match_pairs)
Normalize whitespace before lines that has some indent and an account name.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/format.py
def normalize_indent_whitespace(match_pairs):
"""Normalize whitespace before lines that has some indent and an account name.
Args:
match_pairs: A list of (prefix, number, rest) tuples.
Returns:
Another list of (prefix, number, rest) tuples, where prefix may have been
adjusted with a different whitespace prefix.
"""
# Compute most frequent account name prefix.
match_posting = re.compile(r'([ \t]+)({}.*)'.format(account.ACCOUNT_RE)).match
width = compute_most_frequent(
len(match.group(1))
for match in (match_posting(prefix)
for prefix, _, _ in match_pairs)
if match is not None)
norm_format = ' ' * (width or 0) + '{}'
# Make the necessary adjustments.
adjusted_pairs = []
for tup in match_pairs:
prefix, number, rest = tup
match = match_posting(prefix)
if match is not None:
tup = (norm_format.format(match.group(2)), number, rest)
adjusted_pairs.append(tup)
return adjusted_pairs
beancount.scripts.sql
Convert a Beancount ledger into an SQL database.
beancount.scripts.sql.BalanceWriter (DirectiveWriter)
beancount.scripts.sql.BalanceWriter.type (tuple)
Balance(meta, date, account, amount, tolerance, diff_amount)
beancount.scripts.sql.BalanceWriter.type.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/scripts/sql.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.scripts.sql.BalanceWriter.type.__new__(_cls, meta, date, account, amount, tolerance, diff_amount)
special
staticmethod
Create new instance of Balance(meta, date, account, amount, tolerance, diff_amount)
beancount.scripts.sql.BalanceWriter.type.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/scripts/sql.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.scripts.sql.BalanceWriter.get_detail(self, entry)
Provide data to store for details table.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/sql.py
def get_detail(self, entry):
return (entry.account,
entry.amount.number,
entry.amount.currency,
entry.diff_amount.currency if entry.diff_amount else None,
entry.diff_amount.currency if entry.diff_amount else None)
beancount.scripts.sql.CloseWriter (DirectiveWriter)
beancount.scripts.sql.CloseWriter.type (tuple)
Close(meta, date, account)
beancount.scripts.sql.CloseWriter.type.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/scripts/sql.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.scripts.sql.CloseWriter.type.__new__(_cls, meta, date, account)
special
staticmethod
Create new instance of Close(meta, date, account)
beancount.scripts.sql.CloseWriter.type.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/scripts/sql.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.scripts.sql.CloseWriter.get_detail(self, entry)
Provide data to store for details table.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/sql.py
def get_detail(self, entry):
return (entry.account,)
beancount.scripts.sql.DirectiveWriter
A base class for writers of directives. This is used to factor out code for all the simple directives types (all types except Transaction).
beancount.scripts.sql.DirectiveWriter.__call__(self, connection, entries)
special
Create a table for a directives.
Parameters: |
|
---|
Source code in beancount/scripts/sql.py
def __call__(self, connection, entries):
"""Create a table for a directives.
Args:
connection: A DBAPI-2.0 Connection object.
entries: A list of directives.
"""
with connection:
columns_text = ','.join(self.columns.strip().splitlines())
connection.execute("""
CREATE TABLE {name}_detail (
id INTEGER PRIMARY KEY,
{columns}
);
""".format(name=self.name,
columns=columns_text))
connection.execute("""
CREATE VIEW {name} AS
SELECT * FROM entry JOIN {name}_detail USING (id);
""".format(name=self.name))
with connection:
for eid, entry in enumerate(entries):
if not isinstance(entry, self.type):
continue
# Store common data.
connection.execute("""
INSERT INTO entry VALUES (?, ?, ?, ?, ?);
""", (eid, entry.date, self.name,
entry.meta["filename"], entry.meta["lineno"]))
# Store detail data.
detail_data = self.get_detail(entry)
row_data = (eid,) + detail_data
query = """
INSERT INTO {name}_detail VALUES ({placeholder});
""".format(name=self.name,
placeholder=','.join(['?'] * (1 + len(detail_data))))
connection.execute(query, row_data)
beancount.scripts.sql.DirectiveWriter.get_detail(self, entry)
Provide data to store for details table.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/sql.py
def get_detail(self, entry):
"""Provide data to store for details table.
Args:
entry: An instance of the desired directive.
Returns:
A tuple of the values corresponding to the columns declared in the
'columns' attribute.
"""
raise NotImplementedError
beancount.scripts.sql.DocumentWriter (DirectiveWriter)
beancount.scripts.sql.DocumentWriter.type (tuple)
Document(meta, date, account, filename, tags, links)
beancount.scripts.sql.DocumentWriter.type.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/scripts/sql.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.scripts.sql.DocumentWriter.type.__new__(_cls, meta, date, account, filename, tags, links)
special
staticmethod
Create new instance of Document(meta, date, account, filename, tags, links)
beancount.scripts.sql.DocumentWriter.type.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/scripts/sql.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.scripts.sql.DocumentWriter.get_detail(self, entry)
Provide data to store for details table.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/sql.py
def get_detail(self, entry):
return (entry.account,
entry.filename)
beancount.scripts.sql.EventWriter (DirectiveWriter)
beancount.scripts.sql.EventWriter.type (tuple)
Event(meta, date, type, description)
beancount.scripts.sql.EventWriter.type.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/scripts/sql.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.scripts.sql.EventWriter.type.__new__(_cls, meta, date, type, description)
special
staticmethod
Create new instance of Event(meta, date, type, description)
beancount.scripts.sql.EventWriter.type.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/scripts/sql.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.scripts.sql.EventWriter.get_detail(self, entry)
Provide data to store for details table.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/sql.py
def get_detail(self, entry):
return (entry.type,
entry.description)
beancount.scripts.sql.NoteWriter (DirectiveWriter)
beancount.scripts.sql.NoteWriter.type (tuple)
Note(meta, date, account, comment)
beancount.scripts.sql.NoteWriter.type.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/scripts/sql.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.scripts.sql.NoteWriter.type.__new__(_cls, meta, date, account, comment)
special
staticmethod
Create new instance of Note(meta, date, account, comment)
beancount.scripts.sql.NoteWriter.type.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/scripts/sql.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.scripts.sql.NoteWriter.get_detail(self, entry)
Provide data to store for details table.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/sql.py
def get_detail(self, entry):
return (entry.account,
entry.comment)
beancount.scripts.sql.OpenWriter (DirectiveWriter)
beancount.scripts.sql.OpenWriter.type (tuple)
Open(meta, date, account, currencies, booking)
beancount.scripts.sql.OpenWriter.type.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/scripts/sql.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.scripts.sql.OpenWriter.type.__new__(_cls, meta, date, account, currencies, booking)
special
staticmethod
Create new instance of Open(meta, date, account, currencies, booking)
beancount.scripts.sql.OpenWriter.type.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/scripts/sql.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.scripts.sql.OpenWriter.get_detail(self, entry)
Provide data to store for details table.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/sql.py
def get_detail(self, entry):
return (entry.account,
','.join(entry.currencies or []))
beancount.scripts.sql.PadWriter (DirectiveWriter)
beancount.scripts.sql.PadWriter.type (tuple)
Pad(meta, date, account, source_account)
beancount.scripts.sql.PadWriter.type.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/scripts/sql.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.scripts.sql.PadWriter.type.__new__(_cls, meta, date, account, source_account)
special
staticmethod
Create new instance of Pad(meta, date, account, source_account)
beancount.scripts.sql.PadWriter.type.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/scripts/sql.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.scripts.sql.PadWriter.get_detail(self, entry)
Provide data to store for details table.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/sql.py
def get_detail(self, entry):
return (entry.account, entry.source_account)
beancount.scripts.sql.PriceWriter (DirectiveWriter)
beancount.scripts.sql.PriceWriter.type (tuple)
Price(meta, date, currency, amount)
beancount.scripts.sql.PriceWriter.type.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/scripts/sql.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.scripts.sql.PriceWriter.type.__new__(_cls, meta, date, currency, amount)
special
staticmethod
Create new instance of Price(meta, date, currency, amount)
beancount.scripts.sql.PriceWriter.type.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/scripts/sql.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.scripts.sql.PriceWriter.get_detail(self, entry)
Provide data to store for details table.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/sql.py
def get_detail(self, entry):
return (entry.currency,
entry.amount.number,
entry.amount.currency)
beancount.scripts.sql.QueryWriter (DirectiveWriter)
beancount.scripts.sql.QueryWriter.type (tuple)
Query(meta, date, name, query_string)
beancount.scripts.sql.QueryWriter.type.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/scripts/sql.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.scripts.sql.QueryWriter.type.__new__(_cls, meta, date, name, query_string)
special
staticmethod
Create new instance of Query(meta, date, name, query_string)
beancount.scripts.sql.QueryWriter.type.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/scripts/sql.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.scripts.sql.QueryWriter.get_detail(self, entry)
Provide data to store for details table.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/sql.py
def get_detail(self, entry):
return (entry.name,
entry.query_string)
beancount.scripts.sql.adapt_decimal(number)
Adapt a Decimal instance to a string for creating queries.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/sql.py
def adapt_decimal(number):
"""Adapt a Decimal instance to a string for creating queries.
Args:
number: An instance of Decimal.
Returns:
A string.
"""
return str(number)
beancount.scripts.sql.convert_decimal(string)
Convert a Decimal string to a Decimal instance.
Parameters: |
|
---|
Returns: |
|
---|
Source code in beancount/scripts/sql.py
def convert_decimal(string):
"""Convert a Decimal string to a Decimal instance.
Args:
string: A decimal number in a string.
Returns:
An instance of Decimal.
"""
return Decimal(string)
beancount.scripts.sql.output_common(connection, unused_entries)
Create a table of common data for all entries.
Parameters: |
|
---|
Source code in beancount/scripts/sql.py
def output_common(connection, unused_entries):
"""Create a table of common data for all entries.
Args:
connection: A DBAPI-2.0 Connection object.
entries: A list of directives.
"""
with connection:
connection.execute("""
CREATE TABLE entry (
id INTEGER PRIMARY KEY,
date DATE,
type CHARACTER(8),
source_filename STRING,
source_lineno INTEGER
);
""")
beancount.scripts.sql.output_transactions(connection, entries)
Create a table for transactions and fill in the data.
Parameters: |
|
---|
Source code in beancount/scripts/sql.py
def output_transactions(connection, entries):
"""Create a table for transactions and fill in the data.
Args:
connection: A DBAPI-2.0 Connection object.
entries: A list of directives.
"""
with connection:
connection.execute("""
CREATE TABLE transactions_detail (
id INTEGER PRIMARY KEY,
flag CHARACTER(1),
payee VARCHAR,
narration VARCHAR,
tags VARCHAR, -- Comma-separated
links VARCHAR -- Comma-separated
);
""")
connection.execute("""
CREATE VIEW transactions AS
SELECT * FROM entry JOIN transactions_detail USING (id);
""")
connection.execute("""
CREATE TABLE postings (
posting_id INTEGER PRIMARY KEY,
id INTEGER,
flag CHARACTER(1),
account VARCHAR,
number DECIMAL(16, 6),
currency CHARACTER(10),
cost_number DECIMAL(16, 6),
cost_currency CHARACTER(10),
cost_date DATE,
cost_label VARCHAR,
price_number DECIMAL(16, 6),
price_currency CHARACTER(10),
FOREIGN KEY(id) REFERENCES entries(id)
);
""")
postings_count = iter(itertools.count())
with connection:
for eid, entry in enumerate(entries):
if not isinstance(entry, data.Transaction):
continue
connection.execute("""
insert into entry values (?, ?, ?, ?, ?);
""", (eid, entry.date, 'txn', entry.meta["filename"], entry.meta["lineno"]))
connection.execute("""
insert into transactions_detail values (?, ?, ?, ?, ?, ?);
""", (eid, entry.flag, entry.payee, entry.narration,
','.join(entry.tags or ()), ','.join(entry.links or ())))
for posting in entry.postings:
pid = next(postings_count)
units = posting.units
cost = posting.cost
price = posting.price
connection.execute("""
INSERT INTO postings VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
""", (pid, eid,
posting.flag,
posting.account,
units.number,
units.currency,
cost.number if cost else None,
cost.currency if cost else None,
cost.date if cost else None,
cost.label if cost else None,
price.number if price else None,
price.currency if price else None))
beancount.scripts.sql.setup_decimal_support()
Setup sqlite3 to support conversions to/from Decimal numbers.
Source code in beancount/scripts/sql.py
def setup_decimal_support():
"""Setup sqlite3 to support conversions to/from Decimal numbers.
"""
dbapi.register_adapter(Decimal, adapt_decimal)
dbapi.register_converter("decimal", convert_decimal)
beancount.scripts.tutorial
Write output files for the tutorial commands.