12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419 |
- # util/langhelpers.py
- # Copyright (C) 2005-2017 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: http://www.opensource.org/licenses/mit-license.php
- """Routines to help with the creation, loading and introspection of
- modules, classes, hierarchies, attributes, functions, and methods.
- """
- import itertools
- import inspect
- import operator
- import re
- import sys
- import types
- import warnings
- from functools import update_wrapper
- from .. import exc
- import hashlib
- from . import compat
- from . import _collections
def md5_hex(x):
    """Return the hexadecimal MD5 digest of ``x``.

    :param x: the value to hash.  Text is encoded as UTF-8 first; byte
     strings (``bytes`` / ``bytearray``) are hashed as given, rather than
     failing on a missing ``.encode()`` method.
    :return: the digest as a string of 32 hexadecimal characters.
    """
    if not isinstance(x, (bytes, bytearray)):
        x = x.encode('utf-8')
    return hashlib.md5(x).hexdigest()
class safe_reraise(object):
    """Context manager that re-raises a pending exception once the
    handler code in its body has finished.

    The exception info that is current on entry is captured up front, so
    that it is still available after the body runs, even across a potential
    coroutine context switch.

    e.g.::

        try:
            sess.commit()
        except:
            with safe_reraise():
                sess.rollback()

    If ``warn_only`` is true, the captured exception is not re-raised when
    the body completes without error.
    """

    __slots__ = ('warn_only', '_exc_info')

    def __init__(self, warn_only=False):
        self.warn_only = warn_only

    def __enter__(self):
        self._exc_info = sys.exc_info()

    def __exit__(self, type_, value, traceback):
        # see #2703 for notes
        if type_ is not None:
            # the handler body itself failed
            prior = self._exc_info
            if not compat.py3k and prior and prior[1]:
                # emulate Py3K's behavior of telling us when an exception
                # occurs in an exception handler.
                warn(
                    "An exception has occurred during handling of a "
                    "previous exception.  The previous exception "
                    "is:\n %s %s\n" % (prior[0], prior[1]))
            self._exc_info = None   # remove potential circular references
            compat.reraise(type_, value, traceback)
        else:
            # the handler body succeeded; bring back the captured exception
            exc_type, exc_value, exc_tb = self._exc_info
            self._exc_info = None   # remove potential circular references
            if not self.warn_only:
                compat.reraise(exc_type, exc_value, exc_tb)
def decode_slice(slc):
    """Return ``(start, stop, step)`` for a slice object as sent to
    __getitem__.

    Any member that provides ``__index__()`` is replaced by the result of
    calling it; other members (such as ``None``) are passed through as is.
    """
    return tuple(
        part.__index__() if hasattr(part, '__index__') else part
        for part in (slc.start, slc.stop, slc.step)
    )
- def _unique_symbols(used, *bases):
- used = set(used)
- for base in bases:
- pool = itertools.chain((base,),
- compat.itertools_imap(lambda i: base + str(i),
- range(1000)))
- for sym in pool:
- if sym not in used:
- used.add(sym)
- yield sym
- break
- else:
- raise NameError("exhausted namespace for symbol base %s" % base)
def map_bits(fn, n):
    """Yield ``fn(bit)`` for every set bit of ``n``, lowest bit first.

    Each ``bit`` passed to ``fn`` is an integer with exactly one bit set.
    """
    while n:
        lowest = n & -n     # isolate the least significant set bit
        yield fn(lowest)
        n ^= lowest
def decorator(target):
    """A signature-matching decorator factory.

    The returned decorator generates, for a function ``fn``, a new function
    with the same name and argument signature whose body calls
    ``target(fn, <arguments>)``.
    """

    def decorate(fn):
        if not inspect.isfunction(fn):
            raise Exception("not a decoratable function")

        spec = compat.inspect_getfullargspec(fn)
        # names that the generated source must not shadow
        taken = tuple(spec[0]) + spec[1:3] + (fn.__name__,)
        targ_name, fn_name = _unique_symbols(taken, 'target', 'fn')

        metadata = {'target': targ_name, 'fn': fn_name}
        metadata.update(format_argspec_plus(spec, grouped=False))
        metadata['name'] = fn.__name__
        code = """\
def %(name)s(%(args)s):
    return %(target)s(%(fn)s, %(apply_kw)s)
""" % metadata
        env = {targ_name: target, fn_name: fn}
        decorated = _exec_code_in_env(code, env, fn.__name__)
        # the generated source only carries the repr of default values;
        # put the real default objects back
        decorated.__defaults__ = getattr(fn, 'im_func', fn).__defaults__
        decorated.__wrapped__ = fn
        return update_wrapper(decorated, fn)

    return update_wrapper(decorate, target)
- def _exec_code_in_env(code, env, fn_name):
- exec(code, env)
- return env[fn_name]
def public_factory(target, location):
    """Produce a wrapping function for the given cls or classmethod.

    The wrapper is generated under the last dotted token of ``location``
    and takes over the docstring of the wrapped ``__init__`` / callable;
    the wrapped one receives a short note pointing at ``location`` instead.

    Rationale here is so that the __init__ method of the
    class can serve as documentation for the function.
    """
    callable_ = target
    if isinstance(target, type):
        fn = target.__init__
        doc = (
            "Construct a new :class:`.%s` object. \n\n"
            "This constructor is mirrored as a public API function; "
            "see :func:`~%s` "
            "for a full usage and argument description."
        ) % (target.__name__, location)
    else:
        fn = target
        doc = (
            "This function is mirrored; see :func:`~%s` "
            "for a description of arguments."
        ) % location

    location_name = location.rpartition(".")[2]
    spec = compat.inspect_getfullargspec(fn)
    # drop the leading (self / cls) argument from the signature
    del spec[0][0]
    metadata = format_argspec_plus(spec, grouped=False)
    metadata['name'] = location_name
    code = """\
def %(name)s(%(args)s):
    return cls(%(apply_kw)s)
""" % metadata
    env = {'cls': callable_, 'symbol': symbol}
    exec(code, env)

    decorated = env[location_name]
    decorated.__doc__ = fn.__doc__
    decorated.__module__ = "sqlalchemy" + location.rsplit(".", 1)[0]

    if compat.py2k or hasattr(fn, '__func__'):
        fn.__func__.__doc__ = doc
    else:
        fn.__doc__ = doc
    return decorated
