123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265 |
- # postgresql/pg8000.py
- # Copyright (C) 2005-2017 the SQLAlchemy authors and contributors <see AUTHORS
- # file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: http://www.opensource.org/licenses/mit-license.php
- """
- .. dialect:: postgresql+pg8000
- :name: pg8000
- :dbapi: pg8000
- :connectstring: \
- postgresql+pg8000://user:password@host:port/dbname[?key=value&key=value...]
- :url: https://pythonhosted.org/pg8000/
- .. _pg8000_unicode:
- Unicode
- -------
- pg8000 will encode / decode string values between it and the server using the
- PostgreSQL ``client_encoding`` parameter; by default this is the value in
- the ``postgresql.conf`` file, which often defaults to ``SQL_ASCII``.
- Typically, this can be changed to ``utf-8``, as a more useful default::
- #client_encoding = sql_ascii # actually, defaults to database
- # encoding
- client_encoding = utf8
- The ``client_encoding`` can be overridden for a session by executing the SQL:
- SET CLIENT_ENCODING TO 'utf8';
- SQLAlchemy will execute this SQL on all new connections based on the value
- passed to :func:`.create_engine` using the ``client_encoding`` parameter::
- engine = create_engine(
- "postgresql+pg8000://user:pass@host/dbname", client_encoding='utf8')
- .. _pg8000_isolation_level:
- pg8000 Transaction Isolation Level
- -------------------------------------
- The pg8000 dialect offers the same isolation level settings as that
- of the :ref:`psycopg2 <psycopg2_isolation_level>` dialect:
- * ``READ COMMITTED``
- * ``READ UNCOMMITTED``
- * ``REPEATABLE READ``
- * ``SERIALIZABLE``
- * ``AUTOCOMMIT``
- .. versionadded:: 0.9.5 support for AUTOCOMMIT isolation level when using
- pg8000.
- .. seealso::
- :ref:`postgresql_isolation_level`
- :ref:`psycopg2_isolation_level`
- """
- from ... import util, exc
- import decimal
- from ... import processors
- from ... import types as sqltypes
- from .base import (
- PGDialect, PGCompiler, PGIdentifierPreparer, PGExecutionContext,
- _DECIMAL_TYPES, _FLOAT_TYPES, _INT_TYPES)
- import re
- from sqlalchemy.dialects.postgresql.json import JSON
class _PGNumeric(sqltypes.Numeric):
    """Numeric type whose result handling is tuned for pg8000 return values."""

    def result_processor(self, dialect, coltype):
        """Pick the converter applied to values of column type ``coltype``.

        :param dialect: the dialect in use (not consulted here).
        :param coltype: type code of the result column; it must be a member
            of ``_FLOAT_TYPES``, ``_DECIMAL_TYPES`` or ``_INT_TYPES``.
        :return: a callable converting each fetched value, or ``None`` when
            the value delivered by the driver is used unchanged.
        :raises exc.InvalidRequestError: if ``coltype`` is in none of the
            three known type-code groups.
        """
        want_decimal = self.asdecimal
        is_float = coltype in _FLOAT_TYPES
        is_exact = coltype in _DECIMAL_TYPES or coltype in _INT_TYPES

        if not is_float and not is_exact:
            raise exc.InvalidRequestError(
                "Unknown PG numeric type: %d" % coltype)

        if want_decimal:
            if is_float:
                # Floats have to be converted to Decimal explicitly.
                return processors.to_decimal_processor_factory(
                    decimal.Decimal, self._effective_decimal_return_scale)
            # pg8000 returns Decimal natively for 1700
            return None

        if is_float:
            # pg8000 returns float natively for 701
            return None
        return processors.to_float
class _PGNumericNoBind(_PGNumeric):
    """Variant of ``_PGNumeric`` that applies no bind-parameter conversion."""

    def bind_processor(self, dialect):
        """Return ``None``: bound values are passed along unconverted."""
        return None
class _PGJSON(JSON):
    """JSON type that skips result decoding for newer pg8000 releases."""

    def result_processor(self, dialect, coltype):
        """Return the JSON result converter, or ``None`` if none is needed.

        :param dialect: dialect whose ``_dbapi_version`` tuple is inspected.
        :param coltype: type code of the result column, forwarded to the
            parent implementation when that one is used.
        :return: ``None`` for driver versions above 1.10.1, otherwise
            whatever the parent ``JSON`` type supplies.
        """
        has_native_json = dialect._dbapi_version > (1, 10, 1)
        if not has_native_json:
            return super(_PGJSON, self).result_processor(dialect, coltype)
        # Has native JSON
        return None
