Source code for Goulib.container

#!/usr/bin/env python
# coding: utf8
"""advanced containers: Record (struct), and INFINITE Sequence"""

from __future__ import division, print_function

__author__ = "Philippe Guglielmetti"
__copyright__ = "Copyright 2015, Philippe Guglielmetti"
__credits__ = []
__license__ = "LGPL"

import numbers
import operator
from bisect import bisect_left, bisect_right
from collections import OrderedDict
from itertools import count, tee, islice

import six
from six.moves import zip, map, filter

from Goulib import itertools2, decorators, tests

class Record(OrderedDict):
    """Mimics a Pascal record or a C struct.

    An ordered dict whose items can also be read and written with attribute
    syntax: ``rec.x`` is ``rec['x']`` and ``rec.x = 1`` is ``rec['x'] = 1``.
    """

    def __init__(self, *args, **kwargs):
        """Build the record; arguments are passed unchanged to OrderedDict."""
        super(Record, self).__init__(*args, **kwargs)
        # Stored as a real instance attribute (see __setattr__): once it is
        # present, every later attribute assignment becomes an item assignment.
        self._initialized = True

    def __getattr__(self, name):
        """Return item ``name``; called only when normal attribute lookup fails.

        :raise AttributeError: if there is no item called ``name``
        """
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        """Store ``value`` as item ``name`` (or as a plain attribute while
        the instance is still being constructed). An empty name is ignored.
        """
        if not name:
            return
        if '_initialized' in self.__dict__:
            super(Record, self).__setitem__(name, value)
        else:
            # construction phase: let OrderedDict set its own attributes
            super(Record, self).__setattr__(name, value)

    def __str__(self):
        """Return the items as comma separated ``key:value`` pairs."""
        res = ['%s:%s' % (k, self[k]) for k in self]
        # NOTE(review): with %-formatting the doubled braces are emitted
        # literally ("{{a:1}}") -- confirm this is the intended output.
        return '{{%s}}' % (','.join(res))
