beancount.loader
Loader code. This is the main entry point to load up a file.
beancount.loader.LoadError (tuple)
LoadError(source, message, entry)
beancount.loader.LoadError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/loader.py
def __getnewargs__(self):
    """Return self as a plain tuple. Used by copy and pickle."""
    # Hand the field values back as a plain tuple so that copy/pickle can
    # feed them to __new__ again.
    plain = _tuple(self)
    return plain
beancount.loader.LoadError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of LoadError(source, message, entry)
beancount.loader.LoadError.__replace__(self, /, **kwds)
special
Return a new LoadError object replacing specified fields with new values
Source code in beancount/loader.py
def _replace(self, /, **kwds):
    """Return a new LoadError object replacing specified fields with new values."""
    # For each field, take the override out of kwds if one was given,
    # otherwise keep the current value.
    replaced = self._make(_map(kwds.pop, field_names, self))
    if not kwds:
        return replaced
    # Whatever is left in kwds did not match any field name.
    raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
beancount.loader.LoadError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/loader.py
def __repr__(self):
    """Return a nicely formatted representation string."""
    class_name = self.__class__.__name__
    # repr_fmt is the pre-built "(field=%r, ...)" template filled from self.
    return class_name + repr_fmt % self
beancount.loader.aggregate_options_map(options_map, other_options_map)
Aggregate some of the attributes of options map.
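Parameters:

| Name | Description |
|---|---|
| options_map | The target map in which we want to aggregate attributes. |
| other_options_map | A list of other options maps, some of whose values we'd like to see aggregated. |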
Source code in beancount/loader.py
def aggregate_options_map(options_map, other_options_map):
    """Fold selected attributes of several options maps into one.

    Args:
      options_map: The primary options map. It is copied with copy.copy()
        (a shallow copy) and the copy is what gets returned; its "dcontext"
        value is still updated in place via update_from().
      other_options_map: A list of other options maps whose
        "operating_currency" and "dcontext" values are merged in.
    Returns:
      The aggregated copy of options_map, with "operating_currency" and
      "pythonpath" recomputed.
    """
    aggregated = copy.copy(options_map)

    # Collect operating currencies from every map, the primary one first,
    # and merge the display contexts along the way.
    all_currencies = list(aggregated["operating_currency"])
    for other in other_options_map:
        all_currencies += other["operating_currency"]
        aggregated["dcontext"].update_from(other["dcontext"])
    aggregated["operating_currency"] = list(misc_utils.uniquify(all_currencies))

    # Produce a 'pythonpath' value for transformers: the directory of every
    # file that asked for it with the "insert_pythonpath" option.
    aggregated["pythonpath"] = sorted({
        path.dirname(omap["filename"])
        for omap in itertools.chain((aggregated,), other_options_map)
        if omap.get("insert_pythonpath", False)
    })
    return aggregated
beancount.loader.combine_plugins(*plugin_modules)
Combine the plugins from the given plugin modules.
This is used to create plugins of plugins.
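Parameters:

| Name | Description |
|---|---|
| *plugin_modules | A sequence of module objects. |

Returns:

| Description |
|---|
| A list that can be assigned to the new module's `__plugins__` attribute. |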
Source code in beancount/loader.py
def combine_plugins(*plugin_modules):
    """Combine the plugins from the given plugin modules.

    This is used to create plugins of plugins.

    Args:
      *plugin_modules: A sequence of module objects, each exposing a
        __plugins__ attribute. Entries of __plugins__ may be either function
        names (strings, looked up on the module) or the function objects
        themselves, the same two forms run_transformations() accepts.
    Returns:
      A list of plugin callables that can be assigned to the new module's
      __plugins__ attribute.
    """
    combined = []
    for module in plugin_modules:
        for plugin in module.__plugins__:
            # Resolve names to the function they designate; pass callables
            # through untouched (getattr() would raise TypeError on them).
            combined.append(getattr(module, plugin)
                            if isinstance(plugin, str) else plugin)
    return combined
beancount.loader.compute_input_hash(filenames)
Compute a hash of the input data.
Parameters:

| Name | Description |
|---|---|
| filenames | A list of input files. Order is not relevant. |
Source code in beancount/loader.py
def compute_input_hash(filenames):
    """Compute a hash of the input data.

    The hash covers each filename and, for files that can be stat'ed, their
    modification time and size. File contents are not read.

    Args:
      filenames: A list of input files. Order is not relevant (the names are
        sorted before hashing).
    Returns:
      A string, the hexadecimal MD5 digest.
    """
    md5 = hashlib.md5()
    for filename in sorted(filenames):
        md5.update(filename.encode('utf8'))
        # Stat directly instead of checking for existence first: a file that
        # disappears between the two calls must not abort the computation.
        try:
            stat = os.stat(filename)
        except (OSError, ValueError):
            # Missing or inaccessible file: only its name contributes.
            continue
        # Note: The 'dd' layout is kept as is so that hashes stored in
        # existing caches remain comparable.
        md5.update(struct.pack('dd', stat.st_mtime_ns, stat.st_size))
    return md5.hexdigest()
beancount.loader.delete_cache_function(cache_getter, function)
A wrapper that removes the cached filename.
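Parameters:

| Name | Description |
|---|---|
| cache_getter | A function of one argument, the top-level filename, which will return the name of the corresponding cache file. |
| function | A function object to decorate for caching. |

Returns:

| Description |
|---|
| A decorated function which will delete the cached filename, if it exists. |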
Source code in beancount/loader.py
def delete_cache_function(cache_getter, function):
    """A wrapper that removes the cached filename.

    Args:
      cache_getter: A function of one argument, the top-level filename, which
        will return the name of the corresponding cache file.
      function: A function object to decorate for caching.
    Returns:
      A decorated function which will delete the cached filename, if it
      exists, and then call the original function.
    """
    @functools.wraps(function)
    def wrapped(toplevel_filename, *args, **kw):
        # Delete the cache. A missing file is the normal case; any other
        # failure (e.g. a read-only filesystem) is only worth a warning, as
        # in pickle_cache_function(), and must not prevent loading.
        cache_filename = cache_getter(toplevel_filename)
        try:
            os.remove(cache_filename)
        except FileNotFoundError:
            pass
        except OSError as exc:
            logging.warning("Could not remove picklecache file %s: %s",
                            cache_filename, exc)

        # Invoke the original function.
        return function(toplevel_filename, *args, **kw)
    return wrapped
beancount.loader.get_cache_filename(pattern, filename)
Compute the cache filename from a given pattern and the top-level filename.
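Parameters:

| Name | Description |
|---|---|
| pattern (str) | A cache filename or pattern. If the pattern contains '{filename}' this will get replaced by the top-level filename. This may be absolute or relative. |
| filename (str) | The top-level filename. |

Returns:

| Description |
|---|
| str – The resolved cache filename. |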
Source code in beancount/loader.py
def get_cache_filename(pattern: str, filename: str) -> str:
    """Compute the cache filename from a given pattern and the top-level filename.

    Args:
      pattern: A cache filename or pattern. If the pattern contains '{filename}' this
        will get replaced by the basename of the top-level filename. This may be
        absolute or relative; a relative pattern is resolved against the
        directory of the top-level file.
      filename: The top-level filename.
    Returns:
      The resolved cache filename.
    """
    # Substitute into the pattern alone, before it is joined to a directory:
    # the directory of the ledger is arbitrary user data and may legitimately
    # contain '{' or '}', which must not be interpreted as format fields.
    resolved = pattern.format(filename=path.basename(filename))
    if path.isabs(pattern):
        return resolved
    return path.join(path.dirname(path.abspath(filename)), resolved)
beancount.loader.initialize(use_cache, cache_filename=None)
Initialize the loader.
Source code in beancount/loader.py
def initialize(use_cache: bool, cache_filename: Optional[str] = None):
    """Initialize the loader by selecting the implementation behind _load_file.

    Args:
      use_cache: If true, file loads go through the pickle cache; otherwise
        loads are uncached and any existing cache file gets deleted.
      cache_filename: An optional cache filename or pattern. When not given,
        the BEANCOUNT_LOAD_CACHE_FILENAME environment variable and then
        PICKLE_CACHE_FILENAME are used.
    """
    # Note: This rebinds a module-level function, so the choice applies to
    # every caller of the loader within this process.
    # pylint: disable=invalid-name
    global _load_file

    # Resolve the cache pattern: explicit argument first, then the
    # environment, then the built-in default.
    cache_pattern = cache_filename
    if not cache_pattern:
        cache_pattern = (os.getenv('BEANCOUNT_LOAD_CACHE_FILENAME') or
                         PICKLE_CACHE_FILENAME)
    cache_getter = functools.partial(get_cache_filename, cache_pattern)

    if not use_cache:
        if cache_filename is not None:
            logging.warning("Cache disabled; "
                            "Explicitly overridden cache filename %s will be ignored.",
                            cache_filename)
        _load_file = delete_cache_function(cache_getter, _uncached_load_file)
        return

    _load_file = pickle_cache_function(cache_getter, PICKLE_CACHE_THRESHOLD,
                                       _uncached_load_file)
beancount.loader.load_doc(expect_errors=False)
A factory of decorators that loads the docstring and calls the function with entries.
This is an incredibly convenient tool to write lots of tests. Write a unittest using the standard TestCase class and put the input entries in the function's docstring.
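Parameters:

| Name | Description |
|---|---|
| expect_errors | A boolean or None. True: expect errors and fail if there are none. False: expect no errors and fail if there are some. None: do nothing, no check. |

Returns:

| Description |
|---|
| A wrapped method that accepts a single 'self' argument. |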
Source code in beancount/loader.py
def load_doc(expect_errors=False):
    """A factory of decorators that loads the docstring and calls the function with entries.

    Intended for unit tests: write a test method on a standard TestCase class,
    put the Beancount input in the method's docstring, and receive the loaded
    result as arguments.

    Args:
      expect_errors: A boolean or None, with the following semantics,
        True: Expect errors and fail if there are none.
        False: Expect no errors and fail if there are some.
        None: Do nothing, no check.
    Returns:
      A decorator producing a wrapped method that accepts a single 'self'
      argument.
    """
    def decorator(fun):
        """Wrap a test method so that its docstring is loaded as input.

        Args:
          fun: A callable method accepting (self, entries, errors,
            options_map), i.e., the three values the loader returns.
        Returns:
          A decorated test function.
        """
        @functools.wraps(fun)
        def wrapper(self):
            entries, errors, options_map = load_string(fun.__doc__, dedent=True)

            # Only the exact values True and False trigger a check.
            if expect_errors is False:
                if errors:
                    oss = io.StringIO()
                    printer.print_errors(errors, file=oss)
                    self.fail("Unexpected errors found:\n{}".format(oss.getvalue()))
            elif expect_errors is True:
                if not errors:
                    self.fail("Expected errors, none found:")

            # Note: 'errors' is always passed on, even when none are
            # expected, so that the signature of the test method does not
            # depend on the arguments given to the decorator.
            return fun(self, entries, errors, options_map)

        # Keep the input available under __input__ and clear __doc__.
        wrapper.__input__ = wrapper.__doc__
        wrapper.__doc__ = None
        return wrapper

    return decorator
beancount.loader.load_encrypted_file(filename, log_timings=None, log_errors=None, extra_validations=None, dedent=False, encoding=None)
Load an encrypted Beancount input file.
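Parameters:

| Name | Description |
|---|---|
| filename | The name of an encrypted file to be parsed. |
| log_timings | See load_string(). |
| log_errors | See load_string(). |
| extra_validations | See load_string(). |
| dedent | See load_string(). |
| encoding | See load_string(). |

Returns:

| Description |
|---|
| A triple of (entries, errors, option_map) where "entries" is a date-sorted list of entries from the file, "errors" a list of error objects generated while parsing and validating the file, and "options_map", a dict of the options parsed from the file. |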
Source code in beancount/loader.py
def load_encrypted_file(filename, log_timings=None, log_errors=None, extra_validations=None,
                        dedent=False, encoding=None):
    """Load an encrypted Beancount input file.

    The file is decrypted in memory and the resulting text is handed to
    load_string().

    Args:
      filename: The name of an encrypted file to be parsed.
      log_timings: See load_string().
      log_errors: See load_string().
      extra_validations: See load_string().
      dedent: See load_string().
      encoding: See load_string().
    Returns:
      A triple of (entries, errors, option_map) where "entries" is a date-sorted
      list of entries from the file, "errors" a list of error objects generated
      while parsing and validating the file, and "options_map", a dict of the
      options parsed from the file.
    """
    contents = encryption.read_encrypted_file(filename)
    # Forward every option, including 'dedent', which would otherwise be
    # accepted here but silently ignored.
    return load_string(contents,
                       log_timings=log_timings,
                       log_errors=log_errors,
                       extra_validations=extra_validations,
                       dedent=dedent,
                       encoding=encoding)
beancount.loader.load_file(filename, log_timings=None, log_errors=None, extra_validations=None, encoding=None)
Open a Beancount input file, parse it, run transformations and validate.
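Parameters:

| Name | Description |
|---|---|
| filename | The name of the file to be parsed. |
| log_timings | A file object or function to write timings to, or None, if it should remain quiet. (Note that this is intended to use the logging methods and does not insert a newline.) |
| log_errors | A file object or function to write errors to, or None, if it should remain quiet. |
| extra_validations | A list of extra validation functions to run after loading this list of entries. |
| encoding | A string or None, the encoding to decode the input filename with. |

Returns:

| Description |
|---|
| A triple of (entries, errors, option_map) where "entries" is a date-sorted list of entries from the file, "errors" a list of error objects generated while parsing and validating the file, and "options_map", a dict of the options parsed from the file. |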
Source code in beancount/loader.py
def load_file(filename, log_timings=None, log_errors=None, extra_validations=None,
              encoding=None):
    """Open a Beancount input file, parse it, run transformations and validate.

    Args:
      filename: The name of the file to be parsed. '~' and environment
        variables are expanded, and a relative name is resolved against the
        current working directory.
      log_timings: A file object or function to write timings to,
        or None, if it should remain quiet. (Note that this is intended to use
        the logging methods and does not insert a newline.)
      log_errors: A file object or function to write errors to,
        or None, if it should remain quiet.
      extra_validations: A list of extra validation functions to run after loading
        this list of entries.
      encoding: A string or None, the encoding to decode the input filename with.
    Returns:
      A triple of (entries, errors, option_map) where "entries" is a date-sorted
      list of entries from the file, "errors" a list of error objects generated
      while parsing and validating the file, and "options_map", a dict of the
      options parsed from the file.
    """
    # Resolve the name to an absolute path.
    filename = path.expanduser(filename)
    filename = path.expandvars(filename)
    if not path.isabs(filename):
        filename = path.normpath(path.join(os.getcwd(), filename))

    if not encryption.is_encrypted_file(filename):
        # Regular file: go through the (possibly cached) loader and report
        # the errors here.
        entries, errors, options_map = _load_file(
            filename, log_timings, extra_validations, encoding)
        _log_errors(errors, log_errors)
    else:
        # Note: Caching is not supported for encrypted files. Errors are
        # reported by load_string(), which receives 'log_errors', so they
        # are not logged again here.
        entries, errors, options_map = load_encrypted_file(
            filename, log_timings, log_errors, extra_validations, False, encoding)

    return entries, errors, options_map
beancount.loader.load_string(string, log_timings=None, log_errors=None, extra_validations=None, dedent=False, encoding=None)
Open a Beancount input string, parse it, run transformations and validate.
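Parameters:

| Name | Description |
|---|---|
| string | A Beancount input string. |
| log_timings | A file object or function to write timings to, or None, if it should remain quiet. |
| log_errors | A file object or function to write errors to, or None, if it should remain quiet. |
| extra_validations | A list of extra validation functions to run after loading this list of entries. |
| dedent | A boolean, if set, remove the whitespace in front of the lines. |
| encoding | A string or None, the encoding to decode the input string with. |

Returns:

| Description |
|---|
| A triple of (entries, errors, option_map) where "entries" is a date-sorted list of entries from the string, "errors" a list of error objects generated while parsing and validating the string, and "options_map", a dict of the options parsed from the string. |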
Source code in beancount/loader.py
def load_string(string, log_timings=None, log_errors=None, extra_validations=None,
                dedent=False, encoding=None):
    """Parse a Beancount input string, run transformations and validate.

    Args:
      string: A Beancount input string.
      log_timings: A file object or function to write timings to,
        or None, if it should remain quiet.
      log_errors: A file object or function to write errors to,
        or None, if it should remain quiet.
      extra_validations: A list of extra validation functions to run after loading
        this list of entries.
      dedent: A boolean, if set, remove the common leading whitespace from the
        lines (via textwrap.dedent) before parsing.
      encoding: A string or None, the encoding to decode the input string with.
    Returns:
      A triple of (entries, errors, option_map) where "entries" is a date-sorted
      list of entries from the string, "errors" a list of error objects
      generated while parsing and validating the string, and "options_map", a
      dict of the options parsed from the string.
    """
    text = textwrap.dedent(string) if dedent else string

    # The input is handed over as a single (source, flag) pair; the False
    # flag presumably marks it as a string rather than a filename
    # (TODO confirm against _load()).
    sources = [(text, False)]
    entries, errors, options_map = _load(sources, log_timings,
                                         extra_validations, encoding)
    _log_errors(errors, log_errors)
    return entries, errors, options_map
beancount.loader.needs_refresh(options_map)
Predicate that returns true if at least one of the input files may have changed.
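Parameters:

| Name | Description |
|---|---|
| options_map | An options dict as per the parser. |

Returns:

| Description |
|---|
| A boolean, true if the input is obsoleted by changes in the input files. |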
Source code in beancount/loader.py
def needs_refresh(options_map):
    """Predicate that returns true if at least one of the input files may have changed.

    Args:
      options_map: An options dict as per the parser, or None. Its 'include'
        value lists the input files and its 'input_hash' value, if present,
        is the hash recorded for them.
    Returns:
      A boolean, true if the input is obsoleted by changes in the input files.
      Always true when options_map is None or carries no 'input_hash'.
    """
    # Nothing was loaded before: a load is required.
    if options_map is None:
        return True

    # Hash the current state of the input files and compare it against the
    # hash recorded in the options.
    current_hash = compute_input_hash(options_map['include'])
    if 'input_hash' not in options_map:
        return True
    return current_hash != options_map['input_hash']
beancount.loader.pickle_cache_function(cache_getter, time_threshold, function)
Decorate a loader function to make it load its result from a pickle cache.
This considers the first argument as a top-level filename and assumes the function to be cached returns an (entries, errors, options_map) triple. We use the 'include' option value in order to check whether any of the included files has changed. It's essentially a special case for an on-disk memoizer. If any of the included files are more recent than the cache, the function is recomputed and the cache refreshed.
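Parameters:

| Name | Description |
|---|---|
| cache_getter | A function of one argument, the top-level filename, which will return the name of the corresponding cache file. |
| time_threshold | A float, the number of seconds below which we don't bother caching. |
| function | A function object to decorate for caching. |

Returns:

| Description |
|---|
| A decorated function which will pull its result from a cache file if it is available. |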
Source code in beancount/loader.py
def pickle_cache_function(cache_getter, time_threshold, function):
    """Decorate a loader function to make it load its result from a pickle cache.

    This considers the first argument as a top-level filename and assumes the
    function to be cached returns an (entries, errors, options_map) triple. We
    use the 'include' option value in order to check whether any of the included
    files has changed. It's essentially a special case for an on-disk memoizer.
    If needs_refresh() reports that the inputs changed, or if the cache cannot
    be used, the function is recomputed and the cache refreshed.

    Args:
      cache_getter: A function of one argument, the top-level filename, which
        will return the name of the corresponding cache file.
      time_threshold: A float, the number of seconds below which we don't bother
        caching.
      function: A function object to decorate for caching.
    Returns:
      A decorated function which will pull its result from a cache file if
      it is available.
    """
    @functools.wraps(function)
    def wrapped(toplevel_filename, *args, **kw):
        cache_filename = cache_getter(toplevel_filename)

        # Read the cache if it exists in order to get the list of files whose
        # timestamps to check.
        exists = path.exists(cache_filename)
        if exists:
            try:
                # NOTE(security): Unpickling executes arbitrary code; the
                # cache file must only ever come from a trusted location.
                with open(cache_filename, 'rb') as file:
                    result = pickle.load(file)
                # A usable cache holds an (entries, errors, options_map)
                # triple; anything else is handled like a corrupted file.
                _, _, options_map = result
            except Exception as exc:
                # Note: Not a big fan of doing this, but here we handle all
                # possible exceptions because unpickling of an old or
                # corrupted pickle file manifests as a variety of different
                # exception types. The same goes for a cache file which
                # cannot be opened or whose payload has an unexpected shape.
                # Ignore the cache and recompute.
                logging.error("Cache file is corrupted: %s; recomputing.", exc)
            else:
                if not needs_refresh(options_map):
                    # The inputs have not changed; cache hit.
                    return result

            # We failed; discard the stale or unusable cache file.
            try:
                os.remove(cache_filename)
            except OSError as exc:
                # Warn for errors on read-only filesystems.
                logging.warning("Could not remove picklecache file %s: %s",
                                cache_filename, exc)

        # Recompute the value.
        time_before = time.time()
        result = function(toplevel_filename, *args, **kw)
        time_after = time.time()

        # Overwrite the cache file if the time it takes to compute it
        # justifies it.
        if time_after - time_before > time_threshold:
            try:
                with open(cache_filename, 'wb') as file:
                    pickle.dump(result, file)
            except Exception as exc:
                logging.warning("Could not write to picklecache file %s: %s",
                                cache_filename, exc)

        return result
    return wrapped
beancount.loader.run_transformations(entries, parse_errors, options_map, log_timings)
Run the various transformations on the entries.
This is where entries are being synthesized, checked, plugins are run, etc.
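Parameters:

| Name | Description |
|---|---|
| entries | A list of directives as read from the parser. |
| parse_errors | A list of errors so far. |
| options_map | An options dict as read from the parser. |
| log_timings | A function to write timing log entries to, or None, if it should be quiet. |

Returns:

| Description |
|---|
| A list of modified entries, and a list of errors, also possibly modified. |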
Source code in beancount/loader.py
def run_transformations(entries, parse_errors, options_map, log_timings):
    """Run the various transformations on the entries.

    This is where entries are being synthesized, checked, plugins are run, etc.

    Args:
      entries: A list of directives as read from the parser.
      parse_errors: A list of errors so far.
      options_map: An options dict as read from the parser. Its
        'plugin_processing_mode' value selects which plugins run: 'raw' runs
        only the plugins listed under 'plugin'; 'default' surrounds them
        with the built-in PLUGINS_PRE, PLUGINS_AUTO and PLUGINS_POST lists.
      log_timings: A function to write timing log entries to, or None, if it
        should be quiet.
    Returns:
      A list of modified entries, and a list of errors, also possibly modified.
    Raises:
      ValueError: If 'plugin_processing_mode' has an unsupported value.
    """
    # A list of errors to extend (make a copy to avoid modifying the input).
    errors = list(parse_errors)

    # Select the plugins to process.
    processing_mode = options_map['plugin_processing_mode']
    if processing_mode == 'raw':
        plugins_iter = options_map["plugin"]
    elif processing_mode == 'default':
        plugins_iter = itertools.chain(PLUGINS_PRE,
                                       options_map["plugin"],
                                       PLUGINS_AUTO,
                                       PLUGINS_POST)
    else:
        # Fail explicitly: asserting on the (always truthy) message string
        # would never trigger and would leave 'plugins_iter' unbound.
        raise ValueError("Invalid value for plugin_processing_mode: {}".format(
            processing_mode))

    for plugin_name, plugin_config in plugins_iter:
        # Issue a warning on a renamed module.
        renamed_name = RENAMED_MODULES.get(plugin_name, None)
        if renamed_name:
            warnings.warn("Deprecation notice: Module '{}' has been renamed to '{}'; "
                          "please adjust your plugin directive.".format(
                              plugin_name, renamed_name))
            plugin_name = renamed_name

        # Try to import the module.
        #
        # Note: We intercept import errors and continue but let other plugin
        # import time exceptions fail a run, by choice.
        try:
            module = importlib.import_module(plugin_name)
        except ImportError:
            # Upon failure, just issue an error.
            formatted_traceback = traceback.format_exc().replace("\n", "\n ")
            errors.append(LoadError(data.new_metadata("<load>", 0),
                                    'Error importing "{}": {}'.format(
                                        plugin_name, formatted_traceback), None))
            continue

        # A module which declares no plugins is silently skipped.
        if not hasattr(module, '__plugins__'):
            continue

        # Apply it.
        with misc_utils.log_time(plugin_name, log_timings, indent=2):

            # Run each transformer function in the plugin.
            for function_name in module.__plugins__:
                if isinstance(function_name, str):
                    # Support plugin functions provided by name.
                    callback = getattr(module, function_name)
                else:
                    # Support function types directly, not just names.
                    callback = function_name

                # Provide arguments if config is provided.
                # TODO(blais): Make this consistent in v3, not conditional.
                args = () if plugin_config is None else (plugin_config,)

                # Catch all exceptions raised in running the plugin. Note
                # that SystemExit does not derive from Exception, so a
                # plugin may still exit the program.
                try:
                    entries, plugin_errors = callback(entries, options_map, *args)
                    errors.extend(plugin_errors)
                except Exception:
                    # Upon failure, just issue an error.
                    formatted_traceback = traceback.format_exc().replace("\n", "\n ")
                    errors.append(LoadError(data.new_metadata("<load>", 0),
                                            'Error applying plugin "{}": {}'.format(
                                                plugin_name, formatted_traceback), None))
                    continue

                # Ensure that the entries are sorted. Don't trust the plugins
                # themselves.
                entries.sort(key=data.entry_sortkey)

    return entries, errors