class PluginLoader(object):
    """Registry of named, lazily loaded plugin implementations for one
    entry point group.
    """

    def __init__(self, group, auto_fn=None):
        self.group = group
        self.impls = {}
        self.auto_fn = auto_fn

    def load(self, name):
        """Return the plugin registered as ``name``.

        Looks at the already known loaders first, then at ``auto_fn`` if
        one was given, then at the ``pkg_resources`` entry points of this
        loader's group (if ``pkg_resources`` can be imported).  A loader
        found by the latter two is remembered for later calls.

        Raises ``NoSuchModuleError`` if nothing provides ``name``.
        """
        try:
            factory = self.impls[name]
        except KeyError:
            pass
        else:
            return factory()

        if self.auto_fn:
            loader = self.auto_fn(name)
            if loader:
                self.impls[name] = loader
                return loader()

        try:
            import pkg_resources
        except ImportError:
            entry_points = ()
        else:
            entry_points = pkg_resources.iter_entry_points(
                self.group, name)

        for impl in entry_points:
            # only the first entry point is considered
            self.impls[name] = impl.load
            return impl.load()

        raise exc.NoSuchModuleError(
            "Can't load plugin: %s:%s" %
            (self.group, name))

    def register(self, name, modulepath, objname):
        """Register ``name`` as attribute ``objname`` of the module at
        dotted path ``modulepath``; the import is deferred until load.
        """
        def load():
            module = compat.import_(modulepath)
            # walk from what the import returned down to the leaf module
            for part in modulepath.split(".")[1:]:
                module = getattr(module, part)
            return getattr(module, objname)

        self.impls[name] = load
def get_cls_kwargs(cls, _set=None):
    r"""Return the full set of inherited kwargs for the given `cls`.

    Collects the named arguments of the class's own ``__init__``.  When
    that ``__init__`` has a \**kwargs catch-all, or the class defines no
    Python-level ``__init__`` of its own, the constructor is presumed to
    pass unrecognized keywords along, and the collection is repeated
    recursively on each of the bases.

    ``_set`` is the accumulator used by the recursive calls; a recursive
    call returns ``None`` to signal that collection should stop.

    Uses a subset of inspect.getargspec() to cut down on method overhead.
    No anonymous tuple arguments please !
    """
    is_root_call = _set is None
    if is_root_call:
        _set = set()

    init = cls.__dict__.get('__init__', False)
    has_init = bool(
        init
        and isinstance(init, types.FunctionType)
        and isinstance(init.__code__, types.CodeType)
    )

    if has_init:
        names, has_kw = inspect_func_args(init)
        _set.update(names)
        if not (has_kw or is_root_call):
            # a base without **kw ends the chain
            return None
        descend = has_kw
    else:
        descend = True

    if descend:
        for base in cls.__bases__:
            if get_cls_kwargs(base, _set) is None:
                break

    _set.discard('self')
    return _set
try:
    # TODO: who doesn't have this constant?
    from inspect import CO_VARKEYWORDS

    def inspect_func_args(fn):
        """Return ``(argument names, has **kw)`` for ``fn``, read directly
        from its code object.
        """
        code = fn.__code__
        arg_names = list(code.co_varnames[:code.co_argcount])
        accepts_kw = bool(code.co_flags & CO_VARKEYWORDS)
        return arg_names, accepts_kw
except ImportError:
    def inspect_func_args(fn):
        """Return ``(argument names, has **kw)`` for ``fn`` by means of
        inspect.getargspec().
        """
        names, _, has_kw, _ = inspect.getargspec(fn)
        return names, bool(has_kw)
def get_func_kwargs(func):
    """Return the named arguments accepted by the given `func`.

    This is the first member of the argspec reported by getargspec, so it
    is safe to call for methods, functions, etc.
    """
    spec = compat.inspect_getargspec(func)
    return spec[0]
def get_callable_argspec(fn, no_self=False, _is_init=False):
    """Return the argument signature for any callable.

    All pure-Python callables are accepted, including
    functions, methods, classes, objects with __call__;
    builtins and other edge cases like functools.partial() objects
    raise a TypeError.

    With ``no_self`` set, the leading argument is left out of the result
    for an ``__init__`` and for a method with a truthy ``__self__``.
    ``_is_init`` is set by the recursive call made for a class.
    """
    if inspect.isbuiltin(fn):
        raise TypeError("Can't inspect builtin: %s" % fn)

    if inspect.isfunction(fn):
        strip_first = _is_init and no_self
        spec = compat.inspect_getargspec(fn)
        if strip_first:
            return compat.ArgSpec(spec.args[1:], spec.varargs,
                                  spec.keywords, spec.defaults)
        return spec

    if inspect.ismethod(fn):
        strip_first = no_self and (_is_init or fn.__self__)
        spec = compat.inspect_getargspec(fn.__func__)
        if strip_first:
            return compat.ArgSpec(spec.args[1:], spec.varargs,
                                  spec.keywords, spec.defaults)
        return spec

    if inspect.isclass(fn):
        return get_callable_argspec(
            fn.__init__, no_self=no_self, _is_init=True)

    if hasattr(fn, '__func__'):
        return compat.inspect_getargspec(fn.__func__)

    if hasattr(fn, '__call__') and inspect.ismethod(fn.__call__):
        return get_callable_argspec(fn.__call__, no_self=no_self)

    raise TypeError("Can't inspect callable: %s" % fn)
