# Source code for Goulib.container

"""
advanced containers : Record (struct), and INFINITE Sequence
"""
__author__ = "Philippe Guglielmetti"
__copyright__ = "Copyright 2015, Philippe Guglielmetti"
__credits__ = []
__license__ = "LGPL"

from bisect import bisect_left, bisect_right
from collections import OrderedDict

import operator

from itertools import count, tee, islice, chain
from Goulib import itertools2, decorators, tests


class Record(OrderedDict):
    """Ordered mapping whose items are also reachable as attributes.

    Mimics a Pascal record or a C struct.
    Recipe taken from https://stackoverflow.com/a/5491708/1395973
    """

    def __init__(self, *args, **kwargs):
        """Accept the same arguments as ``OrderedDict``."""
        super().__init__(*args, **kwargs)
        # Until this flag sits in the instance ``__dict__``, ``__setattr__``
        # stores real attributes; once it is there, assignments become items.
        self._initialized = True

    def __getattr__(self, name):
        """Resolve an attribute that normal lookup missed as the item ``name``.

        :raise AttributeError: when no item is stored under ``name``
        """
        try:
            value = self[name]
        except KeyError:
            raise AttributeError(name)
        return value

    def __setattr__(self, name, value):
        """Store ``value`` as item ``name`` (as a plain attribute during construction).

        An empty ``name`` is silently ignored.
        """
        if not name:
            return
        construction_done = '_initialized' in self.__dict__
        if not construction_done:
            super().__setattr__(name, value)
            return
        super().__setitem__(name, value)

    def __str__(self):
        """Return ``{{key:value,...}}`` in insertion order.

        The braces really are doubled in the result: %-formatting does not
        collapse ``{{`` the way ``str.format`` would.
        """
        pairs = ','.join('%s:%s' % (key, value) for key, value in self.items())
        return '{{%s}}' % pairs