class PGExecutionContext_pg8000(PGExecutionContext):
    """Execution context for the pg8000 dialect; adds nothing to its base."""
class PGCompiler_pg8000(PGCompiler):
    """SQL compiler that doubles percent signs in the statements it emits."""

    def visit_mod_binary(self, binary, operator, **kw):
        """Render a modulo expression with its operator written as ``%%``.

        :param binary: expression exposing ``left`` and ``right`` operands.
        :param operator: the operator being compiled (unused here).
        :param kw: compiler keyword arguments, forwarded to ``process``.
        :return: the rendered SQL fragment as a string.
        """
        lhs = self.process(binary.left, **kw)
        rhs = self.process(binary.right, **kw)
        return lhs + " %% " + rhs

    def post_process_text(self, text):
        """Return ``text`` with every ``%`` doubled.

        A warning is issued when ``text`` already contains ``%%``, since
        those percent signs get doubled again as well.
        """
        already_doubled = '%%' in text
        if already_doubled:
            util.warn("The SQLAlchemy postgresql dialect "
                      "now automatically escapes '%' in text() "
                      "expressions to '%%'.")
        return text.replace('%', '%%')
class PGIdentifierPreparer_pg8000(PGIdentifierPreparer):
    """Identifier preparer that also doubles percent signs in names."""

    def _escape_identifier(self, value):
        """Return ``value`` with quote characters and ``%`` escaped.

        Quote characters are replaced first (``escape_quote`` becomes
        ``escape_to_quote``); afterwards each ``%`` becomes ``%%``.
        """
        quoted = value.replace(self.escape_quote, self.escape_to_quote)
        return quoted.replace('%', '%%')