def format_argspec_plus(fn, grouped=True):
    """Returns a dictionary of formatted, introspected function arguments.

    An enhanced variant of inspect.formatargspec to support code generation.

    fn
       An inspectable callable or tuple of inspect getargspec() results.
    grouped
      Defaults to True; include (parens, around, argument) lists

    Returns:

    args
      Full inspect.formatargspec for fn
    self_arg
      The name of the first positional argument, varargs[0], or None
      if the function defines no positional arguments.
    apply_pos
      args, re-written in calling rather than receiving syntax.  Arguments are
      passed positionally.
    apply_kw
      Like apply_pos, except keyword-ish args are passed as keywords.

    Example::

      >>> format_argspec_plus(lambda self, a, b, c=3, **d: 123)
      {'args': '(self, a, b, c=3, **d)',
       'self_arg': 'self',
       'apply_kw': '(self, a, b, c=c, **d)',
       'apply_pos': '(self, a, b, c, **d)'}

    """
    # anything that is not callable is taken to be an existing argspec
    spec = compat.inspect_getfullargspec(fn) if compat.callable(fn) else fn

    args = inspect.formatargspec(*spec)

    if spec[0]:
        self_arg = spec[0][0]
    elif spec[1]:
        self_arg = '%s[0]' % spec[1]
    else:
        self_arg = None

    num_defaults = len(spec[3]) if spec[3] else 0
    if compat.py3k:
        apply_pos = inspect.formatargspec(spec[0], spec[1],
                                          spec[2], None, spec[4])
        if spec[4]:
            num_defaults += len(spec[4])
        name_args = spec[0] + spec[4]
    else:
        apply_pos = inspect.formatargspec(spec[0], spec[1], spec[2])
        name_args = spec[0]

    defaulted_vals = name_args[-num_defaults:] if num_defaults else ()

    # pass each defaulted argument as name=name
    apply_kw = inspect.formatargspec(name_args, spec[1], spec[2],
                                     defaulted_vals,
                                     formatvalue=lambda x: '=' + x)

    if not grouped:
        # strip the enclosing parenthesis
        args = args[1:-1]
        apply_pos = apply_pos[1:-1]
        apply_kw = apply_kw[1:-1]

    return dict(args=args, self_arg=self_arg,
                apply_pos=apply_pos, apply_kw=apply_kw)
def format_argspec_init(method, grouped=True):
    """format_argspec_plus with considerations for typical __init__ methods

    Falls back to fixed signatures for the usual __init__ cases that
    format_argspec_plus is not used for or can't handle::

      object.__init__ -> (self)
      other unreflectable (usually C) -> (self, *args, **kwargs)

    """
    if method is object.__init__:
        args = '(self)' if grouped else 'self'
    else:
        try:
            return format_argspec_plus(method, grouped=grouped)
        except TypeError:
            if grouped:
                args = '(self, *args, **kwargs)'
            else:
                args = 'self, *args, **kwargs'
    return dict(self_arg='self', args=args, apply_pos=args, apply_kw=args)
def getargspec_init(method):
    """inspect.getargspec with considerations for typical __init__ methods

    When the method can't be inspected (TypeError), a fixed signature is
    returned instead::

      object.__init__ -> (self)
      other unreflectable (usually C) -> (self, *args, **kwargs)

    """
    try:
        return compat.inspect_getargspec(method)
    except TypeError:
        if method is object.__init__:
            return (['self'], None, None, None)
        return (['self'], 'args', 'kwargs', None)
def unbound_method_to_callable(func_or_cls):
    """Adjust the incoming callable such that a 'self' argument is not
    required.

    A method object whose ``__self__`` is falsy is replaced by its
    underlying function; anything else is returned unchanged.
    """
    is_unbound = (
        isinstance(func_or_cls, types.MethodType)
        and not func_or_cls.__self__
    )
    return func_or_cls.__func__ if is_unbound else func_or_cls
def generic_repr(obj, additional_kw=(), to_inspect=None, omit_kwarg=()):
    """Produce a __repr__() based on direct association of the __init__()
    specification vs. same-named attributes present.

    :param obj: the object to represent.
    :param additional_kw: optional sequence of ``(name, default)`` pairs for
     further attributes to render as keywords.
    :param to_inspect: object or list of objects whose ``__init__``
     signatures are consulted; defaults to ``obj`` itself.  Only the first
     one contributes positional arguments; the arguments of all others are
     rendered as keywords.
    :param omit_kwarg: names of ``__init__`` keyword arguments to leave out.
    :return: a string of the form ``ClassName(arg, ..., key=value, ...)``.
     A keyword is only rendered if the attribute exists on ``obj`` and
     differs from its default.
    """
    if to_inspect is None:
        to_inspect = [obj]
    else:
        to_inspect = _collections.to_list(to_inspect)

    missing = object()

    pos_args = []
    kw_args = _collections.OrderedDict()
    vargs = None
    for i, insp in enumerate(to_inspect):
        try:
            (_args, _vargs, vkw, defaults) = \
                compat.inspect_getargspec(insp.__init__)
        except TypeError:
            # __init__ can't be inspected; it contributes nothing
            continue

        default_len = len(defaults) if defaults else 0
        # Arguments without a default, minus 'self'.  The slice must not
        # be written as _args[1:-default_len] unconditionally: with no
        # defaults that is _args[1:0], which is always empty.
        if default_len:
            required = _args[1:-default_len]
        else:
            required = _args[1:]

        if i == 0:
            if _vargs:
                vargs = _vargs
            pos_args.extend(required)
        else:
            kw_args.update([(arg, missing) for arg in required])

        if default_len:
            kw_args.update([
                (arg, default)
                for arg, default
                in zip(_args[-default_len:], defaults)
            ])

    output = []

    def render_changed(arg, defval):
        # Best effort only: attribute access or comparison on arbitrary
        # objects may raise, and a repr should not fail because of that.
        try:
            val = getattr(obj, arg, missing)
            if val is not missing and val != defval:
                output.append('%s=%r' % (arg, val))
        except Exception:
            pass

    output.extend(repr(getattr(obj, arg, None)) for arg in pos_args)

    if vargs is not None and hasattr(obj, vargs):
        output.extend([repr(val) for val in getattr(obj, vargs)])

    for arg, defval in kw_args.items():
        if arg in omit_kwarg:
            continue
        render_changed(arg, defval)

    if additional_kw:
        for arg, defval in additional_kw:
            render_changed(arg, defval)

    return "%s(%s)" % (obj.__class__.__name__, ", ".join(output))