class Sequence(object):
    """Combine a generator and a read-only list.

    Used for INFINITE numeric (integer) sequences.
    A Sequence is described by up to three optional callables: an iterator
    (``iterf``), an indexed accessor (``itemf``) and a membership test
    (``containf``). Whatever is missing is derived from what is available.
    """

    def __init__(self, iterf=None, itemf=None, containf=None, desc='', timeout=0):
        """
        :param iterf: optional iterator, or a function returning an iterator.
            An int is taken as the offset (first index) of a Sequence that
            has no iterator of its own
        :param itemf: optional function(i) returning the i-th element
        :param containf: optional function(n) returning True if n belongs to the Sequence
        :param desc: string description; defaults to the docstring of ``itemf``
        :param timeout: when non-zero, iteration is wrapped by
            ``decorators.itimeout(iterator, timeout)``
        """
        self.name = self.__class__.__name__  # by default
        if isinstance(iterf, int):
            self.offset = iterf
            self.iterf = None
        else:
            try:
                # EAFP on purpose: ``callable()`` is not usable here because a
                # Sequence instance is callable yet must be kept as it is.
                iterf = iterf()
            except TypeError:
                pass  # not a zero-argument factory: use the object itself
            self.offset = 0
            self.iterf = iterf
        self.itemf = itemf
        if itemf and not desc:
            desc = itemf.__doc__
        self.containf = containf
        self.desc = desc
        self.timeout = timeout
        self._repr = ''  # filled lazily by __repr__

    def __repr__(self):
        """Return ``name = (first items)``; the text is computed once and cached."""
        if not self._repr:  # cache for speed
            # must be very quick for debugger
            # NOTE(review): 0.001 is presumably a time limit in seconds - confirm in tests.pprint
            s = tests.pprint(self, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 0.001)
            self._repr = '%s = (%s)' % (self.name, s)
        return self._repr

    def save(self, filename, comment=None, n=1000, maxtime=10):
        """Write the first items to a text file, one ``index value`` pair per line.

        :param filename: path of the file to (over)write
        :param comment: header written after a leading ``#``;
            defaults to the description followed by today's date
        :param n: highest 0-based position written (positions 0..n inclusive)
        :param maxtime: currently unused
        """
        from datetime import date
        with open(filename, 'wt') as f:
            comment = comment or "%s %s" % (self.desc, date.today())
            print('#' + comment, file=f)
            # islice stops after position n without computing item n+1
            for i, v in enumerate(islice(self, max(n + 1, 0))):
                print(i + self.offset, v, file=f)

    def __iter__(self):
        """reset the generator

        :return: a tee-ed copy of iterf, optionally timeout decorated.
            Without iterf, indices starting at ``offset`` are mapped through
            ``itemf``, or filtered through the membership test
        """
        if self.iterf:
            # keep one branch of the tee so that the next call restarts from the beginning
            self.iterf, res = tee(self.iterf)
            if self.timeout:
                res = decorators.itimeout(res, self.timeout)
        else:
            it = count(self.offset)
            if self.timeout:
                it = decorators.itimeout(it, self.timeout)
            if self.itemf:
                res = map(lambda i: self[i], it)
            else:
                res = filter(lambda n: n in self, it)  # uses containf
        return res

    def __getitem__(self, i):
        """
        :param i: int index, or slice
        :return: the i-th item, or a lazy ``islice`` iterator for a slice
        """
        if isinstance(i, slice):
            # slice the Sequence itself: calling ``self()`` would require an argument
            return islice(self, i.start, i.stop, i.step)
        if self.itemf:
            return self.itemf(i)
        return itertools2.index(i, self)

    def index(self, v):
        """
        :param v: value to look for
        :return: 0-based position of v in iteration order, -1 if v is not found.
            The sequence is assumed to be growing, so the search stops at the
            first item greater than v
        """
        for i, n in enumerate(self):
            if v == n:
                return i
            if n > v:
                return -1
        return -1  # finite sequence exhausted without reaching v

    def __contains__(self, n):
        """Membership test: uses ``containf`` when available, else searches with :meth:`index`."""
        if self.containf:
            return self.containf(n)
        return self.index(n) >= 0

    def __add__(self, other):
        """
        :param other: int added to every item, or Sequence added item by item
        :return: Sequence of sums
        """
        if isinstance(other, int):
            return self.apply(
                lambda n: n + other,
                containf=lambda n: n - other in self,
                desc='%s+%d' % (self.name, other)
            )

        def _():
            for a, b in zip(self, other):
                yield a + b
        return Sequence(_, lambda i: self[i] + other[i])

    def __sub__(self, other):
        """
        :param other: int subtracted from every item, or Sequence subtracted item by item
        :return: Sequence of differences
        """
        if isinstance(other, int):
            return self + (-other)

        def _():
            for a, b in zip(self, other):
                yield a - b
        return Sequence(_, lambda i: self[i] - other[i])

    def __mul__(self, other):
        """
        :param other: int multiplying every item, or Sequence multiplied item by item
        :return: Sequence of products
        """
        if isinstance(other, int):
            def contains(n):
                if other == 0:
                    return n == 0  # every item is mapped to 0
                quotient, remainder = divmod(n, other)
                # n is a product only if it is an exact multiple of a member
                return remainder == 0 and quotient in self

            return self.apply(
                lambda n: n * other,
                containf=contains,
                desc='%d*%s' % (other, self.name)
            )

        def _():
            for a, b in zip(self, other):
                yield a * b
        return Sequence(_, lambda i: self[i] * other[i])

    def __div__(self, other):
        """Floor division.

        :param other: int dividing every item, or Sequence dividing item by item
        :return: Sequence of floored quotients
        """
        if isinstance(other, int):
            def contains(n):
                # n (an integer) is a quotient if any member m satisfies m // other == n
                first = n * other
                if other > 0:
                    candidates = range(first, first + other)
                else:
                    candidates = range(first + other + 1, first + 1)
                return any(m in self for m in candidates)

            return self.apply(
                lambda n: n // other,
                containf=contains,
                desc='%s//%d' % (self.name, other)
            )

        def _():
            for a, b in zip(self, other):
                yield a // b
        return Sequence(_, lambda i: self[i] // other[i])

    __floordiv__ = __div__  # Python 3 only dispatches ``//`` to __floordiv__

    def __truediv__(self, other):
        """True division.

        :param other: int dividing every item, or Sequence dividing item by item
        :return: Sequence of quotients; Sequence / Sequence produces exact
            ``Fraction`` items, both when iterated and when indexed
        """
        if isinstance(other, int):
            return self.apply(
                lambda n: n / other,
                containf=lambda n: other * n in self,
                desc='%s/%d' % (self.name, other)
            )

        from fractions import Fraction

        def _():
            for a, b in zip(self, other):
                yield Fraction(a, b)
        return Sequence(_, lambda i: Fraction(self[i], other[i]))

    def __or__(self, other):
        """
        :return: Sequence with items from both (sorted) operand Sequences
        """
        return Sequence(
            itertools2.unique(itertools2.merge(self, other)),
            None,
            lambda x: x in self or x in other
        )

    def __and__(self, other):
        """
        :return: Sequence with items in both operands
        :raise NotImplementedError: when neither operand has a ``containf``
        """
        if other.containf:
            return self.filter(other.containf)
        if self.containf:
            return other.filter(self.containf)
        raise NotImplementedError

    def __mod__(self, other):
        """
        :return: Sequence with items from left operand not in right
        """
        return Sequence(
            itertools2.diff(self.__iter__(), other.__iter__()),
            None,
            lambda x: x in self and x not in other
        )

    def apply(self, f, containf=None, desc=''):
        """function composition

        :param f: function applied to every item
        :param containf: optional membership test of the resulting Sequence
        :param desc: optional description of the resulting Sequence
        :return: Sequence of f(item) for each item of this one
        """
        return Sequence(
            map(f, self),
            lambda i: f(self[i]),
            containf,
            desc
        )

    def __call__(self, other):
        """
        :param other: Sequence
        :return: Sequence made by applying this Sequence's ``itemf`` to each item of other
        """
        return other.apply(self.itemf)

    def filter(self, f, desc=''):
        """
        :param f: predicate function(n) returning a truthy value for items to keep
        :param desc: optional description of the resulting Sequence
        :return: Sequence of the items of this one for which f is true
        """
        return Sequence(
            filter(f, self),
            None,
            lambda n: f(n) and n in self,
            desc
        )

    def __le__(self, other):
        """
        :return: Sequence built by ``itertools2.select(self, other, operator.le)``
        """
        # NOTE(review): presumably keeps items of self that are <= the matching
        # item of other - confirm against itertools2.select
        return Sequence(
            itertools2.select(self, other, operator.le)
        )

    def __gt__(self, other):
        """
        :return: Sequence built by ``itertools2.select(self, other, operator.gt)``
        """
        # NOTE(review): presumably keeps items of self that are > the matching
        # item of other - confirm against itertools2.select
        return Sequence(
            itertools2.select(self, other, operator.gt)
        )

    def accumulate(self, op=operator.add, init=()):
        """
        :param op: binary function combining the running result with the next item
        :param init: iterable of items emitted before the accumulated ones
        :return: Sequence of init followed by ``itertools2.accumulate(self, op, False)``
        """
        return Sequence(chain(init, itertools2.accumulate(self, op, False)))

    def pairwise(self, op, skip_first=False):
        """
        :param op: function passed to ``itertools2.pairwise``
        :param skip_first: currently unused
        :return: Sequence built by ``itertools2.pairwise(self, op)``
        """
        return Sequence(itertools2.pairwise(self, op))

    def sort(self, key=None, buffer=100):
        """
        :param key: optional sort key, passed to ``itertools2.sorted_iterable``
        :param buffer: int passed to ``itertools2.sorted_iterable``
        :return: Sequence built by ``itertools2.sorted_iterable(self, key, buffer)``
        """
        return Sequence(itertools2.sorted_iterable(self, key, buffer))

    def unique(self, buffer=100):
        """
        :param buffer: int number of last elements found.
            if two identical elements are separated by more than this number of
            elements in self, they might be generated twice in the resulting Sequence
        :return: Sequence made of unique elements of this one
        """
        return Sequence(itertools2.unique(self, None, buffer))

    def product(self, other, op=sum, buffer=100):
        """cartesian product

        :param other: Sequence
        :param op: function applied to each item produced by ``itertools2.product``
        :param buffer: int passed to :meth:`sort`
        :return: sorted Sequence of op(item) over the product of both operands
        """
        it = itertools2.product(self, other)
        return Sequence(it).apply(op).sort(buffer=buffer)