123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269 |
- # util/compat.py
- # Copyright (C) 2005-2017 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: http://www.opensource.org/licenses/mit-license.php
- """Handle Python version/platform incompatibilities."""
- import sys
- from contextlib import contextmanager
- try:
- import threading
- except ImportError:
- import dummy_threading as threading
# Interpreter version flags.  Each one compares only the leading
# components of sys.version_info against the release in question.
py3k = sys.version_info[0] >= 3
py2k = not py3k
py265 = sys.version_info[:3] >= (2, 6, 5)
py32 = sys.version_info[:2] >= (3, 2)
py33 = sys.version_info[:2] >= (3, 3)
py36 = sys.version_info[:2] >= (3, 6)

# Platform / implementation flags.
win32 = sys.platform.startswith('win')
jython = sys.platform.startswith('java')
pypy = hasattr(sys, 'pypy_version_info')
cpython = not (pypy or jython)  # TODO: something better for this ?

import collections

# re-export the builtin under this module's namespace
next = next

# Python 2 prefers the C pickle implementation when it can be imported;
# Python 3 has a single ``pickle`` module.
if not py3k:
    try:
        import cPickle as pickle
    except ImportError:
        import pickle
else:
    import pickle

# work around http://bugs.python.org/issue2646: interpreters older than
# 2.6.5 get keyword names coerced through str(), newer ones pass them
# through untouched.
safe_kwarg = (lambda arg: arg) if py265 else str

# Four-field argument specification shared by both inspect_getargspec()
# variants defined in this module.
ArgSpec = collections.namedtuple(
    "ArgSpec", "args varargs keywords defaults")
# Version-specific aliases and shims.  Both branches bind a common set of
# names (string/bytes type tuples, u()/ue()/b(), callable, cmp, reduce,
# print_, import_, itertools helpers, base64 helpers, ...) under
# version-neutral spellings.
if py3k:
    import builtins

    from inspect import getfullargspec as inspect_getfullargspec
    from urllib.parse import (quote_plus, unquote_plus,
                              parse_qsl, quote, unquote)
    import configparser
    from io import StringIO

    from io import BytesIO as byte_buffer

    def inspect_getargspec(func):
        """Return the first four ``getfullargspec()`` fields as an ArgSpec."""
        return ArgSpec(
            *inspect_getfullargspec(func)[0:4]
        )

    string_types = str,
    binary_types = bytes,
    binary_type = bytes
    text_type = str
    int_types = int,
    iterbytes = iter

    def u(s):
        """Return ``s`` unchanged."""
        return s

    def ue(s):
        """Return ``s`` unchanged."""
        return s

    def b(s):
        """Encode the string ``s`` to bytes using latin-1."""
        return s.encode("latin-1")

    if py32:
        callable = callable
    else:
        def callable(fn):
            """Report whether ``fn`` has a ``__call__`` attribute."""
            return hasattr(fn, '__call__')

    def cmp(a, b):
        """Return 1, 0 or -1 as ``a`` is greater than, equal to or less
        than ``b``."""
        return (a > b) - (a < b)

    from functools import reduce

    # fetched with getattr() because "print" is a statement keyword to the
    # Python 2 parser, which also has to be able to compile this file
    print_ = getattr(builtins, "print")

    import_ = getattr(builtins, '__import__')

    import itertools
    itertools_filterfalse = itertools.filterfalse
    itertools_filter = filter
    itertools_imap = map
    from itertools import zip_longest

    import base64

    def b64encode(x):
        """Base64-encode ``x`` and return the result as an ASCII ``str``."""
        return base64.b64encode(x).decode('ascii')

    def b64decode(x):
        """Decode the base64 text ``x`` (ASCII ``str``) back to bytes."""
        return base64.b64decode(x.encode('ascii'))

else:
    from inspect import getargspec as inspect_getfullargspec
    inspect_getargspec = inspect_getfullargspec
    from urllib import quote_plus, unquote_plus, quote, unquote
    from urlparse import parse_qsl
    import ConfigParser as configparser
    from StringIO import StringIO
    from cStringIO import StringIO as byte_buffer

    string_types = basestring,
    binary_types = bytes,
    binary_type = str
    text_type = unicode
    int_types = int, long

    def iterbytes(buf):
        """Yield the ordinal of each one-character item of ``buf``."""
        return (ord(byte) for byte in buf)

    def u(s):
        """Decode the byte string ``s`` as UTF-8."""
        # this differs from what six does, which doesn't support non-ASCII
        # strings - we only use u() with
        # literal source strings, and all our source files with non-ascii
        # in them (all are tests) are utf-8 encoded.
        return unicode(s, "utf-8")

    def ue(s):
        """Decode the byte string ``s`` using the ``unicode_escape`` codec."""
        return unicode(s, "unicode_escape")

    def b(s):
        """Return ``s`` unchanged."""
        return s

    def import_(*args):
        """Call ``__import__``, coercing the fourth argument's items to str.

        The coercion is applied only when exactly four positional
        arguments are given; any other call is passed through as-is.
        """
        if len(args) == 4:
            args = args[0:3] + ([str(arg) for arg in args[3]],)
        return __import__(*args)

    callable = callable
    cmp = cmp
    reduce = reduce

    import base64
    b64encode = base64.b64encode
    b64decode = base64.b64decode

    def print_(*args, **kwargs):
        """Write each positional argument to a stream.

        The stream is taken from the ``file`` keyword argument and
        defaults to ``sys.stdout``; passing ``file=None`` makes the call a
        no-op.  Arguments that are not ``basestring`` instances are
        converted with ``str()`` first.  No separator or trailing newline
        is written and any other keyword argument is ignored.
        """
        fp = kwargs.pop("file", sys.stdout)
        if fp is None:
            return
        # iterate the values themselves: looping over enumerate(args)
        # would hand (index, value) tuples to write() in place of the
        # arguments.
        for arg in args:
            if not isinstance(arg, basestring):
                arg = str(arg)
            fp.write(arg)

    import itertools
    itertools_filterfalse = itertools.ifilterfalse
    itertools_filter = itertools.ifilter
    itertools_imap = itertools.imap
    from itertools import izip_longest as zip_longest