class portable_instancemethod(object):
    """Turn an instancemethod into a (parent, name) pair
    to produce a serializable callable.

    The method is looked up again by name on the stored parent at call
    time; the stored ``kwargs`` are merged over the keywords of each call.
    """

    __slots__ = 'target', 'name', 'kwargs', '__weakref__'

    def __init__(self, meth, kwargs=()):
        self.target = meth.__self__
        self.name = meth.__name__
        self.kwargs = kwargs

    def __getstate__(self):
        return {
            'target': self.target,
            'name': self.name,
            'kwargs': self.kwargs,
        }

    def __setstate__(self, state):
        self.target = state['target']
        self.name = state['name']
        # a state without 'kwargs' is accepted as well
        self.kwargs = state.get('kwargs', ())

    def __call__(self, *arg, **kw):
        kw.update(self.kwargs)
        method = getattr(self.target, self.name)
        return method(*arg, **kw)
def class_hierarchy(cls):
    """Return an unordered sequence of all classes related to cls.

    Traverses diamond hierarchies.

    Fibs slightly: subclasses of builtin types are not returned.  Thus
    class_hierarchy(class A(object)) returns (A, object), not A plus every
    class systemwide that derives from object.

    Old-style classes are discarded and hierarchies rooted on them
    will not be descended.

    """
    if compat.py2k and isinstance(cls, types.ClassType):
        return list()

    builtin_module = 'builtins' if compat.py3k else '__builtin__'

    hier = set([cls])
    pending = list(cls.__mro__)
    while pending:
        c = pending.pop()

        if compat.py2k:
            if isinstance(c, types.ClassType):
                continue
            bases = (base for base in c.__bases__
                     if base not in hier
                     and not isinstance(base, types.ClassType))
        else:
            bases = (base for base in c.__bases__ if base not in hier)

        for base in bases:
            pending.append(base)
            hier.add(base)

        # don't fan out into the subclasses of builtin types
        if c.__module__ == builtin_module or \
                not hasattr(c, '__subclasses__'):
            continue

        for sub in [s for s in c.__subclasses__() if s not in hier]:
            pending.append(sub)
            hier.add(sub)

    return list(hier)
def iterate_attributes(cls):
    """Yield ``(key, value)`` for every name that ``dir(cls)`` reports and
    that is found in the ``__dict__`` of a class along the MRO.

    The value is the raw entry of the first such ``__dict__``; getattr()
    is not used, so that class-sensitive descriptors
    (i.e. property.__get__()) are not called.

    """
    for key in dir(cls):
        for klass in cls.__mro__:
            namespace = klass.__dict__
            if key in namespace:
                yield (key, namespace[key])
                break
def monkeypatch_proxied_specials(into_cls, from_cls, skip=None, only=None,
                                 name='self.proxy', from_instance=None):
    """Automates delegation of __specials__ for a proxying type.

    For each selected ``__dunder__`` callable of ``from_cls``, a function
    of the same name is generated that forwards to ``<name>.<method>`` and
    is set on ``into_cls``.  ``only`` lists the methods explicitly; without
    it, all dunders of ``from_cls`` are taken that ``into_cls`` does not
    have already and that are not in ``skip``.
    """
    if only:
        dunders = only
    else:
        if skip is None:
            skip = ('__slots__', '__del__', '__getattribute__',
                    '__metaclass__', '__getstate__', '__setstate__')
        dunders = [
            attr for attr in dir(from_cls)
            if (attr.startswith('__') and attr.endswith('__') and
                not hasattr(into_cls, attr) and attr not in skip)
        ]

    for method in dunders:
        try:
            fn = getattr(from_cls, method)
            if not hasattr(fn, '__call__'):
                continue
            fn = getattr(fn, 'im_func', fn)
        except AttributeError:
            continue

        try:
            spec = compat.inspect_getargspec(fn)
            fn_args = inspect.formatargspec(spec[0])
            d_args = inspect.formatargspec(spec[0][1:])
        except TypeError:
            # not inspectable; use a catch-all signature
            fn_args = '(self, *args, **kw)'
            d_args = '(*args, **kw)'

        py = ("def %(method)s%(fn_args)s: "
              "return %(name)s.%(method)s%(d_args)s" % {
                  'method': method,
                  'fn_args': fn_args,
                  'name': name,
                  'd_args': d_args,
              })

        env = {name: from_instance} if from_instance is not None else {}
        compat.exec_(py, env)
        generated = env[method]
        try:
            generated.__defaults__ = fn.__defaults__
        except AttributeError:
            pass
        setattr(into_cls, method, generated)
def methods_equivalent(meth1, meth2):
    """Return True if the two methods are the same implementation.

    Each argument is compared by its ``__func__`` where it has one, and by
    itself otherwise.
    """
    impl1 = getattr(meth1, '__func__', meth1)
    impl2 = getattr(meth2, '__func__', meth2)
    return impl1 is impl2
def as_interface(obj, cls=None, methods=None, required=None):
    """Ensure basic interface compliance for an instance or dict of callables.

    Checks that ``obj`` implements public methods of ``cls`` or has members
    listed in ``methods``. If ``required`` is not supplied, implementing at
    least one interface method is sufficient. Methods present on ``obj`` that
    are not in the interface are ignored.

    If ``obj`` is a dict and ``dict`` does not meet the interface
    requirements, the keys of the dictionary are inspected. Keys present in
    ``obj`` that are not in the interface will raise TypeErrors.

    Raises TypeError if ``obj`` does not meet the interface criteria.

    In all passing cases, an object with callable members is returned.  In the
    simple case, ``obj`` is returned as-is; if dict processing kicks in then
    an anonymous class is returned.

    obj
      A type, instance, or dictionary of callables.
    cls
      Optional, a type.  All public methods of cls are considered the
      interface.  An ``obj`` instance of cls will always pass, ignoring
      ``required``.
    methods
      Optional, a sequence of method names to consider as the interface.
    required
      Optional, a sequence of mandatory implementations. If omitted, an
      ``obj`` that provides at least one interface method is considered
      sufficient.  As a convenience, required may be a type, in which case
      all public methods of the type are required.

    """
    if not (cls or methods):
        raise TypeError('a class or collection of method names are required')

    if isinstance(cls, type) and isinstance(obj, cls):
        return obj

    interface = set(methods or [m for m in dir(cls) if not m.startswith('_')])
    implemented = set(dir(obj))

    if isinstance(required, type):
        required, complies = interface, operator.ge
    elif not required:
        # nothing is mandatory, but a strict superset of the empty set is
        # asked for, i.e. at least one method
        required, complies = set(), operator.gt
    else:
        required, complies = set(required), operator.ge

    if complies(implemented.intersection(interface), required):
        return obj

    # No dict duck typing here.
    if not isinstance(obj, dict):
        qualifier = 'any of' if complies is operator.gt else 'all of'
        raise TypeError("%r does not implement %s: %s" % (
            obj, qualifier, ', '.join(interface)))

    class AnonymousInterface(object):
        """A callable-holding shell."""

    if cls:
        AnonymousInterface.__name__ = 'Anonymous' + cls.__name__

    found = set()
    for method, impl in dictlike_iteritems(obj):
        if method not in interface:
            raise TypeError("%r: unknown in this interface" % method)
        if not compat.callable(impl):
            raise TypeError("%r=%r is not callable" % (method, impl))
        setattr(AnonymousInterface, method, staticmethod(impl))
        found.add(method)

    if complies(found, required):
        return AnonymousInterface

    raise TypeError("dictionary does not contain required keys %s" %
                    ', '.join(required - found))
