beancount.utils

Generic utility packages and functions.

beancount.utils.bisect_key

A version of bisect that accepts a custom key function, like the sorting ones do.

beancount.utils.bisect_key.bisect_left_with_key (sequence, value, key=None)

Find the leftmost insertion index for the given value in a sorted list, like bisect.bisect_left but comparing through a key function.

Parameters:

Name Type Description Default
sequence

A sorted sequence of elements.

required
value

The value to search for.

required
key

An optional function used to extract the value from the elements of sequence.

None

Returns:

Type Description

The insertion index, an integer between 0 and len(sequence) inclusive.

Source code in beancount/utils/bisect_key.py
def bisect_left_with_key(sequence, value, key=None):
    """Locate the leftmost insertion point for a value in a sorted sequence.

    This behaves like bisect.bisect_left, except that every element is passed
    through 'key' before being compared against 'value'.

    Args:
      sequence: A sorted sequence of elements (sorted according to 'key').
      value: The value to search for; it is compared against extracted keys.
      key: An optional function used to extract the comparison value from the
        elements of sequence. When omitted, elements are compared directly.
    Returns:
      An integer in the range [0, len(sequence)]: the index of the first
      element whose key is not less than 'value', or len(sequence) when every
      key is less than 'value'.
    """
    # pylint: disable=invalid-name
    extract = key if key is not None else (lambda element: element)

    lower, upper = 0, len(sequence)
    while lower < upper:
        middle = (lower + upper) // 2
        if extract(sequence[middle]) < value:
            # Everything up to and including 'middle' sorts before 'value'.
            lower = middle + 1
        else:
            upper = middle
    return lower

beancount.utils.bisect_key.bisect_right_with_key (a, x, key, lo=0, hi=None)

Like bisect.bisect_right, but with a key lookup parameter.

Parameters:

Name Type Description Default
a

The list to search in.

required
x

The element to search for.

required
key

A function, to extract the value from the list.

required
lo

The smallest index to search.

0
hi

The largest index to search.

None

Returns:

Type Description

As in bisect.bisect_right, an insertion index into list 'a'.

Source code in beancount/utils/bisect_key.py
def bisect_right_with_key(a, x, key, lo=0, hi=None):
    """Locate the rightmost insertion point for 'x', comparing through 'key'.

    This behaves like bisect.bisect_right, except that every element of 'a'
    is passed through 'key' before being compared against 'x'.

    Args:
      a: The sorted list to search in.
      x: The value to search for; it is compared against extracted keys.
      key: A function, to extract the comparison value from a list element.
      lo: The smallest index to search (inclusive).
      hi: The end of the range to search (exclusive); defaults to len(a).
    Returns:
      An integer index: the position just past the last element whose key is
      not greater than 'x', within the searched range.
    Raises:
      ValueError: If 'lo' is negative.
    """
    # pylint: disable=invalid-name
    if lo < 0:
        raise ValueError('lo must be non-negative')

    lower = lo
    upper = len(a) if hi is None else hi
    while lower < upper:
        middle = (lower + upper) // 2
        if x < key(a[middle]):
            upper = middle
        else:
            # 'middle' sorts at or before 'x'; insert to its right.
            lower = middle + 1
    return lower

beancount.utils.csv_utils

Utilities for reading and writing CSV files.

beancount.utils.csv_utils.as_rows (string)

Split a string as rows of a CSV file.

Parameters:

Name Type Description Default
string

A string to be split, the contents of a CSV file.

required

Returns:

Type Description

Lists of lists of strings.

Source code in beancount/utils/csv_utils.py
def as_rows(string):
    """Parse the text of a CSV file into its rows.

    Common leading indentation is removed first (textwrap.dedent), so that
    indented multi-line string literals can be passed in directly.

    Args:
      string: A string to be split, the contents of a CSV file.
    Returns:
      A list of rows, each row being a list of strings.
    """
    dedented = textwrap.dedent(string)
    reader = csv.reader(io.StringIO(dedented))
    return [row for row in reader]

beancount.utils.csv_utils.csv_clean_header (header_row)

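Clean up the titles of the header line so that they can be used as field names for the following rows. This normalizes each title (lowercased, with non-alphanumeric characters replaced by underscores) and assigns a positional name ('colN') to titles that end up empty.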

Parameters:

Name Type Description Default
header_row

A list of strings, the row with header titles.

required

Returns:

Type Description

A list of strings, with possibly modified (cleaned) row titles, of the same lengths.

Source code in beancount/utils/csv_utils.py
def csv_clean_header(header_row):
    """Normalize the titles of a CSV header row into field names.

    Each title is lowercased, the token 'p/l' is spelled 'pnl', every
    character outside [a-z0-9] becomes an underscore, surrounding underscores
    are stripped and runs of underscores are collapsed. A title that ends up
    empty is named 'col<index>' after its position in the row.

    Args:
      header_row: A list of strings, the row with header titles.
    Returns:
      A list of strings, with possibly modified (cleaned) row titles, of the
      same length as the input.
    Raises:
      AssertionError: If two titles normalize to the same field name.
    """
    cleaned = []
    for position, title in enumerate(header_row):
        name = re.sub(r'\bp/l\b', 'pnl', title.lower())
        name = re.sub('[^a-z0-9]', '_', name).strip(' _')
        # Fall back on a positional name when nothing usable is left.
        name = re.sub('__+', '_', name) or 'col{}'.format(position)
        assert name not in cleaned, name
        cleaned.append(name)
    return cleaned

beancount.utils.csv_utils.csv_dict_reader (fileobj, ** kw)

Read a CSV file yielding normalized dictionary fields.

This is basically an alternative constructor for csv.DictReader that normalizes the field names.

Parameters:

Name Type Description Default
fileobj

A file object to be read.

required
**kw

Optional arguments forwarded to csv.DictReader.

{}

Returns:

Type Description

A csv.DictReader object.

Source code in beancount/utils/csv_utils.py
def csv_dict_reader(fileobj, **kw):
    """Build a csv.DictReader whose field names have been cleaned up.

    The field names the reader reports are run through csv_clean_header() and
    assigned back onto the reader before it is returned, so the yielded
    dictionaries use the normalized names as keys.

    Args:
      fileobj: A file object to be read.
      **kw: Optional arguments forwarded to csv.DictReader.
    Returns:
      A csv.DictReader object.
    """
    dict_reader = csv.DictReader(fileobj, **kw)
    original_names = dict_reader.fieldnames
    dict_reader.fieldnames = csv_clean_header(original_names)
    return dict_reader

beancount.utils.csv_utils.csv_split_sections (rows)

Given rows, split them into sections at empty lines. This is useful for structured CSV files with multiple sections.

Parameters:

Name Type Description Default
rows

A list of rows, which are themselves lists of strings.

required

Returns:

Type Description

A list of sections, which are lists of rows, which are lists of strings.

Source code in beancount/utils/csv_utils.py
def csv_split_sections(rows):
    """Group rows into sections, using empty rows as the separators.

    This is useful for structured CSV files with multiple sections. Every
    empty row closes the section in progress, even if that section has no
    rows (so consecutive empty rows produce empty sections); a trailing
    section is only kept when it contains at least one row.

    Args:
      rows: A list of rows, which are themselves lists of strings.
    Returns:
      A list of sections, which are lists of rows, which are lists of strings.
    """
    # The last entry is always the section currently being filled.
    sections = [[]]
    for row in rows:
        if row:
            sections[-1].append(row)
        else:
            sections.append([])
    # An unfinished section that never received a row is not reported.
    if not sections[-1]:
        sections.pop()
    return sections

beancount.utils.csv_utils.csv_split_sections_with_titles (rows)

Given a list of rows, split their sections. If the sections have single column titles, consume those lines as their names and return a mapping of section names.

This is useful for CSV files with multiple sections, where the separator is a title. We use this to separate the multiple tables within the CSV files.

Parameters:

Name Type Description Default
rows

A list of rows (list-of-strings).

required

Returns:

Type Description

A dict mapping section names to lists of rows (list-of-strings).

Source code in beancount/utils/csv_utils.py
def csv_split_sections_with_titles(rows):
    """Split rows into sections and index the sections by name.

    The rows are first split with csv_split_sections(). When a section starts
    with a single-column row followed by a row that is not single-column, the
    first row is consumed as the section's title. Otherwise the section is
    named 'Section <index>' after its position. Sections with fewer than two
    rows are dropped.

    This is useful for CSV files with multiple sections, where the separator is
    a title. We use this to separate the multiple tables within the CSV files.

    Args:
      rows: A list of rows (list-of-strings).
    Returns:
      A dict mapping section names to lists of rows (list-of-strings).
    """
    named_sections = {}
    for position, block in enumerate(csv_split_sections(rows)):
        # A section this short cannot hold both a title and some content.
        if len(block) < 2:
            continue
        first_row, second_row = block[0], block[1]
        has_title = len(first_row) == 1 and len(second_row) != 1
        if has_title:
            named_sections[first_row[0]] = block[1:]
        else:
            named_sections['Section {}'.format(position)] = block
    return named_sections

beancount.utils.csv_utils.csv_tuple_reader (fileobj, ** kw)

Read a CSV file yielding namedtuple instances. The CSV file must have a header line.

Parameters:

Name Type Description Default
fileobj

A file object to be read.

required
**kw

Optional arguments forwarded to csv.DictReader.

{}

Yields: Namedtuple instances, one for each row.

Source code in beancount/utils/csv_utils.py
def csv_tuple_reader(fileobj, **kw):
    """Iterate over a CSV file, producing one namedtuple per data row.

    The first row of the file is taken as the header; its titles are cleaned
    with csv_clean_header() and become the fields of the namedtuple class.
    Rows that cannot be converted are skipped, provided they have at most one
    column.

    Args:
      fileobj: A file object to be read.
      **kw: Optional arguments forwarded to csv.reader.
    Yields:
      Namedtuple instances, one for each row.
    """
    rows = iter(csv.reader(fileobj, **kw))
    header = next(rows)
    Row = collections.namedtuple('Row', csv_clean_header(header))
    for values in rows:
        try:
            yield Row(*values)
        except TypeError:
            # A column-count mismatch usually comes from a trailing 'END OF
            # LINE' marker or a comment line; only rows with zero or one
            # column are tolerated.
            assert len(values) in (0, 1)

beancount.utils.csv_utils.iter_sections (fileobj, separating_predicate=None)

For a given file object, yield file-like objects for each of the sections contained therein. A section is defined as a list of lines that don't match the predicate. For example, if you want to split by empty lines, provide a predicate that will be true given an empty line, which will cause a new section to be begun.

Parameters:

Name Type Description Default
fileobj

A file object to read from.

required
separating_predicate

A boolean predicate that is true on separating lines.

None

Yields: An iterator over the lines of each section.

Source code in beancount/utils/csv_utils.py
def iter_sections(fileobj, separating_predicate=None):
    """For a given file object, yield an iterator of lines for each of the
    sections contained therein.

    A section starts at the first line for which the predicate is true and
    continues through iter_until_empty(), i.e., until the first line for which
    the predicate is false. Note that, despite its name, the predicate is
    therefore true on the lines that belong to a section and false on the
    lines that separate sections. The default predicate is true for lines that
    contain non-whitespace characters, which splits the input at blank lines.

    Args:
      fileobj: A file object to read from.
      separating_predicate: A boolean predicate that is true on the lines of a
        section and false on the lines between sections.
    Yields:
      An iterator over the lines of each section. All the sections share the
      underlying line iterator of 'fileobj'.
    """
    if separating_predicate is None:
        separating_predicate = lambda line: bool(line.strip())

    lineiter = iter(fileobj)
    for line in lineiter:
        if separating_predicate(line):
            # The line just consumed opens the section; chain it in front of
            # the remaining lines of the section.
            iterator = itertools.chain((line,),
                                       iter_until_empty(lineiter,
                                                        separating_predicate))
            yield iterator
            # Drain whatever the caller left unread, so that the shared line
            # iterator is positioned past this section before looking for the
            # next one.
            for _ in iterator:
                pass

beancount.utils.csv_utils.iter_until_empty (iterator, separating_predicate=None)

An iterator of lines that will stop at the first empty line.

Parameters:

Name Type Description Default
iterator

An iterator of lines.

required
separating_predicate

A boolean predicate that is true on separating lines.

None

Yields: Non-empty lines. EOF when we hit an empty line.

Source code in beancount/utils/csv_utils.py
def iter_until_empty(iterator, separating_predicate=None):
    """Yield lines from an iterator until the predicate rejects one.

    With the default predicate this stops at the first blank line. The line
    that stops the iteration is consumed from 'iterator' but not yielded.

    Args:
      iterator: An iterator of lines.
      separating_predicate: A boolean predicate; iteration continues while it
        is true. Defaults to a test for non-whitespace content.
    Yields:
      The lines for which the predicate is true, up to the first line for
      which it is false.
    """
    keep_going = separating_predicate
    if keep_going is None:
        keep_going = lambda text: bool(text.strip())

    for line in iterator:
        if keep_going(line):
            yield line
        else:
            return

beancount.utils.date_utils

Parse the date from various formats.

beancount.utils.date_utils.intimezone (tz_value)

Temporarily reset the value of TZ.

This is used for testing.

Parameters:

Name Type Description Default
tz_value str

The value of TZ to set for the duration of this context.

required

Returns:

Type Description

A contextmanager in the given timezone locale.

Source code in beancount/utils/date_utils.py
@contextlib.contextmanager
def intimezone(tz_value: str):
    """Temporarily reset the value of TZ.

    The TZ environment variable is set to 'tz_value' and time.tzset() is
    called for the duration of the context. On exit, the previous value of TZ
    is restored (or TZ is removed if it was not set) and time.tzset() is
    called again.

    This is used for testing.

    Args:
      tz_value: The value of TZ to set for the duration of this context.
    Returns:
      A contextmanager in the given timezone locale.
    """
    tz_old = os.environ.get('TZ', None)
    os.environ['TZ'] = tz_value
    time.tzset()
    try:
        yield
    finally:
        if tz_old is None:
            # Use pop() rather than 'del': the body may already have removed
            # TZ itself, and a KeyError raised here would mask any exception
            # propagating out of the body.
            os.environ.pop('TZ', None)
        else:
            os.environ['TZ'] = tz_old
        time.tzset()

beancount.utils.date_utils.iter_dates (start_date, end_date)

Yield all the dates between 'start_date' and 'end_date'.

Parameters:

Name Type Description Default
start_date

An instance of datetime.date.

required
end_date

An instance of datetime.date.

required

Yields: Instances of datetime.date.

Source code in beancount/utils/date_utils.py
def iter_dates(start_date, end_date):
    """Generate consecutive days from 'start_date' up to 'end_date'.

    The start date is included and the end date is excluded. Nothing is
    produced when 'start_date' is not before 'end_date'.

    Args:
      start_date: An instance of datetime.date, the first date produced.
      end_date: An instance of datetime.date, one past the last date produced.
    Yields:
      Instances of datetime.date.
    """
    step = datetime.timedelta(days=1)
    current = start_date
    while current < end_date:
        yield current
        current = current + step

beancount.utils.date_utils.next_month (date)

Compute the date at the beginning of the following month from the given date.

Parameters:

Name Type Description Default
date

A datetime.date instance.

required

Returns:

Type Description

A datetime.date instance, the first day of the month following 'date'.

Source code in beancount/utils/date_utils.py
def next_month(date):
    """Return the first day of the month that follows the given date.

    Args:
      date: A datetime.date instance.
    Returns:
      A datetime.date instance, the first day of the month following 'date'.
    """
    # December rolls over into January of the next year.
    if date.month == 12:
        return datetime.date(date.year + 1, 1, 1)
    return datetime.date(date.year, date.month + 1, 1)

beancount.utils.date_utils.parse_date_liberally (string, parse_kwargs_dict=None)

Parse arbitrary strings to dates.

This function is intended to support liberal inputs, so that we can use it in accepting user-specified dates on command-line scripts.

Parameters:

Name Type Description Default
string

A string to parse.

required
parse_kwargs_dict

Dict of kwargs to pass to dateutil parser.

None

Returns:

Type Description

A datetime.date object.

Source code in beancount/utils/date_utils.py
def parse_date_liberally(string, parse_kwargs_dict=None):
    """Convert a loosely formatted date string to a date.

    This function is intended to support liberal inputs, so that we can use it
    in accepting user-specified dates on command-line scripts. The parsing
    itself is delegated to dateutil.parser.parse().

    Args:
      string: A string to parse.
      parse_kwargs_dict: Dict of kwargs to pass to dateutil parser.
    Returns:
      A datetime.date object.
    """
    parser_kwargs = {} if parse_kwargs_dict is None else parse_kwargs_dict
    parsed = dateutil.parser.parse(string, **parser_kwargs)
    return parsed.date()

beancount.utils.date_utils.render_ofx_date (dtime)

Render a datetime to the OFX format.

Parameters:

Name Type Description Default
dtime

A datetime.datetime instance.

required

Returns:

Type Description

A string, rendered to milliseconds.

Source code in beancount/utils/date_utils.py
def render_ofx_date(dtime):
    """Format a datetime the way OFX files spell timestamps.

    The output is 'YYYYMMDDHHMMSS' followed by a period and three digits of
    milliseconds; microseconds are truncated, not rounded.

    Args:
      dtime: A datetime.datetime instance.
    Returns:
      A string, rendered to milliseconds.
    """
    seconds_part = dtime.strftime('%Y%m%d%H%M%S')
    milliseconds = dtime.microsecond // 1000
    return '{}.{:03d}'.format(seconds_part, milliseconds)

beancount.utils.defdict

An instance of collections.defaultdict whose factory accepts a key.

Note: This really ought to be an enhancement to Python itself. I should bother adding this in eventually.

beancount.utils.defdict.DefaultDictWithKey

A version of defaultdict whose factory accepts the key as an argument. Note: collections.defaultdict would be improved by supporting this directly, this is a common occurrence.

beancount.utils.defdict.ImmutableDictWithDefault

An immutable dict which returns a default value for missing keys.

This differs from a defaultdict in that it does not insert a missing default
value when one is materialized (from a missing fetch), and furthermore, the
set method is made unavailable to prevent mutation beyond construction.

beancount.utils.defdict.ImmutableDictWithDefault.__setitem__ (self, key, value) special

Disallow mutating the dict in the usual way.

Source code in beancount/utils/defdict.py
def __setitem__(self, key, value):
    """Reject item assignment by always raising NotImplementedError."""
    raise NotImplementedError()

beancount.utils.defdict.ImmutableDictWithDefault.get (self, key, _=None)

Return the value for key if key is in the dictionary, else default.

Source code in beancount/utils/defdict.py
def get(self, key, _=None):
    """Return the value for key, as looked up through __getitem__.

    The second argument is accepted so that the signature matches dict.get(),
    but it is ignored: whatever __getitem__ returns for the key is returned.
    """
    return self[key]

beancount.utils.encryption

Support for encrypted tests.

beancount.utils.encryption.is_encrypted_file (filename)

Return true if the given filename contains an encrypted file.

Parameters:

Name Type Description Default
filename

A path string.

required

Returns:

Type Description

A boolean, true if the file contains an encrypted file.

Source code in beancount/utils/encryption.py
def is_encrypted_file(filename):
    """Tell whether a filename designates an encrypted file.

    A '.gpg' extension is accepted without looking at the file. A file with
    an '.asc' extension is opened and counts as encrypted when a PGP message
    marker appears within its first 1024 characters.

    Args:
      filename: A path string.
    Returns:
      A boolean, true if the file contains an encrypted file.
    """
    extension = path.splitext(filename)[1]
    if extension == '.gpg':
        return True
    if extension != '.asc':
        return False
    with open(filename) as armored_file:
        beginning = armored_file.read(1024)
    return '--BEGIN PGP MESSAGE--' in beginning

beancount.utils.encryption.is_gpg_installed ()

Return true if GPG 1.4.x or 2.x are installed, which is what we use and support.

Source code in beancount/utils/encryption.py
def is_gpg_installed():
    """Return true if GPG 1.4.x or 2.x are installed, which is what we use and support.

    This runs 'gpg --version' and inspects the beginning of its output.

    Returns:
      A boolean: True if the command ran successfully and reported a
      supported version, False otherwise (including when the command cannot
      be started at all).
    """
    try:
        pipe = subprocess.Popen(['gpg', '--version'], shell=False,
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, _ = pipe.communicate()
    except OSError:
        # The executable could not be launched, e.g. it is not on the PATH.
        return False
    if pipe.returncode != 0:
        return False
    version_text = out.decode('utf8')
    # Compare against None so that callers get a real boolean and not a
    # regular expression match object.
    return re.match(r'gpg \(GnuPG\) (1\.4|2)\.', version_text) is not None

beancount.utils.encryption.read_encrypted_file (filename)

Decrypt and read an encrypted file without temporary storage.

Parameters:

Name Type Description Default
filename

A string, the path to the encrypted file.

required

Returns:

Type Description

A string, the contents of the file.

Exceptions:

Type Description
OSError

If we could not properly decrypt the file.

Source code in beancount/utils/encryption.py
def read_encrypted_file(filename):
    """Return the decrypted contents of a file, decrypting in memory.

    The file is decrypted by running 'gpg --batch --decrypt' on its resolved
    path and capturing the output, so no temporary file is written.

    Args:
      filename: A string, the path to the encrypted file.
    Returns:
      A string, the contents of the file.
    Raises:
      OSError: If we could not properly decrypt the file.
    """
    gpg_args = ['gpg', '--batch', '--decrypt', path.realpath(filename)]
    process = subprocess.Popen(gpg_args,
                               shell=False,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    stdout_data, stderr_data = process.communicate()
    if process.returncode == 0:
        return stdout_data.decode('utf-8')
    raise OSError("Could not decrypt file ({}): {}".format(
        process.returncode, stderr_data.decode('utf8')))

beancount.utils.file_type

Code that can guess a MIME type for a filename.

This attempts to identify the mime-type of a file using

  1. The built-in mimetypes library, then
  2. python-magic (if available), and finally
  3. some custom mappers for datatypes used in financial downloads, such as Quicken files.

beancount.utils.file_type.guess_file_type (filename)

Attempt to guess the type of the input file.

Parameters:

Name Type Description Default
filename

A string, the name of the file to guess the type for.

required

Returns:

Type Description

A suitable mimetype string, or None if we could not guess.

Source code in beancount/utils/file_type.py
def guess_file_type(filename):
    """Guess the MIME type of a file from its name, then from its contents.

    Three sources are consulted in turn: the standard mimetypes table, the
    patterns in EXTRA_FILE_TYPES, and finally python-magic if it is available.

    Args:
      filename: A string, the name of the file to guess the type for.
    Returns:
      A suitable mimetype string.
    Raises:
      ValueError: If neither the mimetypes table nor the extra patterns
        recognize the name and python-magic is not available.
    """
    # First choice: the standard mimetypes association.
    guessed, _ = mimetypes.guess_type(filename, False)
    if guessed:
        return guessed

    # Second choice: the extra types that only we know about.
    for pattern, extra_type in EXTRA_FILE_TYPES:
        if pattern.match(filename):
            return extra_type

    # Last resort: libmagic, which requires python-magic to be installed.
    if not magic:
        raise ValueError(("Could not identify the type of file '{}'; "
                          "try installing python-magic").format(filename))

    detected = magic.from_file(filename, mime=True)
    if isinstance(detected, bytes):
        detected = detected.decode('utf8')
    return detected

beancount.utils.file_utils

File utilities.

beancount.utils.file_utils.chdir (directory)

Temporarily chdir to the given directory.

Parameters:

Name Type Description Default
directory

The directory to switch to.

required

Returns:

Type Description

A context manager which restores the cwd after running.

Source code in beancount/utils/file_utils.py
@contextlib.contextmanager
def chdir(directory):
    """Run a block of code with a different current working directory.

    The value bound by the 'with' statement is the working directory that was
    in effect before the switch.

    Args:
      directory: The directory to switch to.
    Returns:
      A context manager which restores the cwd after running.
    """
    previous_dir = os.getcwd()
    os.chdir(directory)
    try:
        yield previous_dir
    finally:
        # Go back even if the body raised.
        os.chdir(previous_dir)

beancount.utils.file_utils.find_files (fords, ignore_dirs=('.hg', '.svn', '.git'), ignore_files=('.DS_Store',))

Enumerate the files under the given directories, stably.

Invalid file or directory names will be logged to the error log.

Parameters:

Name Type Description Default
fords

A list of strings, file or directory names.

required
ignore_dirs

A list of strings, filenames or directories to be ignored.

('.hg', '.svn', '.git')

Yields: Strings, full filenames from the given roots.

Source code in beancount/utils/file_utils.py
def find_files(fords,
               ignore_dirs=('.hg', '.svn', '.git'),
               ignore_files=('.DS_Store',)):
    """Walk the given files and directories, producing filenames in a stable order.

    Directories are traversed recursively, visiting subdirectories and files
    in sorted order. Names that do not exist are reported to the error log.

    Args:
      fords: A list of strings, file or directory names. A single string is
        accepted as well.
      ignore_dirs: A list of strings, names of directories not to descend into.
      ignore_files: A list of strings, names of files to leave out while
        walking a directory.
    Yields:
      Strings, full filenames from the given roots.
    """
    roots = [fords] if isinstance(fords, str) else fords
    assert isinstance(roots, (list, tuple))
    for entry in roots:
        if path.isdir(entry):
            for dirpath, subdirs, names in os.walk(entry):
                # Prune and sort in place: os.walk() consults this very list
                # to decide where to descend next.
                subdirs[:] = sorted(name for name in subdirs
                                    if name not in ignore_dirs)
                for name in sorted(names):
                    if name not in ignore_files:
                        yield path.join(dirpath, name)
            continue
        if path.isfile(entry) or path.islink(entry):
            yield entry
            continue
        if not path.exists(entry):
            logging.error("File or directory '{}' does not exist.".format(entry))

beancount.utils.file_utils.guess_file_format (filename, default=None)

Guess the file format from the filename.

Parameters:

Name Type Description Default
filename

A string, the name of the file. This can be None.

required

Returns:

Type Description

A string, the extension of the format, without a leading period.

Source code in beancount/utils/file_utils.py
def guess_file_format(filename, default=None):
    """Map a filename to a format name, based on how the filename ends.

    Args:
      filename: A string, the name of the file. This can be None.
      default: The value to return when the filename is empty or None, or
        when its ending is not recognized.
    Returns:
      A string, one of 'text', 'csv' or 'html', or the value of 'default'.
    """
    if not filename:
        return default
    if filename.endswith(('.txt', '.text')):
        return 'text'
    if filename.endswith('.csv'):
        return 'csv'
    if filename.endswith(('.html', '.xhtml')):
        return 'html'
    return default

beancount.utils.file_utils.path_greedy_split (filename)

Split a path, returning the longest possible extension.

Parameters:

Name Type Description Default
filename

A string, the filename to split.

required

Returns:

Type Description

A pair of basename, extension (which includes the leading period).

Source code in beancount/utils/file_utils.py
def path_greedy_split(filename):
    """Split a path at the first period of its last component.

    Unlike os.path.splitext(), the extension returned is the longest possible
    one, e.g. '.tar.gz' rather than '.gz'.

    Args:
      filename: A string, the filename to split.
    Returns:
      A pair of (path without the extension, extension). The extension
      includes the leading period, or is None if the last component of the
      path contains no period.
    """
    directory, basename = path.dirname(filename), path.basename(filename)
    stem, dot, remainder = basename.partition('.')
    extension = dot + remainder if dot else None
    return (path.join(directory, stem), extension)

beancount.utils.file_utils.touch_file (filename, * otherfiles)

Touch a file and wait until its timestamp has been changed.

Parameters:

Name Type Description Default
filename

A string path, the name of the file to touch.

required
otherfiles

A list of other files whose timestamps the touched file's timestamp must end up beyond.

()
Source code in beancount/utils/file_utils.py
def touch_file(filename, *otherfiles):
    """Update a file's modification time until it is visibly newer.

    The file is touched repeatedly, pausing between attempts, until its
    modification time exceeds the latest modification time that 'filename'
    and all of 'otherfiles' had when the function was called.

    Args:
      filename: A string path, the name of the file to touch.
      otherfiles: A list of other files whose timestamps the touched file's
        timestamp must end up beyond.
    """
    # Timestamps are compared in integer nanoseconds. Filesystems tend to
    # have low resolutions, e.g. one second, so several attempts may be
    # needed before the change becomes observable.
    reference_ns = max(os.stat(name).st_mtime_ns
                       for name in (filename,) + otherfiles)
    poll_interval_secs = 0.05
    while True:
        with open(filename, 'a'):
            os.utime(filename)
        time.sleep(poll_interval_secs)
        if os.stat(filename).st_mtime_ns > reference_ns:
            return

beancount.utils.import_utils

Utilities for importing symbols programmatically.

beancount.utils.import_utils.import_symbol (dotted_name)

Import a symbol in an arbitrary module.

Parameters:

Name Type Description Default
dotted_name

A dotted path to a symbol.

required

Returns:

Type Description

The object referenced by the given name.

Exceptions:

Type Description
ImportError

If the module could not be imported.

AttributeError

If the symbol could not be found in the module.

Source code in beancount/utils/import_utils.py
def import_symbol(dotted_name):
    """Resolve a dotted name to the object it designates.

    Everything before the last period is imported as a module, and the last
    component is fetched from that module as an attribute.

    Args:
      dotted_name: A dotted path to a symbol.
    Returns:
      The object referenced by the given name.
    Raises:
      ImportError: If the module could not be imported.
      AttributeError: If the symbol could not be found in the module.
    """
    module_name, _, symbol_name = dotted_name.rpartition('.')
    containing_module = importlib.import_module(module_name)
    return getattr(containing_module, symbol_name)

beancount.utils.invariants

Functions to register auxiliary functions on a class' methods to check for invariants.

This is intended to be used in a test, whereby your test will setup a class to automatically run invariant verification functions before and after each function call, to ensure some extra sanity checks that wouldn't be used in non-tests.

Example: Instrument the Inventory class with the check_inventory_invariants() function.

def setUp(module): instrument_invariants(Inventory, check_inventory_invariants, check_inventory_invariants)

def tearDown(module): uninstrument_invariants(Inventory)

beancount.utils.invariants.instrument_invariants (klass, prefun, postfun)

Instrument the class 'klass' with pre/post invariant checker functions.

Parameters:

Name Type Description Default
klass

A class object, whose methods to be instrumented.

required
prefun

A function that checks invariants pre-call.

required
postfun

A function that checks invariants post-call.

required
Source code in beancount/utils/invariants.py
def instrument_invariants(klass, prefun, postfun):
    """Wrap the public methods of 'klass' with invariant checker functions.

    Every plain function defined directly on the class whose name does not
    start with an underscore is replaced by a version wrapped with
    invariant_check(). The original functions are remembered on the class, in
    its '__instrumented' attribute, so that the instrumentation can be undone.

    Args:
      klass: A class object, whose methods to be instrumented.
      prefun: A function that checks invariants pre-call.
      postfun: A function that checks invariants post-call.
    """
    originals = {
        name: attribute
        for name, attribute in klass.__dict__.items()
        if not name.startswith('_')
        and isinstance(attribute, types.FunctionType)}
    for name, function in originals.items():
        setattr(klass, name, invariant_check(function, prefun, postfun))
    klass.__instrumented = originals

beancount.utils.invariants.invariant_check (method, prefun, postfun)

Decorate a method with the pre/post invariant checkers.

Parameters:

Name Type Description Default
method

An unbound method to instrument.

required
prefun

A function that checks invariants pre-call.

required
postfun

A function that checks invariants post-call.

required

Returns:

Type Description

An unbound method, decorated.

Source code in beancount/utils/invariants.py
def invariant_check(method, prefun, postfun):
    """Decorate a method with the pre/post invariant checkers.

    The checkers are only run for the outermost call: while a call to the
    decorated method is in progress, nested calls to it run the method
    without the checkers. The post-call checker is not run when the method
    raises.

    Args:
      method: An unbound method to instrument.
      prefun: A function that checks invariants pre-call.
      postfun: A function that checks invariants post-call.
    Returns:
      An unbound method, decorated.
    """
    # One entry per call currently in progress; its length is the call depth.
    reentrant = []
    def new_method(self, *args, **kw):
        reentrant.append(None)
        try:
            outermost = len(reentrant) == 1
            if outermost:
                prefun(self)
            result = method(self, *args, **kw)
            if outermost:
                postfun(self)
            return result
        finally:
            # Always unwind the depth counter. Without this, an exception
            # from a checker or from the method would leave it stuck above
            # zero and silently disable every later check.
            reentrant.pop()
    return new_method

beancount.utils.invariants.uninstrument_invariants (klass)

Undo the instrumentation for invariants.

Parameters:

Name Type Description Default
klass

A class object, whose methods to be uninstrumented.

required
Source code in beancount/utils/invariants.py
def uninstrument_invariants(klass):
    """Undo the instrumentation for invariants.

    Restores the original functions saved on the class under the
    '__instrumented' attribute and removes that attribute. Calling this on a
    class that carries no such attribute of its own is a no-op.

    Args:
      klass: A class object, whose methods to be uninstrumented.
    """
    # Only consult the class' own namespace: a marker inherited from an
    # instrumented base class must neither be applied to nor deleted from
    # this class.
    instrumented = vars(klass).get('__instrumented')
    if instrumented is None:
        return
    for attrname, object_ in instrumented.items():
        setattr(klass, attrname, object_)
    del klass.__instrumented

beancount.utils.memo

Memoization utilities.

beancount.utils.memo.memoize_recent_fileobj (function, cache_filename, expiration=None)

Memoize recent calls to the given function which returns a file object.

The results of the cache expire after some time.

Parameters:

Name Type Description Default
function

A callable object.

required
cache_filename

A string, the path to the database file to cache to.

required
expiration

The time during which the results will be kept valid. Use 'None' to never expire the cache (this is the default).

None

Returns:

Type Description

A memoized version of the function.

Source code in beancount/utils/memo.py
def memoize_recent_fileobj(function, cache_filename, expiration=None):
    """Memoize recent calls to the given function which returns a file object.

    The contents read from the returned file object are stored in a shelve
    database, keyed by a digest of the call arguments, along with the time
    of the call. Cached results expire after some time.

    Args:
      function: A callable object.
      cache_filename: A string, the path to the database file to cache to.
      expiration: The time during which the results will be kept valid. Use
        'None' to never expire the cache (this is the default).
    Returns:
      A memoized version of the function. It returns a new io.BytesIO over
      the contents, or None if there are no contents.
    """
    urlcache = shelve.open(cache_filename, 'c')
    urlcache.lock = threading.Lock()  # Note: 'shelve' is not thread-safe.

    @functools.wraps(function)
    def memoized(*args, **kw):
        # The cache key is a digest of the positional arguments followed by
        # the keyword arguments in sorted order.
        digest = hashlib.md5()
        for part in (args, sorted(kw.items())):
            digest.update(str(part).encode('utf-8'))
        hash_ = digest.hexdigest()

        time_now = now()
        try:
            with urlcache.lock:
                time_orig, contents = urlcache[hash_]
        except KeyError:
            usable = False
        else:
            # An entry is usable unless an expiration is set and exceeded.
            usable = expiration is None or not (time_now - time_orig) > expiration

        if not usable:
            # Missing or stale entry: call through and refresh the cache.
            contents = None
            fileobj = function(*args, **kw)
            if fileobj:
                contents = fileobj.read()
                with urlcache.lock:
                    urlcache[hash_] = (time_now, contents)

        return io.BytesIO(contents) if contents else None
    return memoized

beancount.utils.memo.now ()

Indirection on datetime.datetime.now() for testing.

Source code in beancount/utils/memo.py
def now():
    """Return the current date and time.

    This is an indirection on datetime.datetime.now(), for testing.
    """
    current = datetime.datetime.now()
    return current

beancount.utils.misc_utils

Generic utility packages and functions.

beancount.utils.misc_utils.LineFileProxy

A file object that will delegate writing full lines to another logging function. This may be used for writing data to a logging level without having to worry about lines.

beancount.utils.misc_utils.LineFileProxy.__init__ (self, line_writer, prefix=None, write_newlines=False) special

Construct a new line delegator file object proxy.

Parameters:

Name Type Description Default
line_writer

A callable function, used to write to the delegated output.

required
prefix

An optional string, the prefix to insert before every line.

None
write_newlines

A boolean, true if we should output the newline characters.

False
Source code in beancount/utils/misc_utils.py
def __init__(self, line_writer, prefix=None, write_newlines=False):
    """Construct a new line delegator file object proxy.

    The arguments are stored on the instance as-is and the buffer of pending
    data starts out empty.

    Args:
      line_writer: A callable function, used to write to the delegated output.
      prefix: An optional string, the prefix to insert before every line.
      write_newlines: A boolean, true if we should output the newline characters.
    """
    self.line_writer = line_writer
    self.prefix = prefix
    self.write_newlines = write_newlines
    # List of string fragments written so far and not yet handed to the
    # line writer.
    self.data = []

beancount.utils.misc_utils.LineFileProxy.close (self)

Close the line delegator.

Source code in beancount/utils/misc_utils.py
def close(self):
    """Close the line delegator.

    This only flushes the buffered data; no other resource is released here.
    """
    self.flush()

beancount.utils.misc_utils.LineFileProxy.flush (self)

Flush the data to the line writer.

Source code in beancount/utils/misc_utils.py
def flush(self):
    """Flush the data to the line writer.

    The buffered fragments are joined and split into lines. Every complete
    line is passed to the line writer, with the prefix prepended and a
    newline appended if so configured. If the buffered data does not end
    with a newline, the trailing partial line is kept in the buffer.
    """
    buffered = ''.join(self.data)
    if not buffered:
        return

    complete = buffered.splitlines()
    if buffered.endswith('\n'):
        self.data = []
    else:
        # The last piece is an unfinished line; hold it back for later.
        self.data = [complete.pop()]

    for text in complete:
        if self.prefix:
            text = self.prefix + text
        if self.write_newlines:
            text += '\n'
        self.line_writer(text)

beancount.utils.misc_utils.LineFileProxy.write (self, data)

Write some string data to the output.

Parameters:

Name Type Description Default
data

A string, with or without newlines.

required
Source code in beancount/utils/misc_utils.py
def write(self, data):
    """Write some string data to the output.

    The data is always appended to the buffer; the buffer is flushed only
    when the data contains a newline.

    Args:
      data: A string, with or without newlines.
    """
    self.data.append(data)
    if '\n' in data:
        self.flush()

beancount.utils.misc_utils.TypeComparable

A base class whose equality comparison includes comparing the type of the instance itself.

beancount.utils.misc_utils.box (name=None, file=None)

A context manager that prints out a box around a block. This is useful for printing out stuff from tests in a way that is readable.

Parameters:

Name Type Description Default
name

A string, the name of the box to use.

None
file

The file object to print to.

None

Yields: None.

Source code in beancount/utils/misc_utils.py
@contextlib.contextmanager
def box(name=None, file=None):
    """A context manager that prints out a box around a block.
    This is useful for printing out stuff from tests in a way that is readable.

    An empty line and a header are written on entry; a footer is written and
    the file is flushed when the block completes.

    Args:
      name: A string, the name of the box to use.
      file: The file object to print to. Defaults to sys.stdout.
    Yields:
      None.
    """
    out = file if file else sys.stdout
    out.write('\n')

    if not name:
        top = ',----------------\n'
        bottom = '`----------------\n'
    else:
        top = ',--------({})--------\n'.format(name)
        # Make the footer as wide as the header (less the corner and newline).
        bottom = '`{}\n'.format('-' * (len(top)-2))

    out.write(top)
    yield
    out.write(bottom)
    out.flush()

beancount.utils.misc_utils.cmptuple (name, attributes)

Manufacture a comparable namedtuple class, similar to collections.namedtuple.

A comparable named tuple is a tuple which compares to False if contents are equal but the data types are different. We define this to supplement collections.namedtuple because by default a namedtuple disregards the type and we want to make precise comparisons for tests.

Parameters:

Name Type Description Default
name

The given name of the class.

required
attributes

A string or tuple of strings, with the names of the attributes.

required

Returns:

Type Description

A new namedtuple-derived type that compares False with other tuples with same contents.

Source code in beancount/utils/misc_utils.py
def cmptuple(name, attributes):
    """Manufacture a comparable namedtuple class, similar to collections.namedtuple.

    A comparable named tuple is a tuple which compares to False if contents are
    equal but the data types are different. We define this to supplement
    collections.namedtuple because by default a namedtuple disregards the type
    and we want to make precise comparisons for tests.

    The new class derives from TypeComparable and from a namedtuple whose
    name is the given name prefixed with an underscore.

    Args:
      name: The given name of the class.
      attributes: A string or tuple of strings, with the names of the
        attributes.
    Returns:
      A new namedtuple-derived type that compares False with other
      tuples with same contents.
    """
    base_name = '_{}'.format(name)
    bases = (TypeComparable, collections.namedtuple(base_name, attributes))
    return type(name, bases, {})

beancount.utils.misc_utils.compute_unique_clean_ids (strings)

Given a sequence of strings, reduce them to corresponding ids without any funny characters and ensure that the ids are unique. Returns a mapping of id to original string, or None if no collision-free mapping could be found.

Parameters:

Name Type Description Default
strings

A list of strings.

required

Returns:

Type Description

A dict mapping each id to its original string, or None if the ids could not be made unique.

Source code in beancount/utils/misc_utils.py
def compute_unique_clean_ids(strings):
    """Given a sequence of strings, reduce them to corresponding ids without any
    funny characters and ensure that the ids are unique.

    Two cleaning methods are tried in turn; the first one that produces no
    collision between the ids of distinct input strings is used.

    Args:
      strings: A list of strings.
    Returns:
      A dict mapping each id to the string it was computed from, or None if
      neither method produces a unique mapping.
    """
    unique_strings = set(strings)

    for pattern, replacement in [(r'[^A-Za-z0-9.-]', '_'),
                                 (r'[^A-Za-z0-9_]', ''),]:
        clean = re.compile(pattern).sub
        idmap = {}
        for string in unique_strings:
            id_ = clean(replacement, string)
            if id_ in idmap:
                break  # Collision; try the next method.
            idmap[id_] = string
        else:
            return idmap

    return None  # Could not find a unique mapping.

beancount.utils.misc_utils.deprecated (message)

A decorator generator to mark functions as deprecated and log a warning.

Source code in beancount/utils/misc_utils.py
def deprecated(message):
    """A decorator generator to mark functions as deprecated and log a warning.

    Args:
      message: A string, appended to the warning text issued on each call.
    Returns:
      A decorator. The decorated function issues a DeprecationWarning and
      then calls through to the original function.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            text = "Call to deprecated function {}: {}".format(func.__name__,
                                                               message)
            # stacklevel=2 attributes the warning to the caller.
            warnings.warn(text, category=DeprecationWarning, stacklevel=2)
            return func(*args, **kwargs)
        return wrapper
    return decorator

beancount.utils.misc_utils.dictmap (mdict, keyfun=None, valfun=None)

Map a dictionary's value.

Parameters:

Name Type Description Default
mdict

A dict.

required
key

A callable to apply to the keys.

required
value

A callable to apply to the values.

required
Source code in beancount/utils/misc_utils.py
def dictmap(mdict, keyfun=None, valfun=None):
    """Map a dictionary's keys and values.

    Args:
      mdict: A dict.
      keyfun: An optional callable to apply to the keys. If None, the keys
        are left unchanged.
      valfun: An optional callable to apply to the values. If None, the
        values are left unchanged.
    Returns:
      A new dict with the mapped keys and values.
    """
    def _identity(x):
        return x

    map_key = _identity if keyfun is None else keyfun
    map_val = _identity if valfun is None else valfun

    mapped = {}
    for key, val in mdict.items():
        mapped[map_key(key)] = map_val(val)
    return mapped

beancount.utils.misc_utils.escape_string (string)

Escape quotes and backslashes in payee and narration.

Parameters:

Name Type Description Default
string

Any string.

required

Returns: The input string, with offending characters replaced.

Source code in beancount/utils/misc_utils.py
def escape_string(string):
    """Escape quotes and backslashes in payee and narration.

    Each backslash is doubled and each double-quote is preceded by a
    backslash.

    Args:
      string: Any string.
    Returns:
      The input string, with offending characters replaced.
    """
    escapes = {
        ord('\\'): r'\\',
        ord('"'): r'\"',
    }
    return string.translate(escapes)

beancount.utils.misc_utils.filter_type (elist, types)

Filter the given list to yield only instances of the given types.

Parameters:

Name Type Description Default
elist

A sequence of elements.

required
types

A sequence of types to include in the output list.

required

Yields: Each element, if it is an instance of 'types'.

Source code in beancount/utils/misc_utils.py
def filter_type(elist, types):
    """Filter the given list to yield only instances of the given types.

    Args:
      elist: A sequence of elements.
      types: A type or tuple of types to include in the output; it is passed
        directly to isinstance().
    Yields:
      Each element, if it is an instance of 'types'.
    """
    for element in elist:
        if isinstance(element, types):
            yield element

beancount.utils.misc_utils.first_paragraph (docstring)

Return the first sentence of a docstring. The sentence has to be delimited by an empty line.

Parameters:

Name Type Description Default
docstring

A doc string.

required

Returns:

Type Description

A string with just the first sentence on a single line.

Source code in beancount/utils/misc_utils.py
def first_paragraph(docstring):
    """Return the first paragraph of a docstring, on a single line.
    The paragraph has to be delimited by an empty line.

    The docstring is stripped, then lines are collected up to the first
    empty line; trailing whitespace is removed from each line.

    Args:
      docstring: A doc string.
    Returns:
      A string with the collected lines joined by single spaces.
    """
    paragraph = []
    remaining = iter(docstring.strip().splitlines())
    line = next(remaining, '')
    while line:
        paragraph.append(line.rstrip())
        line = next(remaining, '')
    return ' '.join(paragraph)

beancount.utils.misc_utils.get_screen_height ()

Return the height of the terminal that runs this program.

Returns:

Type Description

An integer, the number of characters the screen is high. Return 0 if the terminal cannot be initialized.

Source code in beancount/utils/misc_utils.py
def get_screen_height():
    """Return the height of the terminal that runs this program.

    Returns:
      An integer, the number of characters the screen is high.
      Return 0 if the terminal cannot be initialized.
    """
    height = _get_screen_value('lines', 0)
    return height

beancount.utils.misc_utils.get_screen_width ()

Return the width of the terminal that runs this program.

Returns:

Type Description

An integer, the number of characters the screen is wide. Return 0 if the terminal cannot be initialized.

Source code in beancount/utils/misc_utils.py
def get_screen_width():
    """Return the width of the terminal that runs this program.

    Returns:
      An integer, the number of characters the screen is wide.
      Return 0 if the terminal cannot be initialized.
    """
    width = _get_screen_value('cols', 0)
    return width

beancount.utils.misc_utils.get_tuple_values (ntuple, predicate, memo=None)

Return all members referred to by this namedtuple instance that satisfy the given predicate. This function also works recursively on its members which are lists or tuples, and so it can be used for Transaction instances.

Parameters:

Name Type Description Default
ntuple

A tuple or namedtuple.

required
predicate

A predicate function that returns true if an attribute is to be output.

required
memo

An optional memoizing dictionary. If a tuple has already been seen, the recursion will be avoided.

None

Yields: Attributes of the tuple and its sub-elements if the predicate is true.

Source code in beancount/utils/misc_utils.py
def get_tuple_values(ntuple, predicate, memo=None):
    """Return all members referred to by this namedtuple instance that satisfy the
    given predicate. This function also works recursively on its members which
    are lists or tuples, and so it can be used for Transaction instances.

    This is a generator. The container itself is yielded if it satisfies the
    predicate, followed by its matching members in order. Members that are
    lists or tuples are visited recursively, each distinct object at most
    once.

    Args:
      ntuple: A tuple or namedtuple.
      predicate: A predicate function that returns true if an attribute is to be
        output.
      memo: An optional set of the id()s of the containers already visited.
        If a container has already been seen, the recursion will be avoided.
    Yields:
      Attributes of the tuple and its sub-elements if the predicate is true.
    """
    if memo is None:
        memo = set()
    id_ntuple = id(ntuple)
    if id_ntuple in memo:
        return
    memo.add(id_ntuple)

    if predicate(ntuple):
        yield ntuple
    for attribute in ntuple:
        if isinstance(attribute, (list, tuple)):
            # The recursive call applies the predicate to the container
            # itself, so it must not also be yielded from here.
            yield from get_tuple_values(attribute, predicate, memo)
        elif predicate(attribute):
            yield attribute

beancount.utils.misc_utils.groupby (keyfun, elements)

Group the elements as a dict of lists, where the key is computed using the function 'keyfun'.

Parameters:

Name Type Description Default
keyfun

A callable, used to obtain the group key from each element.

required
elements

An iterable of the elements to group.

required

Returns:

Type Description

A dict mapping each key to the list of elements that share that key.

Source code in beancount/utils/misc_utils.py
def groupby(keyfun, elements):
    """Group the elements as a dict of lists, where the key is computed using the
    function 'keyfun'.

    Unlike itertools.groupby, the elements need not be sorted by key; within
    each group the elements keep their order of appearance.

    Args:
      keyfun: A callable, used to obtain the group key from each element.
      elements: An iterable of the elements to group.
    Returns:
      A defaultdict mapping each key to the list of elements with that key.
    """
    # Note: a custom aggregation function is not supported; reducing the
    # list values afterwards can be done with a dict comprehension.
    groups = defaultdict(list)
    for item in elements:
        group_key = keyfun(item)
        groups[group_key].append(item)
    return groups

beancount.utils.misc_utils.idify (string)

Replace characters objectionable for a filename with underscores.

Parameters:

Name Type Description Default
string

Any string.

required

Returns:

Type Description

The input string, with offending characters replaced.

Source code in beancount/utils/misc_utils.py
def idify(string):
    """Replace characters objectionable for a filename with underscores.

    Runs of spaces and parentheses become a single underscore, underscores
    around a period are dropped, and leading and trailing underscores are
    stripped.

    Args:
      string: Any string.
    Returns:
      The input string, with offending characters replaced.
    """
    collapsed = re.sub(r'[ \(\)]+', '_', string)
    tidied = re.sub(r'_*\._*', '.', collapsed)
    return tidied.strip('_')

beancount.utils.misc_utils.import_curses ()

Try to import the 'curses' module. (This is used here in order to override for tests.)

Returns:

Type Description

The curses module, if it was possible to import it.

Exceptions:

Type Description
ImportError

If the module could not be imported.

Source code in beancount/utils/misc_utils.py
def import_curses():
    """Try to import the 'curses' module.
    (This is wrapped in a function in order to override it for tests.)

    Returns:
      The curses module, if it was possible to import it.
    Raises:
      ImportError: If the module could not be imported.
    """
    # Note: A recipe for getting the terminal size on Windows without curses
    # is available here, and should probably be implemented at some point:
    # https://stackoverflow.com/questions/263890/how-do-i-find-the-width-height-of-a-terminal-window
    # Another option is the 'blessings' package, which provides this across
    # multiple platforms.
    # pylint: disable=import-outside-toplevel
    import curses as curses_module
    return curses_module

beancount.utils.misc_utils.is_sorted (iterable, key=lambda x: x, cmp=lambda x, y: x <= y)

Return true if the sequence is sorted.

Parameters:

Name Type Description Default
iterable

An iterable sequence.

required
key

A function to extract the quantity by which to sort.

lambda x: x
cmp

A function that compares two elements of a sequence.

lambda x, y: x <= y

Returns:

Type Description

A boolean, true if the sequence is sorted.

Source code in beancount/utils/misc_utils.py
def is_sorted(iterable, key=lambda x: x, cmp=lambda x, y: x <= y):
    """Return true if the sequence is sorted.

    Every pair of adjacent keys must satisfy 'cmp'. An empty iterable, or
    one with a single element, is considered sorted.

    Args:
      iterable: An iterable sequence.
      key: A function to extract the quantity by which to sort.
      cmp: A function that compares two elements of a sequence. It receives
        the keys of two adjacent elements and returns true if they are in
        order. The default accepts equal neighbors.
    Returns:
      A boolean, true if the sequence is sorted.
    """
    iterator = map(key, iterable)
    try:
        prev = next(iterator)
    except StopIteration:
        # Nothing to compare: an empty sequence is trivially sorted.
        return True
    for element in iterator:
        if not cmp(prev, element):
            return False
        prev = element
    return True

beancount.utils.misc_utils.log_time (operation_name, log_timings, indent=0)

A context manager that times the block and logs it to info level.

Parameters:

Name Type Description Default
operation_name

A string, a label for the name of the operation.

required
log_timings

A function to write log messages to. If left to None, no timings are written (this becomes a no-op).

required
indent

An integer, the indentation level for the format of the timing line. This is useful if you're logging timing to a hierarchy of operations.

0

Yields: The start time of the operation.

Source code in beancount/utils/misc_utils.py
@contextlib.contextmanager
def log_time(operation_name, log_timings, indent=0):
    """A context manager that times the block and logs the elapsed time.

    Args:
      operation_name: A string, a label for the name of the operation.
      log_timings: A function to write log messages to. If left to None,
        no timings are written (this becomes a no-op).
      indent: An integer, the indentation level for the format of the timing
        line. This is useful if you're logging timing to a hierarchy of
        operations.
    Yields:
      The start time of the operation.
    """
    started = time()
    yield started
    elapsed_ms = (time() - started) * 1000
    if log_timings:
        label = "'{}'".format(operation_name)
        padding = '      '*indent
        message = "Operation: {:48} Time: {}{:6.0f} ms".format(
            label, padding, elapsed_ms)
        log_timings(message)

beancount.utils.misc_utils.longest (seq)

Return the longest of the given subsequences.

Parameters:

Name Type Description Default
seq

An iterable sequence of lists.

required

Returns:

Type Description

The longest list from the sequence.

Source code in beancount/utils/misc_utils.py
def longest(seq):
    """Return the longest of the given subsequences.

    Args:
      seq: An iterable sequence of lists.
    Returns:
      The longest list from the sequence. If several have the maximal
      length, the first one is returned. None if the sequence is empty.
    """
    # max() returns the first maximal item it encounters.
    return max(seq, key=len, default=None)

beancount.utils.misc_utils.map_namedtuple_attributes (attributes, mapper, object_)

Map the value of the named attributes of object by mapper.

Parameters:

Name Type Description Default
attributes

A sequence of string, the attribute names to map.

required
mapper

A callable that accepts the value of a field and returns the new value.

required
object_

Some namedtuple object with attributes on it.

required

Returns:

Type Description

A new instance of the same namedtuple with the named fields mapped by mapper.

Source code in beancount/utils/misc_utils.py
def map_namedtuple_attributes(attributes, mapper, object_):
    """Map the value of the named attributes of object by mapper.

    Args:
      attributes: A sequence of string, the attribute names to map.
      mapper: A callable that accepts the value of a field and returns
        the new value.
      object_: Some namedtuple object with attributes on it.
    Returns:
      A new instance of the same namedtuple with the named fields mapped by
      mapper.
    """
    updates = {}
    for name in attributes:
        updates[name] = mapper(getattr(object_, name))
    return object_._replace(**updates)

beancount.utils.misc_utils.replace_namedtuple_values (ntuple, predicate, mapper, memo=None)

Recurse through all the members of namedtuples and lists, and for members that match the given predicate, run them through the given mapper.

Parameters:

Name Type Description Default
ntuple

A namedtuple instance.

required
predicate

A predicate function that returns true if an attribute is to be output.

required
mapper

A callable, that will accept a single argument and return its replacement value.

required
memo

An optional memoizing dictionary. If a tuple has already been seen, the recursion will be avoided.

None

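Returns: A new namedtuple instance in which the attributes matching the predicate have been replaced by the mapper's result; an object that is not a namedtuple is returned unchanged.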

Source code in beancount/utils/misc_utils.py
def replace_namedtuple_values(ntuple, predicate, mapper, memo=None):
    """Recurse through all the members of namedtuples and lists, and for
    members that match the given predicate, run them through the given mapper.

    Only namedtuples are traversed. Any other object, including a plain
    tuple, is returned unchanged. For each field of a namedtuple:

    - if the predicate matches the value, it is replaced by mapper(value);
    - otherwise, if the value is a namedtuple, it is processed recursively;
    - otherwise, if the value is a plain list or tuple, it is replaced by a
      list of its members, each processed recursively.

    Args:
      ntuple: A namedtuple instance.
      predicate: A predicate function that returns true if an attribute is to be
        replaced.
      mapper: A callable, that will accept a single argument and return its
        replacement value.
      memo: An optional memoizing dictionary, mapping the id() of the
        namedtuples already processed to their replacement. If a namedtuple
        has already been seen, its replacement is reused and the recursion
        is avoided. Any other value is treated as if no memo had been given.
    Returns:
      A new namedtuple instance with the replacements applied, or the object
      itself if it is not a namedtuple.
    """
    # pylint: disable=unidiomatic-typecheck
    # This test comes before the memo lookup on purpose: objects that are not
    # namedtuples (e.g. strings or numbers in a list) are frequently shared
    # and must always be passed through as-is.
    if type(ntuple) is tuple or not isinstance(ntuple, tuple):
        return ntuple

    if not isinstance(memo, dict):
        memo = {}
    id_ntuple = id(ntuple)
    if id_ntuple in memo:
        return memo[id_ntuple]
    # Placeholder while this instance is being processed, so that a reference
    # cycle through a list terminates instead of recursing forever.
    memo[id_ntuple] = ntuple

    replacements = {}
    for attribute_name, attribute in zip(ntuple._fields, ntuple):
        if predicate(attribute):
            replacements[attribute_name] = mapper(attribute)
        elif type(attribute) is not tuple and isinstance(attribute, tuple):
            replacements[attribute_name] = replace_namedtuple_values(
                attribute, predicate, mapper, memo)
        elif type(attribute) in (list, tuple):
            replacements[attribute_name] = [
                replace_namedtuple_values(member, predicate, mapper, memo)
                for member in attribute]
    result = ntuple._replace(**replacements)
    memo[id_ntuple] = result
    return result

beancount.utils.misc_utils.skipiter (iterable, num_skip)

Skip some elements from an iterator.

Parameters:

Name Type Description Default
iterable

An iterator.

required
num_skip

The number of elements in the period.

required

Yields: Elements from the iterable, with num_skip elements skipped. For example, skipiter(range(10), 3) yields [0, 3, 6, 9].

Source code in beancount/utils/misc_utils.py
def skipiter(iterable, num_skip):
    """Skip some elements from an iterator.

    Args:
      iterable: An iterator.
      num_skip: The number of elements in the period. Must be positive.
    Yields:
      Every num_skip'th element from the iterable, starting with the first.
      For example, skipiter(range(10), 3) yields [0, 3, 6, 9].
    """
    assert num_skip > 0
    for position, value in enumerate(iterable):
        # Only the first element of each period of num_skip is kept.
        if position % num_skip == 0:
            yield value

beancount.utils.misc_utils.sorted_uniquify (iterable, keyfunc=None, last=False)

Given a sequence of elements, sort and remove duplicates of the given key. Keep either the first or the last (by key) element of a sequence of key-identical elements. This does not maintain the ordering of the original elements, they are returned sorted (by key) instead.

Parameters:

Name Type Description Default
iterable

An iterable sequence.

required
keyfunc

A function that extracts from the elements the sort key to use and uniquify on. If left unspecified, the identity function is used and the uniquification occurs on the elements themselves.

None
last

A boolean, True if we should keep the last item of the same keys. Otherwise keep the first.

False

Yields: Elements from the iterable.

Source code in beancount/utils/misc_utils.py
def sorted_uniquify(iterable, keyfunc=None, last=False):
    """Given a sequence of elements, sort and remove duplicates of the given key.
    Keep either the first or the last (by key) element of a sequence of
    key-identical elements. This does _not_ maintain the ordering of the
    original elements, they are returned sorted (by key) instead.

    Args:
      iterable: An iterable sequence.
      keyfunc: A function that extracts from the elements the sort key
        to use and uniquify on. If left unspecified, the identity function
        is used and the uniquification occurs on the elements themselves.
      last: A boolean, True if we should keep the last item of the same keys.
        Otherwise keep the first.
    Yields:
      Elements from the iterable.
    """
    if keyfunc is None:
        keyfunc = lambda x: x
    ordered = sorted(iterable, key=keyfunc)

    if not last:
        # Emit the first element of every run of equal keys.
        previous_key = UNSET
        for item in ordered:
            item_key = keyfunc(item)
            if item_key != previous_key:
                yield item
                previous_key = item_key
        return

    # Emit the held-back element whenever the key changes, so that the last
    # element of every run of equal keys is the one that gets out.
    pending = UNSET
    pending_key = UNSET
    for item in ordered:
        item_key = keyfunc(item)
        if item_key != pending_key and pending is not UNSET:
            yield pending
        pending, pending_key = item, item_key
    if pending is not UNSET:
        yield pending

beancount.utils.misc_utils.staticvar (varname, initial_value)

Returns a decorator that defines a Python function attribute.

This is used to simulate a static function variable in Python.

Parameters:

Name Type Description Default
varname

A string, the name of the variable to define.

required
initial_value

The value to initialize the variable to.

required

Returns:

Type Description

A function decorator.

Source code in beancount/utils/misc_utils.py
def staticvar(varname, initial_value):
    """Returns a decorator that defines a Python function attribute.

    This is used to simulate a static function variable in Python.

    Args:
      varname: A string, the name of the variable to define.
      initial_value: The value to initialize the variable to.
    Returns:
      A function decorator. It sets the attribute on the function it is
      applied to and returns that same function.
    """
    def deco(function):
        setattr(function, varname, initial_value)
        return function
    return deco

beancount.utils.misc_utils.swallow (* exception_types)

Catch and ignore certain exceptions.

Parameters:

Name Type Description Default
exception_types

A tuple of exception classes to ignore.

()

Yields: None.

Source code in beancount/utils/misc_utils.py
@contextlib.contextmanager
def swallow(*exception_types):
    """Catch and ignore certain exceptions.

    Only exceptions deriving from Exception are considered; any exception
    that is not an instance of one of the given types is re-raised.

    Args:
      exception_types: A tuple of exception classes to ignore.
    Yields:
      None.
    """
    try:
        yield
    except Exception as exc:
        if isinstance(exc, exception_types):
            return
        raise

beancount.utils.misc_utils.uniquify (iterable, keyfunc=None, last=False)

Given a sequence of elements, remove duplicates of the given key. Keep either the first or the last element of a sequence of key-identical elements. Order is maintained as much as possible. This does maintain the ordering of the original elements, they are returned in the same order as the original elements.

Parameters:

Name Type Description Default
iterable

An iterable sequence.

required
keyfunc

A function that extracts from the elements the key to uniquify on. If left unspecified, the identity function is used and the uniquification occurs on the elements themselves.

None
last

A boolean, True if we should keep the last item of the same keys. Otherwise keep the first.

False

Yields: Elements from the iterable.

Source code in beancount/utils/misc_utils.py
def uniquify(iterable, keyfunc=None, last=False):
    """Given a sequence of elements, remove duplicates of the given key. Keep either
    the first or the last element of a sequence of key-identical elements. The
    surviving elements are yielded in the same relative order as in the input.

    Args:
      iterable: An iterable of elements. With last=True, inputs that do not
        support reversed() (e.g. generators) are materialized into a list first.
      keyfunc: A function that extracts from the elements the key to uniquify
        on. The keys must be hashable. If left unspecified, the identity
        function is used and the uniquification occurs on the elements
        themselves.
      last: A boolean, True if we should keep the last item of the same keys.
        Otherwise keep the first.
    Yields:
      Elements from the iterable.
    """
    if keyfunc is None:
        keyfunc = lambda x: x
    seen = set()
    if last:
        # Walk the input backwards so that the first occurrence we meet of each
        # key is the last one in the original order.
        try:
            backwards = reversed(iterable)
        except TypeError:
            # Plain iterators and generators are not reversible; take a
            # snapshot of them instead of failing.
            backwards = reversed(list(iterable))
        unique_reversed_list = []
        for obj in backwards:
            key = keyfunc(obj)
            if key not in seen:
                seen.add(key)
                unique_reversed_list.append(obj)
        # Restore the original ordering.
        yield from reversed(unique_reversed_list)
    else:
        for obj in iterable:
            key = keyfunc(obj)
            if key not in seen:
                seen.add(key)
                yield obj

beancount.utils.net_utils

Network utilities.

beancount.utils.net_utils.retrying_urlopen (url, timeout=5, max_retry=5)

Open and download the given URL, retrying if it times out.

Parameters:

Name Type Description Default
url

A string, the URL to fetch.

required
timeout

A timeout after which to stop waiting for a response and return an error.

5
max_retry

The maximum number of times to retry.

5

Returns:

Type Description

The contents of the fetched URL.

Source code in beancount/utils/net_utils.py
def retrying_urlopen(url, timeout=5, max_retry=5):
    """Open and download the given URL, retrying if it times out.

    Args:
      url: A string, the URL to fetch.
      timeout: A timeout after which to stop waiting for a response and return an
        error.
      max_retry: The maximum number of times to retry.
    Returns:
      The response object of the fetched URL, or None if no attempt was made
      (max_retry <= 0), if every attempt produced an empty response, if a
      URLError occurred, or if the response status is not 200.
    """
    # Start from "no response" so that a non-positive max_retry returns None
    # instead of tripping over an unbound local below.
    response = None
    for _ in range(max_retry):
        logging.debug("Reading %s", url)
        try:
            response = request.urlopen(url, timeout=timeout)
            if response:
                break
        except error.URLError:
            # A URLError is treated as final; it is not retried.
            return None
    if response and response.getcode() != 200:
        return None
    return response

beancount.utils.pager

Code to write output to a pager.

This module contains an object that accumulates lines up to a minimum and then decides whether to flush them to the original output directly if under the threshold (no pager) or creates a pager and flushes the lines to it if above the threshold and then forwards all future lines to it. The purpose of this object is to pipe output to a pager only if the number of lines to be printed exceeds a minimum number of lines.

The contextmanager is intended to be used to pipe output to a pager and wait on the pager to complete before continuing. Simply write to the file object and upon exit we close the file object. This also silences broken pipe errors triggered by the user exiting the sub-process, and recovers from a failing pager command by just using stdout.

beancount.utils.pager.ConditionalPager

A proxy file for a pager that only creates a pager after a minimum number of lines has been printed to it.

beancount.utils.pager.ConditionalPager.__enter__ (self) special

Initialize the context manager and return this instance as it.

Source code in beancount/utils/pager.py
def __enter__(self):
    """Set up the pager state and return this object as the context value."""

    if not self.minlines:
        # No threshold was configured: start the pager right away.
        self.file, self.pipe = create_pager(self.command, self.default_file)
    else:
        # The output file and pipe are only created once the number of
        # accumulated lines goes over the threshold.
        self.file = self.pipe = None

    # Buffer for the output seen before the threshold is reached.
    self.accumulated_data, self.accumulated_lines = [], 0

    # The pager object is its own context value.
    return self

beancount.utils.pager.ConditionalPager.__exit__ (self, type, value, unused_traceback) special

Context manager exit. This flushes the output to our output file.

Parameters:

Name Type Description Default
type

Optional exception type, as per context managers.

required
value

Optional exception value, as per context managers.

required
unused_traceback

Optional trace.

required
Source code in beancount/utils/pager.py
def __exit__(self, type, value, unused_traceback):
    """Context manager exit. This flushes the output to our output file.

    Args:
      type: Optional exception type, as per context managers.
      value: Optional exception value, as per context managers.
      unused_traceback: Optional trace.
    Returns:
      True when a BrokenPipeError is involved (either raised while flushing or
      closing here, or passed in as the in-flight exception), which suppresses
      it. False otherwise, which lets any other in-flight exception propagate.
    """
    try:
        if self.file:
            # Flush the output file and close it.
            self.file.flush()
        else:
            # Oops... we never reached the threshold. Flush the accumulated
            # output to the file.
            self.flush_accumulated(self.default_file)

        # Wait for the subprocess (if we have one).
        if self.pipe:
            self.file.close()
            self.pipe.wait()

    # Absorb broken pipes that may occur on flush or close above.
    except BrokenPipeError:
        return True

    # Absorb broken pipes raised by the body. For anything else, report "not
    # handled" through the return value: a bare 'raise' here is only valid
    # while an exception is actively being handled and fails with a
    # RuntimeError otherwise.
    return isinstance(value, BrokenPipeError)

beancount.utils.pager.ConditionalPager.__init__ (self, command, minlines=None) special

Create a conditional pager.

Parameters:

Name Type Description Default
command

A string, the shell command to run as a pager.

required
minlines

If set, the number of lines under which you should not bother starting a pager. This avoids kicking off a pager if the screen is high enough to render the contents. If the value is unset, always starts a pager (which is fine behavior too).

None
Source code in beancount/utils/pager.py
def __init__(self, command, minlines=None):
    """Create a conditional pager.

    Args:
      command: A string, the shell command to run as a pager.
      minlines: If set, the number of lines under which no pager should be
        started. This avoids kicking off a pager when the screen is high
        enough to render the contents. If unset, a pager is always started
        (which is fine behavior too).
    """
    self.command = command
    self.minlines = minlines

    # Fallback output: write UTF-8 bytes straight to the binary layer of
    # stdout when there is one, otherwise use stdout as is.
    if hasattr(sys.stdout, 'buffer'):
        self.default_file = codecs.getwriter("utf-8")(sys.stdout.buffer)
    else:
        self.default_file = sys.stdout

beancount.utils.pager.ConditionalPager.flush_accumulated (self, file)

Flush the existing lines to the newly created pager. This also disables the accumulator.

Parameters:

Name Type Description Default
file

A file object to flush the accumulated data to.

required
Source code in beancount/utils/pager.py
def flush_accumulated(self, file):
    """Write out everything accumulated so far and disable the accumulator.

    Args:
      file: A file object to flush the accumulated data to.
    """
    pending = self.accumulated_data
    if pending:
        emit = file.write
        for chunk in pending:
            emit(chunk)

    # Both accumulator fields are cleared to None, whether or not anything
    # was written.
    self.accumulated_data = self.accumulated_lines = None

beancount.utils.pager.ConditionalPager.write (self, data)

Write the data out. Overridden from the file object interface.

Parameters:

Name Type Description Default
data

A string, data to write to the output.

required
Source code in beancount/utils/pager.py
def write(self, data):
    """Write the data out. Overridden from the file object interface.

    Until an output file exists, the data is buffered and its newlines are
    counted; once the count exceeds the threshold a pager is created and the
    buffer is flushed to it.

    Args:
      data: A string, data to write to the output.
    """
    if self.file is not None:
        # An output file already exists: forward the data directly.
        # NOTE: a BrokenPipeError raised by this write is not caught here.
        self.file.write(data)
        return

    # Accumulate the new lines.
    self.accumulated_lines += data.count('\n')
    self.accumulated_data.append(data)

    # Still under the threshold: keep buffering.
    if not self.accumulated_lines > self.minlines:
        return

    # Threshold exceeded: create the output file and replay the buffer.
    self.file, self.pipe = create_pager(self.command, self.default_file)
    self.flush_accumulated(self.file)

beancount.utils.pager.create_pager (command, file)

Try to create and return a pager subprocess.

Parameters:

Name Type Description Default
command

A string, the shell command to run as a pager.

required
file

The file object for the pager to write to. This is also used as a default if we failed to create the pager subprocess.

required

Returns:

Type Description

A pair of (file, pipe), a file object and an optional subprocess.Popen instance to wait on. The pipe instance may be set to None if we failed to create a subprocess.

Source code in beancount/utils/pager.py
def create_pager(command, file):
    """Start a pager subprocess if possible and return a file to write to.

    Args:
      command: A string, the shell command to run as a pager. None selects the
        PAGER environment variable (or DEFAULT_PAGER when it is not defined);
        any other empty value selects DEFAULT_PAGER.
      file: The file object the pager's output goes to. It is also returned
        as the fallback when the subprocess cannot be created.
    Returns:
      A pair of (file, pipe): a file object to write to and the
      subprocess.Popen instance to wait on. When the subprocess could not be
      created, the pair is the 'file' argument and None.
    """
    if command is None:
        command = os.environ.get('PAGER', DEFAULT_PAGER)
    command = command or DEFAULT_PAGER

    # In case of using 'less', make sure the charset is set properly. This
    # could in theory be overridden by setting PAGER to
    # "LESSCHARSET=utf-8 less", but setting it here should not affect other
    # programs and makes the default behavior work for most people (we always
    # write UTF-8).
    pager_env = dict(os.environ, LESSCHARSET="utf-8")

    try:
        pipe = subprocess.Popen(command, shell=True,
                                stdin=subprocess.PIPE,
                                stdout=file,
                                env=pager_env)
    except OSError as exc:
        logging.error("Invalid pager: {}".format(exc))
        return file, None

    # Text written by the caller is encoded as UTF-8 on its way to the pager.
    return io.TextIOWrapper(pipe.stdin, 'utf-8'), pipe

beancount.utils.pager.flush_only (fileobj)

A contextmanager around a file object that does not close the file.

This is used to return a context manager on a file object but not close it. We flush it instead. This is useful in order to provide an alternative to a pager class as above.

Parameters:

Name Type Description Default
fileobj

A file object, to remain open after running the context manager.

required

Yields: A context manager that yields this object.

Source code in beancount/utils/pager.py
@contextlib.contextmanager
def flush_only(fileobj):
    """Context manager over a file object that flushes instead of closing.

    It offers the same 'with' interface as a pager object, for callers that
    want to write to an existing file object that must stay open afterwards.

    Args:
      fileobj: A file object, to remain open after running the context manager.
    Yields:
      The file object itself.
    """
    with contextlib.ExitStack() as cleanup:
        # The flush runs when the block is left, whether normally or through
        # an exception; the file is never closed here.
        cleanup.callback(lambda: fileobj.flush())
        yield fileobj

beancount.utils.regexp_utils

Regular expression helpers.

beancount.utils.regexp_utils.re_replace_unicode (regexp)

Substitute Unicode Properties in regular expressions with their corresponding expanded ranges.

Parameters:

Name Type Description Default
regexp

A regular expression string.

required

Returns:

Type Description

Return the regular expression with Unicode Properties substituted.

Source code in beancount/utils/regexp_utils.py
def re_replace_unicode(regexp):
    r"""Expand Unicode property escapes in a regular expression string.

    Every occurrence of '\p{<category>}', for each category listed in
    UNICODE_RANGES, is replaced by the range string registered for it.

    Args:
      regexp: A regular expression string.
    Returns:
      The regular expression string with the Unicode properties substituted.
    """
    expanded = regexp
    for category in UNICODE_RANGES:
        placeholder = r'\p{' + category + '}'
        expanded = expanded.replace(placeholder, UNICODE_RANGES[category])
    return expanded

beancount.utils.snoop

Text manipulation utilities.

beancount.utils.snoop.Snoop

A snooper callable that just saves the returned values of a function. This is particularly useful for re.match and re.search in conditionals, e.g.::

  snoop = Snoop()
  ...
  if snoop(re.match(r"(\d+)-(\d+)-(\d+)", text)):
    year, month, date = snoop.value.group(1, 2, 3)

Attributes:
  value: The last value snooped from a function call.
  history: If 'maxlen' was specified, the last few values
    that were snooped.

beancount.utils.snoop.Snoop.__call__ (self, value) special

Save a value to the snooper. This is meant to wrap a function call.

Parameters:

Name Type Description Default
value

The value to push/save.

required

Returns:

Type Description

Value itself.

Source code in beancount/utils/snoop.py
def __call__(self, value):
    """Remember a value and hand it back. This is meant to wrap a function
    call.

    Args:
      value: The value to push/save.
    Returns:
      The value itself, unchanged.
    """
    self.value = value
    past_values = self.history
    if past_values is None:
        # History tracking is disabled.
        return value
    past_values.append(value)
    return value

beancount.utils.snoop.Snoop.__getattr__ (self, attr) special

Forward the attribute to the value.

Parameters:

Name Type Description Default
attr

A string, the name of the attribute.

required

Returns:

Type Description

The value of the attribute.

Source code in beancount/utils/snoop.py
def __getattr__(self, attr):
    """Forward the attribute to the value.

    This is only invoked when regular attribute lookup on the snooper fails.

    Args:
      attr: A string, the name of the attribute.
    Returns:
      The value of the attribute, looked up on the last snooped value.
    Raises:
      AttributeError: If the snooped value has no such attribute, or if the
        snooper has no 'value' attribute yet.
    """
    # Being asked for 'value' here means it is not set on the instance (e.g.
    # while copy or pickle probes a bare object). Looking up self.value below
    # would then re-enter this method without end, so fail cleanly instead.
    if attr == 'value':
        raise AttributeError(attr)
    return getattr(self.value, attr)

beancount.utils.snoop.Snoop.__init__ (self, maxlen=None) special

Create a new snooper.

Parameters:

Name Type Description Default
maxlen

If specified, an integer, which enables the saving of that number of last values in the history attribute.

None
Source code in beancount/utils/snoop.py
def __init__(self, maxlen=None):
    """Create a new snooper.

    Args:
      maxlen: If specified, an integer, which enables the saving of that
        number of last values in the history attribute.
    """
    self.value = None
    if maxlen:
        # Bounded history of the last 'maxlen' values.
        self.history = collections.deque(maxlen=maxlen)
    else:
        self.history = None

beancount.utils.snoop.snoopify (function)

Decorate a function as snoopable.

This is meant to reassign existing functions to a snoopable version of them. For example, if you wanted 're.match' to be automatically snoopable, just decorate it like this:

re.match = snoopify(re.match)

and then you can just call 're.match' in a conditional and then access 're.match.value' to get to the last returned value.

Source code in beancount/utils/snoop.py
def snoopify(function):
    """Wrap a function so that its most recent return value is remembered.

    This is meant to reassign existing functions to a snoopable version of
    them. For example, to make 're.match' automatically snoopable:

      re.match = snoopify(re.match)

    After that, 're.match' can be called inside a conditional and the value
    it returned read back from 're.match.value'.

    Args:
      function: The callable to wrap.
    Returns:
      The wrapper function. Its 'value' attribute starts out as None and is
      updated after every successful call.
    """
    @functools.wraps(function)
    def wrapper(*args, **kw):
        wrapper.value = function(*args, **kw)
        return wrapper.value
    wrapper.value = None
    return wrapper

beancount.utils.test_utils

Support utilities for testing scripts.

beancount.utils.test_utils.RCall

RCall(args, kwargs, return_value)

beancount.utils.test_utils.RCall.__getnewargs__ (self) special

Return self as a plain tuple. Used by copy and pickle.

Source code in beancount/utils/test_utils.py
def __getnewargs__(self):
    """Return self as a plain tuple. Used by copy and pickle."""
    plain = _tuple(self)
    return plain

beancount.utils.test_utils.RCall.__new__ (_cls, args, kwargs, return_value) special staticmethod

Create new instance of RCall(args, kwargs, return_value)

beancount.utils.test_utils.RCall.__repr__ (self) special

Return a nicely formatted representation string

Source code in beancount/utils/test_utils.py
def __repr__(self):
    """Return a nicely formatted representation string."""
    class_name = self.__class__.__name__
    return class_name + repr_fmt % self

beancount.utils.test_utils.TestCase

beancount.utils.test_utils.TestCase.assertLines (self, text1, text2, message=None)

Compare the lines of text1 and text2, ignoring whitespace.

Parameters:

Name Type Description Default
text1

A string, the expected text.

required
text2

A string, the actual text.

required
message

An optional string message in case the assertion fails.

None

Exceptions:

Type Description
AssertionError

If the assertion fails.

Source code in beancount/utils/test_utils.py
def assertLines(self, text1, text2, message=None):
    """Compare the lines of text1 and text2, ignoring whitespace.

    Args:
      text1: A string, the expected text.
      text2: A string, the actual text.
      message: An optional string message in case the assertion fails.
    Raises:
      AssertionError: If the assertion fails.
    """
    def normalized_lines(text):
        # Strip the text as a whole, dedent it, then strip every line.
        stripped = (line.strip()
                    for line in textwrap.dedent(text.strip()).splitlines())
        # Compress all space longer than 4 spaces to exactly 4.
        # This affords us to be even looser.
        return [re.sub('    [ \t]*', '    ', line) for line in stripped]

    self.assertEqual(normalized_lines(text1), normalized_lines(text2), message)

beancount.utils.test_utils.TestCase.assertOutput (self, expected_text)

Expect text printed to stdout.

Parameters:

Name Type Description Default
expected_text

A string, the text that should have been printed to stdout.

required

Exceptions:

Type Description
AssertionError

If the text differs.

Source code in beancount/utils/test_utils.py
@contextlib.contextmanager
def assertOutput(self, expected_text):
    """Expect text printed to stdout.

    The body of the 'with' block runs under capture(); once it completes, the
    captured text is compared against the dedented expected text with
    assertLines().

    Args:
      expected_text: A string, the text that should have been printed to stdout.
    Raises:
      AssertionError: If the text differs.
    """
    with capture() as captured:
        yield captured
    expected = textwrap.dedent(expected_text)
    actual = captured.getvalue()
    self.assertLines(expected, actual)

beancount.utils.test_utils.call_command (command)

Run the script with a subprocess.

Parameters:

Name Type Description Default
command

A list of strings, the arguments to the subprocess, including the script name.

required

Returns:

Type Description

A triplet of (return code integer, stdout text, stderr text).

Source code in beancount/utils/test_utils.py
def call_command(command):
    """Run a command in a subprocess and collect its outcome.

    Args:
      command: A list of strings, the arguments to the subprocess,
        including the script name.
    Returns:
      A triplet of (return code integer, stdout text, stderr text).
    """
    assert isinstance(command, list), command
    completed = subprocess.run(command,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    # Both streams are captured as bytes and decoded with the default codec.
    out_text = completed.stdout.decode()
    err_text = completed.stderr.decode()
    return completed.returncode, out_text, err_text

beancount.utils.test_utils.capture (* attributes)

A context manager that captures what's printed to stdout.

Parameters:

Name Type Description Default
*attributes

A tuple of strings, the name of the sys attributes to override with StringIO instances.

()

Yields: A StringIO string accumulator.

Source code in beancount/utils/test_utils.py
def capture(*attributes):
    """A context manager that captures what's printed to stdout.

    Args:
      *attributes: A tuple of strings, the name of the sys attributes to override
        with StringIO instances. Defaults to 'stdout' when none are given.
    Returns:
      The context manager built by patch(). It yields a single StringIO when
      zero or one attribute name is given, and a list of them otherwise.
    """
    count = len(attributes)
    if count == 0:
        targets = 'stdout'
    elif count == 1:
        # A single name is passed on as a plain string.
        targets = attributes[0]
    else:
        targets = attributes
    return patch(sys, targets, io.StringIO)

beancount.utils.test_utils.create_temporary_files (root, contents_map)

Create a number of temporary files under 'root'.

This routine is used to initialize the contents of multiple files under a temporary directory.

Parameters:

Name Type Description Default
root

A string, the name of the directory under which to create the files.

required
contents_map

A dict of relative filenames to their contents. The content strings will be automatically dedented for convenience. In addition, the string '{root}' in the contents will be automatically replaced by the root directory name.

required
Source code in beancount/utils/test_utils.py
def create_temporary_files(root, contents_map):
    """Create a number of files under 'root'.

    This routine is used to initialize the contents of multiple files under a
    temporary directory. Missing directories, including 'root' itself, are
    created as needed.

    Args:
      root: A string, the name of the directory under which to create the files.
      contents_map: A dict of relative filenames to their contents. In each
        content string, every occurrence of '{root}' is replaced by the root
        directory name, and the result is dedented before being written.
    """
    os.makedirs(root, exist_ok=True)
    for relative_filename in contents_map:
        assert not path.isabs(relative_filename)
        target = path.join(root, relative_filename)
        os.makedirs(path.dirname(target), exist_ok=True)

        raw_contents = contents_map[relative_filename]
        substituted = raw_contents.replace('{root}', root)
        with open(target, 'w') as outfile:
            outfile.write(textwrap.dedent(substituted))

beancount.utils.test_utils.docfile (function, ** kwargs)

A decorator that writes the function's docstring to a temporary file and calls the decorated function with the temporary filename. This is useful for writing tests.

Parameters:

Name Type Description Default
function

A function to decorate.

required

Returns:

Type Description

The decorated function.

Source code in beancount/utils/test_utils.py
def docfile(function, **kwargs):
    """A decorator that writes the function's docstring to a temporary file
    and calls the decorated function with the temporary filename. This is
    useful for writing tests.

    Args:
      function: A function to decorate. It is called with (self, filename).
      **kwargs: Optional keyword arguments forwarded to
        tempfile.NamedTemporaryFile. Only buffering, encoding, newline, dir,
        prefix and suffix are accepted; anything else makes the decorated
        function raise ValueError when it is called.
    Returns:
      The decorated function, whose own docstring is cleared.
    """
    @functools.wraps(function)
    def new_function(self):
        allowed = ('buffering', 'encoding', 'newline', 'dir', 'prefix', 'suffix')
        for key in kwargs:
            if key not in allowed:
                raise ValueError("Invalid kwarg to docfile_extra")
        with tempfile.NamedTemporaryFile('w', **kwargs) as file:
            file.write(textwrap.dedent(function.__doc__))
            # Push the text out to disk before the function reads the file.
            file.flush()
            return function(self, file.name)
    new_function.__doc__ = None
    return new_function

beancount.utils.test_utils.docfile_extra (** kwargs)

A decorator identical to @docfile, but it also takes kwargs for the temporary file, Kwargs: e.g. buffering, encoding, newline, dir, prefix, and suffix.

Returns:

Type Description

docfile

Source code in beancount/utils/test_utils.py
def docfile_extra(**kwargs):
    """A decorator identical to @docfile, but which also takes keyword
    arguments for the temporary file.

    Args:
      **kwargs: Keyword arguments for the temporary file, e.g. buffering,
        encoding, newline, dir, prefix, and suffix.
    Returns:
      docfile, with the given keyword arguments bound to it.
    """
    bound_docfile = functools.partial(docfile, **kwargs)
    return bound_docfile

beancount.utils.test_utils.environ (varname, newvalue)

A context manager which pushes varname's value and restores it later.

Parameters:

Name Type Description Default
varname

A string, the environ variable name.

required
newvalue

A string, the desired value.

required
Source code in beancount/utils/test_utils.py
@contextlib.contextmanager
def environ(varname, newvalue):
    """A context manager which pushes varname's value and restores it later.

    The previous state of the variable is restored when the block is left,
    including when the block raises an exception.

    Args:
      varname: A string, the environ variable name.
      newvalue: A string, the desired value.
    Yields:
      None.
    """
    oldvalue = os.environ.get(varname, None)
    os.environ[varname] = newvalue
    try:
        yield
    finally:
        # Restore even if the body raised, so that one failing test does not
        # leak its environment into the following ones.
        if oldvalue is not None:
            os.environ[varname] = oldvalue
        else:
            # The variable did not exist before. Tolerate the body having
            # removed it already.
            os.environ.pop(varname, None)

beancount.utils.test_utils.find_python_lib ()

Return the path to the root of the Python libraries.

Returns:

Type Description

A string, the root directory.

Source code in beancount/utils/test_utils.py
def find_python_lib():
    """Return the path to the root of the Python libraries.

    Returns:
      A string, the directory three levels above this source file.
    """
    location = __file__
    for _ in range(3):
        location = path.dirname(location)
    return location

beancount.utils.test_utils.find_repository_root (filename=None)

Return the path to the repository root.

Parameters:

Name Type Description Default
filename

A string, the name of a file within the repository.

None

Returns:

Type Description

A string, the root directory.

Source code in beancount/utils/test_utils.py
def find_repository_root(filename=None):
    """Return the path to the repository root.

    The root is the closest path, starting at 'filename' and walking up its
    parents, that contains all of PKG-INFO, COPYING and README.rst.

    Args:
      filename: A string, the name of a file within the repository. Defaults
        to this source file.
    Returns:
      A string, the root directory.
    Raises:
      ValueError: If the top of the filesystem is reached without finding
        the root.
    """
    candidate = __file__ if filename is None else filename

    # Support root directory under Bazel.
    bazel_match = re.match(r"(.*\.runfiles/beancount)/", candidate)
    if bazel_match:
        return bazel_match.group(1)

    signature_files = ('PKG-INFO', 'COPYING', 'README.rst')

    def looks_like_root(dirname):
        return all(path.exists(path.join(dirname, sigfile))
                   for sigfile in signature_files)

    while not looks_like_root(candidate):
        parent = path.dirname(candidate)
        if parent == candidate:
            # dirname() no longer makes progress: we ran out of parents.
            raise ValueError("Failed to find the root directory.")
        candidate = parent
    return candidate

beancount.utils.test_utils.make_failing_importer (* removed_module_names)

Make an importer that raises an ImportError for some modules.

Use it like this:

@mock.patch('builtins.__import__', make_failing_importer('setuptools')) def test_...

Parameters:

Name Type Description Default
removed_module_names

The name of the module import that should raise an exception.

required

Returns:

Type Description

A decorated test decorator.

Source code in beancount/utils/test_utils.py
def make_failing_importer(*removed_module_names):
    """Make an importer that raises an ImportError for some modules.

    Use it like this:

      @mock.patch('builtins.__import__', make_failing_importer('setuptools'))
      def test_...

    Args:
      *removed_module_names: The names of the modules whose import should
        raise an exception.
    Returns:
      A function with the calling convention of builtins.__import__. It
      raises ImportError for the listed names and defers to
      builtins.__import__ for all others.
    """
    def failing_import(name, *args, **kwargs):
        if name not in removed_module_names:
            return builtins.__import__(name, *args, **kwargs)
        raise ImportError("Could not import {}".format(name))
    return failing_import

beancount.utils.test_utils.nottest (func)

Make the given function not testable.

Source code in beancount/utils/test_utils.py
def nottest(func):
    """Make the given function not testable.

    Args:
      func: The function to mark; its '__test__' attribute is set to False.
    Returns:
      The same function object.
    """
    setattr(func, '__test__', False)
    return func

beancount.utils.test_utils.patch (obj, attributes, replacement_type)

A context manager that temporarily patches an object's attributes.

All attributes in 'attributes' are saved and replaced by new instances of type 'replacement_type'.

Parameters:

Name Type Description Default
obj

The object to patch up.

required
attributes

A string or a sequence of strings, the names of attributes to replace.

required
replacement_type

A callable to build replacement objects.

required

Yields: An instance of a list of sequences of 'replacement_type'.

Source code in beancount/utils/test_utils.py
@contextlib.contextmanager
def patch(obj, attributes, replacement_type):
    """A context manager that temporarily patches an object's attributes.

    All attributes in 'attributes' are saved and replaced by new instances
    of type 'replacement_type'. The saved values are put back when the block
    is left, including when the block raises an exception.

    Args:
      obj: The object to patch up.
      attributes: A string or a sequence of strings, the names of attributes to replace.
      replacement_type: A callable to build replacement objects.
    Yields:
      A single replacement object if 'attributes' is a string, otherwise the
      list of replacement objects, in the order of 'attributes'.
    """
    single = isinstance(attributes, str)
    if single:
        attributes = [attributes]

    saved = []
    replacements = []
    try:
        for attribute in attributes:
            replacement = replacement_type()
            # Record the original before touching the object so that whatever
            # has been patched so far can always be undone.
            saved.append((attribute, getattr(obj, attribute)))
            setattr(obj, attribute, replacement)
            replacements.append(replacement)

        yield replacements[0] if single else replacements
    finally:
        # Undo in reverse order so that an attribute listed more than once
        # ends up with its true original value.
        for attribute, saved_attr in reversed(saved):
            setattr(obj, attribute, saved_attr)

beancount.utils.test_utils.record (fun)

Decorates the function to intercept and record all calls and return values.

Parameters:

Name Type Description Default
fun

A callable to be decorated.

required

Returns:

Type Description

A wrapper function with a .calls attribute, a list of RCall instances.

Source code in beancount/utils/test_utils.py
def record(fun):
    """Decorate a function so that every call and its result is recorded.

    Args:
      fun: A callable to be decorated.
    Returns:
      A wrapper function with a .calls attribute, a list of RCall instances,
      one per completed call, in call order.
    """
    @functools.wraps(fun)
    def wrapped(*args, **kw):
        result = fun(*args, **kw)
        # Only calls that returned are recorded; one that raises is not.
        entry = RCall(args, kw, result)
        wrapped.calls.append(entry)
        return result
    wrapped.calls = []
    return wrapped

beancount.utils.test_utils.run_with_args (function, args)

Run the given function with sys.argv temporarily set to the script name followed by args. The script name is automatically inferred from the module where the function object was defined. sys.argv is restored after the function is called.

Parameters:

Name Type Description Default
function

A function object to call with no arguments.

required
args

A list of arguments, excluding the script name, to be temporarily set on sys.argv.

required

Returns:

Type Description

The return value of the function run.

Source code in beancount/utils/test_utils.py
def run_with_args(function, args):
    """Run the given function with sys.argv temporarily replaced.

    The script name placed first in sys.argv is the file of the module where
    the function object was defined. The root logger's handlers are emptied
    for the duration of the call. Both sys.argv and the handlers are restored
    after the function is called.

    Args:
      function: A function object to call with no arguments.
      args: A list of arguments, excluding the script name, to be temporarily
        set on sys.argv.
    Returns:
      The return value of the function run.
    """
    previous_argv, previous_handlers = sys.argv, logging.root.handlers
    try:
        defining_module = sys.modules[function.__module__]
        script_name = defining_module.__file__
        sys.argv = [script_name] + args
        logging.root.handlers = []
        return function()
    finally:
        sys.argv = previous_argv
        logging.root.handlers = previous_handlers

beancount.utils.test_utils.search_words (words, line)

Search for a sequence of words in a line.

Parameters:

Name Type Description Default
words

A list of strings, the words to look for, or a space-separated string.

required
line

A string, the line to search into.

required

Returns:

Type Description

A MatchObject, or None.

Source code in beancount/utils/test_utils.py
def search_words(words, line):
    """Look for the given words, in order, within a line of text.

    Each word has to appear delimited by word boundaries; the words may be
    separated by arbitrary text. The words are inserted in the regular
    expression as is, without escaping.

    Args:
      words: A list of strings, the words to look for, or a space-separated string.
      line: A string, the line to search into.
    Returns:
      A MatchObject, or None.
    """
    word_list = words.split() if isinstance(words, str) else words
    bounded = [r'\b{}\b'.format(word) for word in word_list]
    pattern = '.*'.join(bounded)
    return re.search(pattern, line)

beancount.utils.test_utils.skipIfRaises (* exc_types)

A context manager (or decorator) that skips a test if an exception is raised.

Yields: Nothing, for you to execute the function code.

Exceptions:

Type Description
SkipTest

if the test raised the expected exception.

Source code in beancount/utils/test_utils.py
@contextlib.contextmanager
def skipIfRaises(*exc_types):
    """Skip the running test if the managed code raises an expected exception.

    This may be used either as a context manager or as a decorator.

    Args:
      *exc_types: The exception classes which cause the test to be skipped.
        Exceptions of any other type propagate unchanged.
    Yields:
      Nothing, for you to execute the function code.
    Raises:
      SkipTest: if the test raised the expected exception.
    """
    try:
        yield
    except exc_types as exc:
        # Convert the expected failure into a skip carrying the original
        # exception as its reason.
        skip = unittest.SkipTest(exc)
        raise skip

beancount.utils.test_utils.subprocess_env ()

Return a dict to use as environment for running subprocesses.

Returns:

Type Description

A dict with 'PATH' and 'PYTHONPATH' entries, to be used as the environment of a subprocess.

Source code in beancount/utils/test_utils.py
def subprocess_env():
    """Return a dict to use as environment for running subprocesses.

    The PATH entry lists, in order, the directory of the running Python
    executable, the 'bin' directory under the repository root, and the
    current value of PATH. The PYTHONPATH entry is whatever find_python_lib()
    returns.

    Returns:
      A dict with 'PATH' and 'PYTHONPATH' keys.
    """
    # Ensure we have locations to invoke our Python executable and our
    # runnable binaries in the test environment to run subprocesses.
    # Use os.pathsep rather than a hard-coded ':' so that the search path is
    # also valid where the separator differs (e.g. ';' on Windows).
    binpath = os.pathsep.join([
        path.dirname(sys.executable),
        path.join(find_repository_root(__file__), 'bin'),
        os.environ.get('PATH', '').strip(os.pathsep)]).strip(os.pathsep)
    return {'PATH': binpath,
            'PYTHONPATH': find_python_lib()}

beancount.utils.test_utils.tempdir (delete=True, ** kw)

A context manager that creates a temporary directory and, unless delete is false, deletes it and its contents once done.

Parameters:

Name Type Description Default
delete

A boolean, true if we want to delete the directory after running.

True
**kw

Keyword arguments for mkdtemp.

{}

Yields: A string, the name of the temporary directory created.

Source code in beancount/utils/test_utils.py
@contextlib.contextmanager
def tempdir(delete=True, **kw):
    """Create a temporary directory for the duration of a 'with' block.

    On exit, whether or not the block raised, the directory and everything
    in it are removed if 'delete' is true; removal errors are ignored.

    Args:
      delete: A boolean, true if we want to delete the directory after running.
      **kw: Keyword arguments for mkdtemp.
    Yields:
      A string, the name of the temporary directory created.
    """
    dirname = tempfile.mkdtemp(prefix="beancount-test-tmpdir.", **kw)
    try:
        yield dirname
    finally:
        if delete:
            shutil.rmtree(dirname, ignore_errors=True)

beancount.utils.text_utils

Text manipulation utilities.

beancount.utils.text_utils.entitize_ampersand (filename)

Convert unescaped ampersand characters (&) to XML entities.

This is used to fix code that has been programmed by bad developers who didn't think about escaping the entities in their strings.

Parameters:

Name Type Description Default
filename

A string, the name of the file to convert.

required

Returns:

Type Description

A NamedTemporaryFile object, opened for writing and already flushed, that holds the fixed contents. It is created with delete=False, so it is not removed automatically; open it by name to read the fixed contents.

Source code in beancount/utils/text_utils.py
def entitize_ampersand(filename):
    """Convert unescaped ampersand characters (&) to XML entities.

    This is used to fix files produced by code that did not escape the
    entities in its strings. An ampersand is treated as unescaped when it is
    followed by 12 characters, none of which is a ';' or another '&'; such
    ampersands are rewritten as '&amp;'.

    Args:
      filename: A string, the name of the file to convert.
    Returns:
      A NamedTemporaryFile object opened for writing, to which the fixed
      contents have been written and flushed. The file is created with
      delete=False, so it is not removed automatically; read the fixed
      contents back by opening its .name.
    """
    tidy_file = tempfile.NamedTemporaryFile(suffix='.xls', mode='w', delete=False)
    with open(filename) as infile:
        contents = infile.read()
    # The replacement has to emit the entity itself; substituting a bare '&'
    # would leave the contents unchanged.
    new_contents = re.sub('&([^;&]{12})', r'&amp;\1', contents)
    tidy_file.write(new_contents)
    tidy_file.flush()
    return tidy_file

beancount.utils.text_utils.replace_number (match)

Replace a single number matched from text into X'es. 'match' is a MatchObject from a regular expressions match. (Use this with re.sub()).

Parameters:

Name Type Description Default
match

A MatchObject.

required

Returns:

Type Description

A replacement string, consisting only of X'es.

Source code in beancount/utils/text_utils.py
def replace_number(match):
    """Mask the digits of a number matched from text with X'es.

    This is meant to be used as the replacement callback of re.sub(). Every
    digit in the first group of the match is turned into an 'X'; the second
    group is appended unchanged.

    Args:
      match: A MatchObject with at least two groups.
    Returns:
      A replacement string, the masked first group followed by the second
      group.
    """
    number, trailer = match.group(1), match.group(2)
    masked = re.sub('[0-9]', 'X', number)
    return masked + trailer

beancount.utils.text_utils.replace_numbers (text)

Replace all numbers found within text.

Note that this is a heuristic used to filter out private numbers from web pages in incognito mode and thus may not be perfect. It should let through numbers which are part of URLs.

Parameters:

Name Type Description Default
text

An input string object.

required

Returns:

Type Description

A string, with relevant numbers hopefully replaced with X'es.

Source code in beancount/utils/text_utils.py
def replace_numbers(text):
    """Mask the numbers found within a text.

    Note that this is a heuristic used to filter out private numbers from web
    pages in incognito mode and thus may not be perfect. It should let through
    numbers which are part of URLs.

    Args:
      text: An input string object.
    Returns:
      A string, with relevant numbers hopefully replaced with X'es.
    """
    # A number (digits and commas, optionally a fractional part) followed
    # either by whitespace or '<' and a non-numeric character, or by the end
    # of the text.
    number_regexp = re.compile(r'\b([0-9,]+(?:\.[0-9]*)?)\b([ \t<]+[^0-9,.]|$)')
    return number_regexp.sub(replace_number, text)

beancount.utils.version

Implement common options across all programs.

beancount.utils.version.ArgumentParser (* args, ** kwargs)

Add a standard --version option to an ArgumentParser.

Parameters:

Name Type Description Default
*args

Arguments for the parser.

()
**kwargs

Keyword arguments for the parser.

{}

Returns:

Type Description

An instance of ArgumentParser, with our default options set.

Source code in beancount/utils/version.py
def ArgumentParser(*args, **kwargs):
    """Create an ArgumentParser with a standard --version option.

    The version string reported by --version (or -V) is built from the
    package version and the changeset and timestamp stored on the parser
    module.

    Args:
      *args: Arguments for the parser.
      **kwargs: Keyword arguments for the parser.
    Returns:
      An instance of ArgumentParser, with our default options set.
    """
    parser = argparse.ArgumentParser(*args, **kwargs)

    version_string = compute_version_string(beancount.__version__,
                                            _parser.__vc_changeset__,
                                            _parser.__vc_timestamp__)
    parser.add_argument('--version', '-V', action='version',
                        version=version_string)

    return parser

beancount.utils.version.compute_version_string (version, changeset, timestamp)

Compute a version string from the changeset and timestamp baked in the parser.

Parameters:

Name Type Description Default
version

A string, the version number.

required
changeset

A string, a version control string identifying the commit of the version.

required
timestamp

An integer, the UNIX epoch timestamp of the changeset.

required

Returns:

Type Description

A human-readable string for the version.

Source code in beancount/utils/version.py
def compute_version_string(version, changeset, timestamp):
    """Compute a version string from the changeset and timestamp baked in the parser.

    The result is 'Beancount <version>', followed in parentheses by the
    shortened changeset and the UTC date of the timestamp, for whichever of
    the two is available.

    Args:
      version: A string, the version number.
      changeset: A string, a version control string identifying the commit of
        the version. May be empty or None.
      timestamp: An integer, the UNIX epoch timestamp of the changeset. Zero,
        negative or None values are treated as "no timestamp".
    Returns:
      A human-readable string for the version.
    """
    # Shorten changeset.
    if changeset:
        if changeset.startswith('hg:'):
            changeset = changeset[:15]
        elif changeset.startswith('git:'):
            changeset = changeset[:12]

    # Convert timestamp to a date. An aware UTC conversion is used because
    # datetime.utcfromtimestamp() is deprecated; the resulting date is the same.
    date = None
    if timestamp is not None and timestamp > 0:
        date = datetime.datetime.fromtimestamp(
            timestamp, datetime.timezone.utc).date()

    version = 'Beancount {}'.format(version)
    details = [str(item) for item in (changeset, date) if item]
    if details:
        version = '{} ({})'.format(version, '; '.join(details))

    return version