class Sequence(object):
    """Combines a generator and a read-only list.

    Used for INFINITE numeric (integer) sequences. Items can be produced by
    iteration (``iterf``), by index (``itemf``) or tested for membership
    (``containf``); the accessors that are not supplied fall back on the
    ones that are.
    """

    def __init__(self, iterf=None, itemf=None, containf=None, desc='', timeout=0):
        """
        :param iterf: optional iterator, or a function returning an iterator.
            If an integer is given instead, it is stored as ``offset`` (the
            index of the first item) and the Sequence has no iterator
        :param itemf: optional function(i) returning the i-th element
        :param containf: optional function(n) return bool True if n belongs to Sequence
        :param desc: string description, defaults to the docstring of itemf
        :param timeout: if non zero, iterators returned by :meth:`__iter__`
            are wrapped with ``decorators.itimeout(iterator, timeout)``
        """
        self.name = self.__class__.__name__  # by default
        if isinstance(iterf, numbers.Integral):
            self.offset = iterf
            self.iterf = None
        else:
            if callable(iterf):
                try:  # evaluate the iterator factory
                    iterf = iterf()
                except TypeError:
                    pass  # not a zero-argument factory: keep the object as is
            self.offset = 0
            self.iterf = iterf
        self.itemf = itemf
        if itemf and not desc:
            desc = itemf.__doc__
        self.containf = containf
        self.desc = desc
        self.timeout = timeout
        self._repr = ''

    def __repr__(self):
        """Return ``"<name> = (<items>)"``; the string is computed once and cached."""
        if not self._repr:  # cache for speed
            # must be very quick for debugger
            # NOTE(review): presumably pprint shows items 0..9 within a 1 ms
            # budget -- confirm against Goulib.tests.pprint
            s = tests.pprint(self, list(range(10)), 0.001)
            self._repr = '%s = (%s)' % (self.name, s)
        return self._repr

    def save(self, filename, comment=None, n=1000, maxtime=10):
        """Write the Sequence to a text file.

        The file starts with a ``#`` comment line, followed by one
        ``index value`` line per item, where index starts at ``offset``.

        :param filename: path of the file to (over)write
        :param comment: header text, defaults to ``desc`` followed by today's date
        :param n: iteration stops once the 0-based item count exceeds n
        :param maxtime: accepted for compatibility, not used by this method
        """
        from datetime import date
        comment = comment or "%s %s" % (self.desc, date.today())
        with open(filename, 'wt') as f:
            print('#' + comment, file=f)
            for i, v in enumerate(self):
                if i > n:
                    break
                print(i + self.offset, v, file=f)

    def __iter__(self):
        """reset the generator

        :return: a tee-ed copy of iterf, optionally timeout decorated
        """
        if self.iterf is not None:
            # keep one branch of the tee so the next call restarts from the beginning
            self.iterf, res = tee(self.iterf)
            if self.timeout:
                res = decorators.itimeout(res, self.timeout)
            return res
        # no iterator: enumerate candidate integers starting at offset
        it = count(self.offset)
        if self.timeout:
            it = decorators.itimeout(it, self.timeout)
        if self.itemf:
            return map(lambda i: self[i], it)
        return filter(lambda n: n in self, it)  # uses containf

    def __getitem__(self, i):
        """
        :param i: int index, or slice
        :return: the i-th item, or an iterator over the sliced items
        """
        if isinstance(i, slice):
            return islice(iter(self), i.start, i.stop, i.step)
        if self.itemf:
            return self.itemf(i)
        return itertools2.index(i, self)

    def index(self, v):
        """
        :return: int position of v in the iteration, -1 if v is not found
        """
        # assume sequence is growing
        for i, n in enumerate(self):
            if v == n:
                return i
            if n > v:
                return -1
        return -1  # finite sequence exhausted without reaching v

    def __contains__(self, n):
        """
        :return: bool True if n belongs to Sequence, using containf when
            available and a search through the items otherwise
        """
        if self.containf:
            return self.containf(n)
        return self.index(n) >= 0

    def __add__(self, other):
        """
        :param other: integer added to every item, or Sequence added item by item
        """
        if isinstance(other, numbers.Integral):
            return self.apply(
                lambda n: n + other,
                containf=lambda n: n - other in self,
                desc='%s+%d' % (self.name, other)
            )

        def _():
            for (a, b) in zip(self, other):
                yield a + b
        return Sequence(_, lambda i: self[i] + other[i])

    def __sub__(self, other):
        """
        :param other: integer subtracted from every item, or Sequence
            subtracted item by item
        """
        if isinstance(other, numbers.Integral):
            return self + (-other)

        def _():
            for (a, b) in zip(self, other):
                yield a - b
        return Sequence(_, lambda i: self[i] - other[i])

    def __mul__(self, other):
        """
        :param other: integer multiplying every item, or Sequence multiplied
            item by item
        """
        if isinstance(other, numbers.Integral):
            def contains(n):
                if other == 0:
                    return n == 0  # every item of the result is 0
                # n belongs to the result if it is other times an item of self
                return n % other == 0 and n // other in self
            return self.apply(
                lambda n: n * other,
                containf=contains,
                desc='%d*%s' % (other, self.name)
            )

        def _():
            for (a, b) in zip(self, other):
                yield a * b
        return Sequence(_, lambda i: self[i] * other[i])

    def __div__(self, other):
        """Floor division.

        :param other: integer dividing every item, or Sequence dividing
            item by item
        """
        if isinstance(other, numbers.Integral):
            # m//other == n exactly when m == n*other + r for r in this range
            remainders = range(0, other, 1 if other > 0 else -1)
            return self.apply(
                lambda n: n // other,
                containf=lambda n: any(n * other + r in self for r in remainders),
                desc='%s//%d' % (self.name, other)
            )

        def _():
            for (a, b) in zip(self, other):
                yield a // b
        return Sequence(_, lambda i: self[i] // other[i])

    # Python 3 never calls __div__: expose the same operation as ``//``
    __floordiv__ = __div__

    def __truediv__(self, other):
        """
        :param other: integer dividing every item, or Sequence dividing
            item by item
        """
        if isinstance(other, numbers.Integral):
            return self.apply(
                lambda n: n / other,
                containf=lambda n: other * n in self,
                desc='%s/%d' % (self.name, other)
            )

        def _():
            for (a, b) in zip(self, other):
                yield a / b
        return Sequence(_, lambda i: self[i] / other[i])

    def __or__(self, other):
        """
        :return: Sequence with items from both (sorted) operand Sequences
        """
        return Sequence(
            itertools2.unique(itertools2.merge(self, other)),
            None,
            lambda x: x in self or x in other
        )

    def __and__(self, other):
        """
        :return: Sequence with items in both operands
        :raise NotImplementedError: if neither operand has a containf
        """
        if other.containf:
            return self.filter(other.containf)
        if self.containf:
            return other.filter(self.containf)
        raise NotImplementedError

    def __mod__(self, other):
        """
        :return: Sequence with items from left operand not in right
        """
        return Sequence(
            itertools2.diff(self.__iter__(), other.__iter__()),
            None,
            lambda x: x in self and x not in other
        )

    def apply(self, f, containf=None, desc=''):
        """
        :param f: function(item) applied to every item
        :param containf: optional membership function of the result
        :param desc: string description of the result
        :return: Sequence of f(item) for each item of this one
        """
        return Sequence(
            map(f, self),
            lambda i: f(self[i]),
            containf,
            desc
        )

    def filter(self, f, desc=''):
        """
        :param f: function(item) returning True for the items to keep
        :param desc: string description of the result
        :return: Sequence of the items of this one accepted by f
        """
        return Sequence(
            filter(f, self),
            None,
            lambda n: f(n) and n in self,
            desc
        )

    def __le__(self, other):
        """
        :return: Sequence of the leading items that are <= other
        """
        # NOTE(review): the original expression lost its first argument
        # ("Sequence(,other,operator.le)"). Rebuilt as a take-while, relying on
        # the same "sequence is growing" assumption as index() -- confirm.
        def _():
            for x in self:
                if not operator.le(x, other):
                    break
                yield x
        return Sequence(_, None, lambda x: x <= other and x in self)

    def __gt__(self, other):
        """
        :return: Sequence of the items that are > other
        """
        # NOTE(review): the original expression was lost
        # ("Sequence(,other, )"). Rebuilt as a filter on operator.gt -- confirm.
        def _():
            for x in self:
                if operator.gt(x, other):
                    yield x
        return Sequence(_, None, lambda x: x > other and x in self)

    def accumulate(self, op=operator.add, skip_first=False):
        """
        :return: Sequence built from itertools2.accumulate(self, op, skip_first)
        """
        return Sequence(itertools2.accumulate(self, op, skip_first))

    def pairwise(self, op, skip_first=False):
        """
        :param op: passed to itertools2.pairwise
        :param skip_first: accepted for compatibility, not used by this method
        :return: Sequence built from itertools2.pairwise(self, op)
        """
        return Sequence(itertools2.pairwise(self, op))

    def sort(self, key=None, buffer=100):
        """
        :return: Sequence built from itertools2.sorted_iterable(self, key, buffer)
        """
        return Sequence(itertools2.sorted_iterable(self, key, buffer))

    def unique(self, buffer=100):
        """
        :param buffer: int number of last elements found.
            if two identical elements are separated by more than this number
            of elements in self, they might be generated twice in the
            resulting Sequence
        :return: Sequence made of unique elements of this one
        """
        return Sequence(itertools2.unique(self, None, buffer))

    def product(self, other, op=sum, buffer=100):
        """cartesian product

        :param other: second operand of itertools2.product
        :param op: function applied to each element of the product
        :param buffer: passed to :meth:`sort`
        :return: sorted Sequence of op applied to the product's elements
        """
        it = itertools2.product(self, other)
        return Sequence(it).apply(op).sort(buffer=buffer)