class memoized_property(object):
    """A read-only @property that is only evaluated once.

    The computed value is stored in the instance ``__dict__`` under the
    name of the wrapped function.
    """

    def __init__(self, fget, doc=None):
        self.fget = fget
        self.__doc__ = doc or fget.__doc__
        self.__name__ = fget.__name__

    def __get__(self, obj, cls):
        if obj is None:
            # accessed on the class
            return self
        result = self.fget(obj)
        obj.__dict__[self.__name__] = result
        return result

    def _reset(self, obj):
        memoized_property.reset(obj, self.__name__)

    @classmethod
    def reset(cls, obj, name):
        """Discard the value memoized on ``obj`` under ``name``, if any."""
        obj.__dict__.pop(name, None)
def memoized_instancemethod(fn):
    """Decorate a method so that its return value is memoized.

    After the first call, the method is shadowed in the instance
    ``__dict__`` by a function that returns the stored result.

    Best applied to no-arg methods: memoization is not sensitive to
    argument values, and will always return the same value even when
    called with different arguments.

    """
    def oneshot(self, *args, **kw):
        result = fn(self, *args, **kw)

        def memo(*a, **kw):
            return result

        memo.__name__ = fn.__name__
        memo.__doc__ = fn.__doc__
        self.__dict__[fn.__name__] = memo
        return result

    return update_wrapper(oneshot, fn)
class group_expirable_memoized_property(object):
    """A family of @memoized_properties that can be expired in tandem."""

    def __init__(self, attributes=()):
        self.attributes = list(attributes) if attributes else []

    def expire_instance(self, instance):
        """Expire all memoized properties for *instance*."""
        state = instance.__dict__
        for name in self.attributes:
            state.pop(name, None)

    def __call__(self, fn):
        """Add ``fn`` to the family and return it as a memoized_property."""
        self.attributes.append(fn.__name__)
        return memoized_property(fn)

    def method(self, fn):
        """Add ``fn`` to the family and return it as a
        memoized_instancemethod.
        """
        self.attributes.append(fn.__name__)
        return memoized_instancemethod(fn)
class MemoizedSlots(object):
    """Apply memoized items to an object using a __getattr__ scheme.

    This allows the functionality of memoized_property and
    memoized_instancemethod to be available to a class using __slots__.

    A missing attribute ``key`` is produced from ``_memoized_attr_<key>()``
    or ``_memoized_method_<key>()`` where one of these exists, and the
    result is set on the instance.
    """

    __slots__ = ()

    def _fallback_getattr(self, key):
        raise AttributeError(key)

    def __getattr__(self, key):
        if key.startswith('_memoized'):
            # the producers themselves are never memoized; this also ends
            # the recursion of the hasattr() checks below
            raise AttributeError(key)

        attr_producer = '_memoized_attr_%s' % key
        if hasattr(self, attr_producer):
            value = getattr(self, attr_producer)()
            setattr(self, key, value)
            return value

        method_producer = '_memoized_method_%s' % key
        if hasattr(self, method_producer):
            fn = getattr(self, method_producer)

            def oneshot(*args, **kw):
                result = fn(*args, **kw)

                def memo(*a, **kw):
                    return result

                memo.__name__ = fn.__name__
                memo.__doc__ = fn.__doc__
                setattr(self, key, memo)
                return result

            oneshot.__doc__ = fn.__doc__
            return oneshot

        return self._fallback_getattr(key)
def dependency_for(modulename):
    """Return a decorator that sets the decorated object, under its own
    ``__name__``, as an attribute of the module at dotted path
    ``modulename``.  The object itself is returned unchanged.
    """
    def decorate(obj):
        # TODO: would be nice to improve on this import silliness,
        # unfortunately importlib doesn't work that great either
        package, _, leaf = modulename.rpartition(".")
        mod = compat.import_(package, globals(), locals(), leaf)
        mod = getattr(mod, leaf)
        setattr(mod, obj.__name__, obj)
        return obj
    return decorate