class PGDialect_pg8000(PGDialect):
    """PostgreSQL dialect that uses pg8000 as its DBAPI driver."""

    driver = 'pg8000'

    supports_unicode_statements = True
    supports_unicode_binds = True

    default_paramstyle = 'format'
    # Class-level default; initialize() recomputes it from the driver version.
    supports_sane_multi_rowcount = True
    execution_ctx_cls = PGExecutionContext_pg8000
    statement_compiler = PGCompiler_pg8000
    preparer = PGIdentifierPreparer_pg8000
    description_encoding = 'use_encoding'

    # Base PostgreSQL type map with the pg8000-specific overrides applied.
    colspecs = util.update_copy(
        PGDialect.colspecs,
        {
            sqltypes.Numeric: _PGNumericNoBind,
            sqltypes.Float: _PGNumeric,
            JSON: _PGJSON,
            sqltypes.JSON: _PGJSON
        }
    )

    def __init__(self, client_encoding=None, **kwargs):
        """Create the dialect.

        :param client_encoding: optional encoding name; when not ``None`` it
            is applied to each new connection by the ``on_connect`` hook.
        :param kwargs: forwarded unchanged to ``PGDialect.__init__``.
        """
        PGDialect.__init__(self, **kwargs)
        self.client_encoding = client_encoding

    def initialize(self, connection):
        """Set version-dependent flags, then run the base initialization."""
        self.supports_sane_multi_rowcount = self._dbapi_version >= (1, 9, 14)
        super(PGDialect_pg8000, self).initialize(connection)

    @util.memoized_property
    def _dbapi_version(self):
        """Driver version as a tuple of ints taken from ``__version__``.

        Falls back to ``(99, 99, 99)`` when no DBAPI module is set or the
        module has no ``__version__`` attribute.
        """
        if self.dbapi and hasattr(self.dbapi, '__version__'):
            return tuple(
                int(part) for part in re.findall(
                    r'(\d+)(?:[-\.]?|$)', self.dbapi.__version__))
        return (99, 99, 99)

    @classmethod
    def dbapi(cls):
        """Import and return the ``pg8000`` module."""
        return __import__('pg8000')

    def create_connect_args(self, url):
        """Build the ``(args, kwargs)`` pair used to open a connection.

        :param url: connection URL; its ``username`` is passed as ``user``,
            ``port`` is converted to ``int`` when present, and the URL query
            entries are merged in last.
        :return: a tuple of an empty positional list and the keyword dict.
        """
        opts = url.translate_connect_args(username='user')
        if 'port' in opts:
            opts['port'] = int(opts['port'])
        opts.update(url.query)
        return ([], opts)

    def is_disconnect(self, e, connection, cursor):
        """Return True if the text of ``e`` reports a closed connection."""
        return "connection is closed" in str(e)

    def set_isolation_level(self, connection, level):
        """Apply an isolation level (or autocommit) to a connection.

        :param connection: DBAPI connection, or a wrapper exposing the real
            one as ``.connection``.
        :param level: level name; underscores are treated as spaces.
            ``AUTOCOMMIT`` enables the connection's autocommit flag, any
            name in ``_isolation_lookup`` is set for the session.
        :raises exc.ArgumentError: if ``level`` is not a recognized name.
        """
        level = level.replace('_', ' ')

        # adjust for ConnectionFairy possibly being present
        if hasattr(connection, 'connection'):
            connection = connection.connection

        if level == 'AUTOCOMMIT':
            connection.autocommit = True
        elif level in self._isolation_lookup:
            connection.autocommit = False
            cursor = connection.cursor()
            try:
                # ``level`` was validated against _isolation_lookup above, so
                # interpolating it into the statement is safe.
                cursor.execute(
                    "SET SESSION CHARACTERISTICS AS TRANSACTION "
                    "ISOLATION LEVEL %s" % level)
                cursor.execute("COMMIT")
            finally:
                # Close the cursor even when a statement fails.
                cursor.close()
        else:
            raise exc.ArgumentError(
                "Invalid value '%s' for isolation_level. "
                "Valid isolation levels for %s are %s or AUTOCOMMIT" %
                (level, self.name, ", ".join(self._isolation_lookup))
            )

    def set_client_encoding(self, connection, client_encoding):
        """Issue ``SET CLIENT_ENCODING`` on a connection and commit.

        :param connection: DBAPI connection, or a wrapper exposing the real
            one as ``.connection``.
        :param client_encoding: encoding name placed in the statement as a
            quoted string literal.
        """
        # adjust for ConnectionFairy possibly being present
        if hasattr(connection, 'connection'):
            connection = connection.connection

        # Double embedded single quotes so the value cannot terminate the
        # SQL string literal early.
        quoted_encoding = client_encoding.replace("'", "''")

        cursor = connection.cursor()
        try:
            cursor.execute(
                "SET CLIENT_ENCODING TO '" + quoted_encoding + "'")
            cursor.execute("COMMIT")
        finally:
            # Close the cursor even when a statement fails.
            cursor.close()

    # The two-phase methods hand the driver the xid as ``(0, xid, '')``.
    # NOTE(review): presumably (format_id, global id, branch qualifier) as
    # in the DB-API TPC extension -- confirm against pg8000.

    def do_begin_twophase(self, connection, xid):
        """Begin a two-phase transaction identified by ``xid``."""
        connection.connection.tpc_begin((0, xid, ''))

    def do_prepare_twophase(self, connection, xid):
        """Prepare the current two-phase transaction (``xid`` is unused)."""
        connection.connection.tpc_prepare()

    def do_rollback_twophase(
            self, connection, xid, is_prepared=True, recover=False):
        """Roll back the two-phase transaction identified by ``xid``.

        ``is_prepared`` and ``recover`` are accepted but not consulted.
        """
        connection.connection.tpc_rollback((0, xid, ''))

    def do_commit_twophase(
            self, connection, xid, is_prepared=True, recover=False):
        """Commit the two-phase transaction identified by ``xid``.

        ``is_prepared`` and ``recover`` are accepted but not consulted.
        """
        connection.connection.tpc_commit((0, xid, ''))

    def do_recover_twophase(self, connection):
        """Return element 1 of every row yielded by ``tpc_recover()``."""
        return [row[1] for row in connection.connection.tpc_recover()]

    def on_connect(self):
        """Return the per-connection setup callable, or ``None``.

        The callable applies the configured client encoding first and the
        configured isolation level second; a setting left as ``None`` is
        skipped. ``None`` is returned when neither is configured.
        """
        hooks = []

        if self.client_encoding is not None:
            def apply_client_encoding(conn):
                self.set_client_encoding(conn, self.client_encoding)
            hooks.append(apply_client_encoding)

        if self.isolation_level is not None:
            def apply_isolation_level(conn):
                self.set_isolation_level(conn, self.isolation_level)
            hooks.append(apply_isolation_level)

        if not hooks:
            return None

        def on_connect(conn):
            for hook in hooks:
                hook(conn)
        return on_connect


# Module-level alias for the dialect class defined above.
dialect = PGDialect_pg8000
|