import time

# Select the module-wide timing function.  Windows and Jython use
# time.clock() where the interpreter still provides it.  That function
# was removed in Python 3.8, so fall back to time.perf_counter() there
# instead of failing with AttributeError while this module is imported.
if win32 or jython:
    try:
        time_func = time.clock
    except AttributeError:
        time_func = time.perf_counter
else:
    time_func = time.time
- from collections import namedtuple
- from operator import attrgetter as dottedgetter
if py3k:
    def reraise(tp, value, tb=None, cause=None):
        """Raise ``value`` with traceback ``tb``, optionally chained.

        ``tp`` is not used by this variant; the Python 2 variant below
        passes it to its ``raise`` statement.  When ``cause`` is given it
        is stored as ``value.__cause__`` and must not be the same object
        as ``value``.
        """
        if cause is not None:
            assert cause is not value, "Same cause emitted"
            value.__cause__ = cause
        # only re-attach the traceback when it differs from the one the
        # exception already carries
        if value.__traceback__ is tb:
            raise value
        raise value.with_traceback(tb)
else:
    # not as nice as that of Py3K, but at least preserves
    # the code line where the issue occurred.
    #
    # The three-expression raise statement is a syntax error to the
    # Python 3 parser, so this variant is compiled from a string.  The
    # embedded source needs real nested indentation: the assert belongs
    # to the body of the "if", otherwise exec() fails with
    # IndentationError.
    exec("def reraise(tp, value, tb=None, cause=None):\n"
         "    if cause is not None:\n"
         "        assert cause is not value, 'Same cause emitted'\n"
         "    raise tp, value, tb\n")
def raise_from_cause(exception, exc_info=None):
    """Raise ``exception`` using the traceback of another exception.

    ``exc_info`` is a ``(type, value, traceback)`` triple and defaults to
    ``sys.exc_info()``.  Its value is passed to ``reraise()`` as the
    cause, unless it is the very object being raised, in which case no
    cause is passed.
    """
    if exc_info is None:
        exc_info = sys.exc_info()
    _, handled, handled_tb = exc_info
    cause = None if handled is exception else handled
    reraise(type(exception), exception, tb=handled_tb, cause=cause)
if not py3k:
    def exec_(func_text, globals_, lcl=None):
        """Execute ``func_text`` in ``globals_`` and, if given, ``lcl``.

        The Python 2 exec statement is itself wrapped in a string so that
        this file still parses on Python 3.
        """
        if lcl is not None:
            exec('exec func_text in globals_, lcl')
        else:
            exec('exec func_text in globals_')
else:
    # looked up with getattr() because "exec" is a keyword to the
    # Python 2 parser and cannot be written as an attribute name there
    exec_ = getattr(builtins, 'exec')
def with_metaclass(meta, *bases):
    """Return a placeholder base class that applies ``meta`` to subclasses.

    A class statement that inherits from the returned placeholder is
    built by calling ``meta(name, bases, namespace)`` with the ``bases``
    given here, so the placeholder itself does not end up among the new
    class's bases.

    Source: http://lucumr.pocoo.org/2013/5/21/porting-to-python-3-redux/

    """
    class metaclass(meta):
        # use type's own __init__/__call__ here rather than any override
        # that ``meta`` defines
        __init__ = type.__init__
        __call__ = type.__call__

        def __new__(cls, name, placeholder_bases, namespace):
            # a real subclass is being created: hand over to ``meta``
            if placeholder_bases is not None:
                return meta(name, bases, namespace)
            # bases of None marks construction of the placeholder itself
            return type.__new__(cls, name, (), namespace)

    return metaclass('temporary_class', None, {})
@contextmanager
def nested(*managers):
    """Enter several context managers at once, like ``contextlib.nested``.

    The managers are entered left to right and the list of their
    ``__enter__()`` results is yielded; they are exited in reverse order.
    An exit callback that returns a true value clears the pending
    exception for the callbacks that run after it, and an exception
    raised by an exit callback replaces the pending one.

    Mostly for unit tests: as tests still need to run on py2.6,
    multiple-with can't be used yet.  The stdlib function is removed in
    py3k and emits a deprecation warning in 2.7, so it is rolled here for
    everyone.

    """
    no_error = (None, None, None)
    pending = no_error
    exit_callbacks = []
    entered = []
    try:
        for manager in managers:
            # look up __exit__ before calling __enter__(); the callback is
            # registered only once __enter__() has returned
            leave = manager.__exit__
            entered.append(manager.__enter__())
            exit_callbacks.append(leave)
        yield entered
    except:
        # deliberately bare: whatever was raised is handed to the exit
        # callbacks below
        pending = sys.exc_info()
    finally:
        while exit_callbacks:
            leave = exit_callbacks.pop()
            try:
                if leave(*pending):
                    pending = no_error
            except:
                pending = sys.exc_info()
        if pending != no_error:
            reraise(*pending)
|