class dependencies(object):
    """Apply imported dependencies as arguments to a function.

    E.g.::

        @util.dependencies(
            "sqlalchemy.sql.widget",
            "sqlalchemy.engine.default"
        )
        def some_func(self, widget, default, arg1, arg2, **kw):
            # ...

    The decorated function is replaced by a wrapper whose signature omits
    the dependency arguments; the wrapper passes the corresponding
    ``_importlater`` objects in their place.

    Rationale is so that the impact of a dependency cycle can be
    associated directly with the few functions that cause the cycle,
    and not pollute the module-level namespace.

    """

    def __init__(self, *deps):
        # one _importlater per dotted path, split into (package, last token)
        self.import_deps = []
        for dep in deps:
            tokens = dep.split(".")
            self.import_deps.append(
                dependencies._importlater(
                    ".".join(tokens[0:-1]),
                    tokens[-1]
                )
            )

    def __call__(self, fn):
        # NOTE: the generated lambda below is evaluated against locals(),
        # and refers to the local names ``fn`` and ``import_deps``; these
        # must not be renamed.
        import_deps = self.import_deps
        spec = compat.inspect_getfullargspec(fn)

        spec_zero = list(spec[0])
        hasself = spec_zero[0] in ('self', 'cls')

        # inner call: the dependency slots (after self/cls, if present) are
        # filled with expressions indexing into import_deps
        for i in range(len(import_deps)):
            spec[0][i + (1 if hasself else 0)] = "import_deps[%r]" % i

        inner_spec = format_argspec_plus(spec, grouped=False)

        # outer signature: the dependency arguments are removed altogether
        for impname in import_deps:
            del spec_zero[1 if hasself else 0]
        spec[0][:] = spec_zero

        outer_spec = format_argspec_plus(spec, grouped=False)

        code = 'lambda %(args)s: fn(%(apply_kw)s)' % {
            "args": outer_spec['args'],
            "apply_kw": inner_spec['apply_kw']
        }

        decorated = eval(code, locals())
        # replace the defaults compiled from the formatted signature with
        # the default objects of the original function
        decorated.__defaults__ = getattr(fn, 'im_func', fn).__defaults__
        return update_wrapper(decorated, fn)

    @classmethod
    def resolve_all(cls, path):
        """Resolve every pending dependency whose full dotted path starts
        with ``path``.
        """
        # iterate over a copy; _resolve() removes entries from the set
        for m in list(dependencies._unresolved):
            if m._full_path.startswith(path):
                m._resolve()

    # _importlater objects that have not been resolved yet
    _unresolved = set()

    # all _importlater objects, keyed by full dotted path
    _by_key = {}

    class _importlater(object):
        """Placeholder for the attribute ``addtl`` of the module ``path``,
        imported once ``_resolve()`` is called.  There is one instance per
        full dotted path.
        """

        _unresolved = set()

        _by_key = {}

        def __new__(cls, path, addtl):
            # return the existing instance for this path, if there is one
            key = path + "." + addtl
            if key in dependencies._by_key:
                return dependencies._by_key[key]
            else:
                dependencies._by_key[key] = imp = object.__new__(cls)
                return imp

        def __init__(self, path, addtl):
            self._il_path = path
            self._il_addtl = addtl
            dependencies._unresolved.add(self)

        @property
        def _full_path(self):
            return self._il_path + "." + self._il_addtl

        @memoized_property
        def module(self):
            """The object imported for this dependency; raises ImportError
            while the dependency is still unresolved.
            """
            if self in dependencies._unresolved:
                raise ImportError(
                    "importlater.resolve_all() hasn't "
                    "been called (this is %s %s)"
                    % (self._il_path, self._il_addtl))

            return getattr(self._initial_import, self._il_addtl)

        def _resolve(self):
            dependencies._unresolved.discard(self)
            self._initial_import = compat.import_(
                self._il_path, globals(), locals(),
                [self._il_addtl])

        def __getattr__(self, key):
            # reached only for names that normal attribute lookup did not
            # find; 'module' arriving here is reported as an ImportError
            if key == 'module':
                raise ImportError("Could not resolve module %s"
                                  % self._full_path)
            try:
                attr = getattr(self.module, key)
            except AttributeError:
                raise AttributeError(
                    "Module %s has no attribute '%s'" %
                    (self._full_path, key)
                )
            # store on the instance so that later lookups bypass __getattr__
            self.__dict__[key] = attr
            return attr
- # from paste.deploy.converters
def asbool(obj):
    """Coerce ``obj`` to a boolean.

    A string is stripped and lower-cased, and must then be one of the
    known true / false words; any other string raises ValueError.
    Everything that is not a string is passed to bool().
    """
    if not isinstance(obj, compat.string_types):
        return bool(obj)

    obj = obj.strip().lower()
    if obj in ('true', 'yes', 'on', 'y', 't', '1'):
        return True
    if obj in ('false', 'no', 'off', 'n', 'f', '0'):
        return False
    raise ValueError("String is not true/false: %r" % obj)
def bool_or_str(*text):
    """Return a callable that will evaluate a string as
    boolean, or one of a set of "alternate" string values.

    A value found in ``text`` is returned by the callable unchanged;
    any other value is handed to asbool().
    """
    def bool_or_value(obj):
        return obj if obj in text else asbool(obj)
    return bool_or_value
def asint(value):
    """Coerce to integer; ``None`` is passed through unchanged."""
    return value if value is None else int(value)
def coerce_kw_type(kw, key, type_, flexi_bool=True):
    r"""If 'key' is present in dict 'kw', coerce its value to type 'type\_' if
    necessary.  If 'flexi_bool' is True, the string '0' is considered false
    when coercing to boolean.

    The dictionary is modified in place.  A value of ``None``, or one that
    is already an instance of the type, is left as it is.
    """
    if key not in kw:
        return
    current = kw[key]
    if current is None or isinstance(current, type_):
        return
    if type_ is bool and flexi_bool:
        kw[key] = asbool(current)
    else:
        kw[key] = type_(current)
def constructor_copy(obj, cls, *args, **kw):
    """Instantiate cls using the __dict__ of obj as constructor arguments.

    Uses inspect to match the named arguments of ``cls``.  Keywords that
    are passed in explicitly are not overridden by the values of ``obj``.
    """
    names = get_cls_kwargs(cls)
    for k in names.difference(kw):
        if k in obj.__dict__:
            kw[k] = obj.__dict__[k]
    return cls(*args, **kw)
def counter():
    """Return a threadsafe counter function.

    Each call of the returned function yields the next integer, starting
    at 1; access to the underlying count is guarded by a lock.
    """
    lock = compat.threading.Lock()
    ticks = itertools.count(1)

    # avoid the 2to3 "next" transformation...
    def _next():
        with lock:
            return next(ticks)

    return _next
def duck_type_collection(specimen, default=None):
    """Given an instance or class, guess if it is or is acting as one of
    the basic collection types: list, set and dict.  If the __emulates__
    property is present, return that preferentially.

    Without ``__emulates__``, the type hierarchy is checked first and the
    presence of an ``append``, ``add`` or ``set`` attribute second;
    ``default`` is returned if nothing matches.
    """
    if hasattr(specimen, '__emulates__'):
        emulated = specimen.__emulates__
        # canonicalize set vs sets.Set to a standard: the builtin set
        if emulated is not None and issubclass(emulated, set):
            return set
        return specimen.__emulates__

    isa = issubclass if isinstance(specimen, type) else isinstance
    for basic_type in (list, set, dict):
        if isa(specimen, basic_type):
            return basic_type

    for marker, basic_type in (('append', list), ('add', set), ('set', dict)):
        if hasattr(specimen, marker):
            return basic_type

    return default
def assert_arg_type(arg, argtype, name):
    """Return ``arg`` if it is an instance of ``argtype``, else raise.

    ``argtype`` may be a single type or a tuple of types.  On mismatch
    an ``exc.ArgumentError`` naming the argument ``name``, the expected
    type(s) and the actual type is raised.
    """
    if isinstance(arg, argtype):
        return arg

    if isinstance(argtype, tuple):
        expected = ' or '.join("'%s'" % a for a in argtype)
        raise exc.ArgumentError(
            "Argument '%s' is expected to be one of type %s, got '%s'" %
            (name, expected, type(arg)))

    raise exc.ArgumentError(
        "Argument '%s' is expected to be of type '%s', got '%s'" %
        (name, argtype, type(arg)))
def dictlike_iteritems(dictlike):
    """Return the (key, value) pairs of almost any dict-like object.

    Under Python 3 an object with ``items()`` yields a list of its
    items; under Python 2 ``iteritems()`` is preferred, then
    ``items()``.  Objects offering neither are handled through
    ``__getitem__`` (or ``get``) combined with ``iterkeys()`` or
    ``keys()``.  ``TypeError`` is raised when no such combination is
    available.
    """
    if compat.py3k:
        if hasattr(dictlike, 'items'):
            return list(dictlike.items())
    elif hasattr(dictlike, 'iteritems'):
        return dictlike.iteritems()
    elif hasattr(dictlike, 'items'):
        return iter(dictlike.items())

    # no items-style API; fall back to per-key lookups
    fallback = getattr(dictlike, 'get', None)
    getter = getattr(dictlike, '__getitem__', fallback)
    if getter is None:
        raise TypeError(
            "Object '%r' is not dict-like" % dictlike)

    if hasattr(dictlike, 'iterkeys'):
        def iterator():
            for key in dictlike.iterkeys():
                yield key, getter(key)
        return iterator()

    if hasattr(dictlike, 'keys'):
        return iter((key, getter(key)) for key in dictlike.keys())

    raise TypeError(
        "Object '%r' is not dict-like" % dictlike)
class classproperty(property):
    """A decorator that behaves like @property except that operates
    on classes rather than instances.

    The getter always receives the owning class, whether the attribute
    is accessed on the class or on one of its instances.

    The decorator is currently special when using the declarative
    module, but note that the
    :class:`~.sqlalchemy.ext.declarative.declared_attr`
    decorator should be used for this purpose with declarative.

    """

    def __init__(self, fget, *arg, **kw):
        super(classproperty, self).__init__(fget, *arg, **kw)
        # carry the getter's docstring over onto the descriptor itself
        self.__doc__ = fget.__doc__

    def __get__(desc, self, cls):
        getter = desc.fget
        return getter(cls)
class hybridproperty(object):
    """Decorate a function as a class- or instance-level property.

    On instance access the function is called with the instance.  On
    class access it is called with the class, and the function's
    docstring is copied onto the returned value before it is handed
    back.
    """

    def __init__(self, func):
        self.func = func

    def __get__(self, instance, owner):
        if instance is not None:
            return self.func(instance)

        clsval = self.func(owner)
        clsval.__doc__ = self.func.__doc__
        return clsval
class hybridmethod(object):
    """Decorate a function as cls- or instance- level.

    The wrapped function is bound to the class when looked up on the
    class, and to the instance when looked up on an instance.
    """

    def __init__(self, func):
        self.func = func

    def __get__(self, instance, owner):
        if instance is None:
            target, target_type = owner, owner.__class__
        else:
            target, target_type = instance, owner
        return self.func.__get__(target, target_type)
class _symbol(int):
    """Integer subclass carrying a ``name``; the object that
    ``symbol()`` actually creates and returns.
    """

    def __new__(self, name, doc=None, canonical=None):
        """Construct a new named symbol.

        The integer value is ``canonical`` when given, otherwise the
        hash of ``name``.  A truthy ``doc`` is stored as ``__doc__``.
        """
        assert isinstance(name, compat.string_types)
        value = hash(name) if canonical is None else canonical
        sym = int.__new__(_symbol, value)
        sym.name = name
        if doc:
            sym.__doc__ = doc
        return sym

    def __reduce__(self):
        # unpickling goes back through symbol(name, doc, canonical)
        return symbol, (self.name, "x", int(self))

    def __repr__(self):
        return "symbol(%r)" % self.name

    def __str__(self):
        return repr(self)


_symbol.__name__ = 'symbol'
class symbol(object):
    """A constant symbol.

    >>> symbol('foo') is symbol('foo')
    True
    >>> symbol('foo')
    symbol('foo')

    A slight refinement of the MAGICCOOKIE=object() pattern.  The primary
    advantage of symbol() is its repr().  They are also singletons.

    Repeated calls of symbol('name') will all return the same instance;
    ``doc`` and ``canonical`` only take effect on the call that first
    creates the symbol for a given name.

    The optional ``doc`` argument assigns to ``__doc__``.  This
    is strictly so that Sphinx autoattr picks up the docstring we want
    (it doesn't appear to pick up the in-module docstring if the datamember
    is in a different module - autoattribute also blows up completely).
    If Sphinx fixes/improves this then we would no longer need
    ``doc`` here.

    """
    # registry of every symbol created so far, keyed by name
    symbols = {}
    # guards lookup-or-create against concurrent callers
    _lock = compat.threading.Lock()

    def __new__(cls, name, doc=None, canonical=None):
        """Return the symbol registered under ``name``, creating it
        on first use.
        """
        # Acquire and release the *same* lock object; the ``with`` block
        # also guarantees release if symbol construction raises.
        with cls._lock:
            sym = cls.symbols.get(name)
            if sym is None:
                cls.symbols[name] = sym = _symbol(name, doc, canonical)
            return sym
_creation_order = 1


def set_creation_order(instance):
    """Stamp ``instance`` with the next '_creation_order' sequence value.

    This allows multiple instances to be sorted in order of creation
    (typically within a single thread; the counter is not particularly
    threadsafe).

    """
    global _creation_order
    current = _creation_order
    instance._creation_order = current
    _creation_order = current + 1
def warn_exception(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, turning any ``Exception`` it
    raises into a warning.

    The function's result is returned on success; when an exception was
    converted to a warning, ``None`` is returned.

    """
    try:
        result = func(*args, **kwargs)
    except Exception:
        details = sys.exc_info()[0:2]
        warn("%s('%s') ignored" % details)
    else:
        return result
def ellipses_string(value, len_=25):
    """Truncate ``value`` to ``len_`` items followed by ``...``.

    Values no longer than ``len_`` are returned unchanged, as are
    values for which the length check or formatting raises
    ``TypeError``.
    """
    try:
        if len(value) <= len_:
            return value
        return "%s..." % value[0:len_]
    except TypeError:
        return value
class _hash_limit_string(compat.text_type):
    """A string subclass that can only be hashed on a maximum amount
    of unique values.

    This is used for warnings so that we can send out parameterized warnings
    without the __warningregistry__ of the module, or the non-overridable
    "once" registry within warnings.py, overloading memory.

    The hash is derived from the message template plus the hash of the
    interpolated message modulo ``num``; equality compares hashes only.

    """

    def __new__(cls, value, num, args):
        message = value % args
        suffix = " (this warning may be suppressed after %d occurrences)" % num
        interpolated = message + suffix
        self = super(_hash_limit_string, cls).__new__(cls, interpolated)
        # fold every interpolated message into one of ``num`` buckets
        bucket = hash(interpolated) % num
        self._hash = hash("%s_%d" % (value, bucket))
        return self

    def __hash__(self):
        return self._hash

    def __eq__(self, other):
        return hash(self) == hash(other)
def warn(msg):
    """Issue a warning on behalf of the caller.

    If msg is a string, :class:`.exc.SAWarning` is used as
    the category.

    """
    # stacklevel=2 attributes the warning to whoever called warn()
    warnings.warn(msg, category=exc.SAWarning, stacklevel=2)
def warn_limited(msg, args):
    """Issue a warning with a parameterized string, limiting the number
    of registrations.

    When ``args`` is non-empty, ``msg`` is interpolated with it through
    ``_hash_limit_string`` using a limit of 10; otherwise ``msg`` is
    emitted as given.

    """
    message = _hash_limit_string(msg, 10, args) if args else msg
    warnings.warn(message, exc.SAWarning, stacklevel=2)
def only_once(fn):
    """Decorate the given function to be a no-op after it is called exactly
    once.

    The first call is forwarded to ``fn`` and returns its result; every
    later call does nothing and returns ``None``.
    """
    pending = [fn]

    def go(*arg, **kw):
        if not pending:
            return None
        # consume the function before calling it
        return pending.pop()(*arg, **kw)

    return go
_SQLA_RE = re.compile(r'sqlalchemy/([a-z_]+/){0,2}[a-z_]+\.py')
_UNITTEST_RE = re.compile(r'unit(?:2|test2?/)')


def chop_traceback(tb, exclude_prefix=_UNITTEST_RE, exclude_suffix=_SQLA_RE):
    """Trim extraneous lines from both ends of a traceback.

    :param tb:
      a list of traceback lines as returned by ``traceback.format_stack()``

    :param exclude_prefix:
      a regular expression object matching lines to skip at beginning of
      ``tb``

    :param exclude_suffix:
      a regular expression object matching lines to skip at end of ``tb``

    :return: the slice of ``tb`` that remains after trimming
    """
    first = 0
    stop = len(tb)  # exclusive upper bound

    while first < stop and exclude_prefix.search(tb[first]):
        first += 1
    while first < stop and exclude_suffix.search(tb[stop - 1]):
        stop -= 1
    return tb[first:stop]
NoneType = type(None)


def attrsetter(attrname):
    """Return a function ``set(obj, value)`` that assigns ``value`` to
    ``obj.<attrname>``.

    The setter is generated from source text, with ``attrname``
    interpolated directly into an attribute-assignment statement.
    """
    code = (
        "def set(obj, value):"
        " obj.%s = value"
    ) % attrname
    env = dict(locals())
    exec(code, env)
    return env['set']
class EnsureKWArgType(type):
    """Apply translation of functions to accept **kw arguments if they
    don't already.

    The class being created supplies a regular expression in its
    ``ensure_kwarg`` attribute; each entry of the class dictionary whose
    name matches it and whose signature has no ``**kw`` is replaced by a
    wrapper that accepts and discards keyword arguments.

    """

    def __init__(cls, clsname, bases, clsdict):
        pattern = cls.ensure_kwarg
        if pattern:
            for key in clsdict:
                if not re.match(pattern, key):
                    continue
                fn = clsdict[key]
                spec = compat.inspect_getargspec(fn)
                if spec.keywords:
                    # already accepts **kw; leave untouched
                    continue
                wrapped = cls._wrap_w_kw(fn)
                clsdict[key] = wrapped
                setattr(cls, key, wrapped)
        super(EnsureKWArgType, cls).__init__(clsname, bases, clsdict)

    def _wrap_w_kw(self, fn):
        """Wrap ``fn`` so that keyword arguments are accepted and dropped."""

        def wrap(*arg, **kw):
            return fn(*arg)

        return update_wrapper(wrap, fn)
def wrap_callable(wrapper, fn):
    """Augment functools.update_wrapper() to work with objects with
    a ``__call__()`` method.

    :param wrapper:
      the function that receives the copied metadata; it is also the
      return value

    :param fn:
      object with __call__ method

    """
    if hasattr(fn, '__name__'):
        return update_wrapper(wrapper, fn)

    # callable object without __name__: borrow metadata from its class
    wrapper.__name__ = fn.__class__.__name__
    if hasattr(fn, '__module__'):
        wrapper.__module__ = fn.__module__

    # the docstring of __call__ is preferred over the object's own
    call_doc = getattr(fn.__call__, '__doc__', None)
    if call_doc:
        wrapper.__doc__ = call_doc
    elif fn.__doc__:
        wrapper.__doc__ = fn.__doc__
    return wrapper
|