pg8000.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. # postgresql/pg8000.py
  2. # Copyright (C) 2005-2017 the SQLAlchemy authors and contributors <see AUTHORS
  3. # file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: http://www.opensource.org/licenses/mit-license.php
  """
  .. dialect:: postgresql+pg8000
      :name: pg8000
      :dbapi: pg8000
      :connectstring: \
  postgresql+pg8000://user:password@host:port/dbname[?key=value&key=value...]
      :url: https://pythonhosted.org/pg8000/


  .. _pg8000_unicode:

  Unicode
  -------

  pg8000 will encode / decode string values between it and the server using the
  PostgreSQL ``client_encoding`` parameter; by default this is the value in
  the ``postgresql.conf`` file, which often defaults to ``SQL_ASCII``.
  Typically, this can be changed to ``utf-8``, as a more useful default::

      #client_encoding = sql_ascii # actually, defaults to database
                                   # encoding
      client_encoding = utf8

  The ``client_encoding`` can be overridden for a session by executing the SQL::

      SET CLIENT_ENCODING TO 'utf8';

  SQLAlchemy will execute this SQL on all new connections based on the value
  passed to :func:`.create_engine` using the ``client_encoding`` parameter::

      engine = create_engine(
          "postgresql+pg8000://user:pass@host/dbname", client_encoding='utf8')


  .. _pg8000_isolation_level:

  pg8000 Transaction Isolation Level
  -------------------------------------

  The pg8000 dialect offers the same isolation level settings as that
  of the :ref:`psycopg2 <psycopg2_isolation_level>` dialect:

  * ``READ COMMITTED``
  * ``READ UNCOMMITTED``
  * ``REPEATABLE READ``
  * ``SERIALIZABLE``
  * ``AUTOCOMMIT``

  .. versionadded:: 0.9.5 support for AUTOCOMMIT isolation level when using
     pg8000.

  .. seealso::

      :ref:`postgresql_isolation_level`

      :ref:`psycopg2_isolation_level`

  """
  46. from ... import util, exc
  47. import decimal
  48. from ... import processors
  49. from ... import types as sqltypes
  50. from .base import (
  51. PGDialect, PGCompiler, PGIdentifierPreparer, PGExecutionContext,
  52. _DECIMAL_TYPES, _FLOAT_TYPES, _INT_TYPES)
  53. import re
  54. from sqlalchemy.dialects.postgresql.json import JSON
  class _PGNumeric(sqltypes.Numeric):
      """Numeric type whose result conversion accounts for the Python types
      pg8000 already returns for float and decimal columns.
      """

      def result_processor(self, dialect, coltype):
          """Return the result-row converter for ``coltype``, or ``None``.

          :param dialect: the dialect in use (not consulted here).
          :param coltype: the column type code reported for the result
           column; checked against ``_FLOAT_TYPES``, ``_DECIMAL_TYPES``
           and ``_INT_TYPES``.
          :return: a callable converting raw values, or ``None`` when the
           value delivered by the driver is used unchanged.
          :raises InvalidRequestError: if ``coltype`` is in none of the
           three known type-code collections.
          """
          wants_decimal = self.asdecimal
          is_float = coltype in _FLOAT_TYPES

          # Anything that is neither float nor decimal/int is unsupported,
          # regardless of the requested Python-side type.
          if not is_float and not (
                  coltype in _DECIMAL_TYPES or coltype in _INT_TYPES):
              raise exc.InvalidRequestError(
                  "Unknown PG numeric type: %d" % coltype)

          if wants_decimal:
              if is_float:
                  return processors.to_decimal_processor_factory(
                      decimal.Decimal, self._effective_decimal_return_scale)
              # pg8000 returns Decimal natively for 1700
              return None

          if is_float:
              # pg8000 returns float natively for 701
              return None
          return processors.to_float
  class _PGNumericNoBind(_PGNumeric):
      """Variant of :class:`_PGNumeric` that applies no bind-parameter
      conversion.
      """

      def bind_processor(self, dialect):
          """Return ``None`` so bound values are passed through unconverted.

          :param dialect: the dialect in use (not consulted here).
          """
          return None
  class _PGJSON(JSON):
      """JSON type that skips SQLAlchemy-side result decoding when the
      installed pg8000 version handles JSON itself.
      """

      def result_processor(self, dialect, coltype):
          """Return the JSON result converter, or ``None`` for newer drivers.

          :param dialect: dialect whose ``_dbapi_version`` tuple is compared
           against ``(1, 10, 1)``.
          :param coltype: column type code, forwarded to the parent
           implementation when it is used.
          """
          # Up to and including 1.10.1 the parent class's processor is used.
          if dialect._dbapi_version <= (1, 10, 1):
              return super(_PGJSON, self).result_processor(dialect, coltype)
          # Has native JSON
          return None
  class PGExecutionContext_pg8000(PGExecutionContext):
      """Execution context for the pg8000 dialect.

      Adds nothing to :class:`PGExecutionContext`; it exists so the dialect
      has its own context class to reference.
      """
  class PGCompiler_pg8000(PGCompiler):
      """Statement compiler that doubles literal percent signs in the SQL
      it emits.
      """

      def visit_mod_binary(self, binary, operator, **kw):
          """Render a modulo expression using a doubled percent sign.

          :param binary: binary expression whose ``left`` and ``right``
           operands are compiled via ``self.process``.
          :param operator: the operator object (not consulted here).
          :param kw: compiler keyword arguments forwarded to both operands.
          :return: the compiled SQL fragment ``<left> %% <right>``.
          """
          lhs = self.process(binary.left, **kw)
          rhs = self.process(binary.right, **kw)
          return lhs + " %% " + rhs

      def post_process_text(self, text):
          """Double every ``%`` in a textual SQL fragment.

          A warning is emitted first when ``text`` already contains ``%%``.

          :param text: raw SQL text.
          :return: ``text`` with each ``%`` replaced by ``%%``.
          """
          already_doubled = '%%' in text
          if already_doubled:
              util.warn("The SQLAlchemy postgresql dialect "
                        "now automatically escapes '%' in text() "
                        "expressions to '%%'.")
          escaped = text.replace('%', '%%')
          return escaped
  class PGIdentifierPreparer_pg8000(PGIdentifierPreparer):
      """Identifier preparer that also doubles percent signs in
      identifiers.
      """

      def _escape_identifier(self, value):
          """Escape quote characters and percent signs in an identifier.

          :param value: the raw identifier string.
          :return: ``value`` with ``self.escape_quote`` replaced by
           ``self.escape_to_quote`` and then every ``%`` replaced by ``%%``.
          """
          return value.replace(
              self.escape_quote, self.escape_to_quote
          ).replace('%', '%%')
  class PGDialect_pg8000(PGDialect):
      """PostgreSQL dialect for the pg8000 DBAPI driver.

      Wires in the pg8000-specific compiler, identifier preparer, execution
      context and type overrides, and implements connection setup
      (client encoding, isolation level) and two-phase commit hooks in
      terms of the raw DBAPI connection.
      """

      driver = 'pg8000'

      supports_unicode_statements = True

      supports_unicode_binds = True

      default_paramstyle = 'format'
      # Class-level default; recomputed per driver version in initialize().
      supports_sane_multi_rowcount = True
      execution_ctx_cls = PGExecutionContext_pg8000
      statement_compiler = PGCompiler_pg8000
      preparer = PGIdentifierPreparer_pg8000
      description_encoding = 'use_encoding'

      # Type overrides layered on top of the base PostgreSQL colspecs.
      colspecs = util.update_copy(
          PGDialect.colspecs,
          {
              sqltypes.Numeric: _PGNumericNoBind,
              sqltypes.Float: _PGNumeric,
              JSON: _PGJSON,
              sqltypes.JSON: _PGJSON
          }
      )

      def __init__(self, client_encoding=None, **kwargs):
          """Construct the dialect.

          :param client_encoding: optional encoding name; when not ``None``
           it is applied to each new connection by :meth:`on_connect`.
          :param kwargs: forwarded to ``PGDialect.__init__``.
          """
          PGDialect.__init__(self, **kwargs)
          self.client_encoding = client_encoding

      def initialize(self, connection):
          """Set version-dependent flags, then run the base initialization.

          :param connection: connection passed through to the parent
           ``initialize``.
          """
          self.supports_sane_multi_rowcount = \
              self._dbapi_version >= (1, 9, 14)
          super(PGDialect_pg8000, self).initialize(connection)

      @util.memoized_property
      def _dbapi_version(self):
          """Return the driver version as a tuple of ints.

          Parsed from ``self.dbapi.__version__``; ``(99, 99, 99)`` is
          returned when no DBAPI module or no ``__version__`` attribute is
          available.
          """
          if self.dbapi and hasattr(self.dbapi, '__version__'):
              return tuple(
                  int(x) for x in re.findall(
                      r'(\d+)(?:[-\.]?|$)', self.dbapi.__version__))
          else:
              return (99, 99, 99)

      @classmethod
      def dbapi(cls):
          """Import and return the ``pg8000`` module."""
          return __import__('pg8000')

      def create_connect_args(self, url):
          """Build the DBAPI ``connect()`` arguments from ``url``.

          :param url: URL object providing ``translate_connect_args`` and
           ``query``.
          :return: a ``([], opts)`` tuple of positional and keyword
           arguments; ``port`` is converted to ``int`` when present and the
           URL query entries are merged into ``opts``.
          """
          opts = url.translate_connect_args(username='user')
          if 'port' in opts:
              opts['port'] = int(opts['port'])
          opts.update(url.query)
          return ([], opts)

      def is_disconnect(self, e, connection, cursor):
          """Return True if the message of ``e`` reports a closed connection.

          :param e: the exception raised by the driver.
          :param connection: unused.
          :param cursor: unused.
          """
          return "connection is closed" in str(e)

      def set_isolation_level(self, connection, level):
          """Apply ``level`` to a DBAPI connection.

          :param connection: DBAPI connection, or a wrapper exposing the
           DBAPI connection as ``.connection``.
          :param level: isolation level name; underscores are treated as
           spaces.  ``AUTOCOMMIT`` switches the connection's ``autocommit``
           flag on; any name found in ``self._isolation_lookup`` switches it
           off and issues ``SET SESSION CHARACTERISTICS``.
          :raises ArgumentError: if ``level`` is not recognized.
          """
          level = level.replace('_', ' ')

          # adjust for ConnectionFairy possibly being present
          if hasattr(connection, 'connection'):
              connection = connection.connection

          if level == 'AUTOCOMMIT':
              connection.autocommit = True
          elif level in self._isolation_lookup:
              connection.autocommit = False
              cursor = connection.cursor()
              try:
                  # ``level`` was validated against _isolation_lookup above,
                  # so interpolating it into the statement is safe.
                  cursor.execute(
                      "SET SESSION CHARACTERISTICS AS TRANSACTION "
                      "ISOLATION LEVEL %s" % level)
                  cursor.execute("COMMIT")
              finally:
                  # release the cursor even if a statement fails
                  cursor.close()
          else:
              raise exc.ArgumentError(
                  "Invalid value '%s' for isolation_level. "
                  "Valid isolation levels for %s are %s or AUTOCOMMIT" %
                  (level, self.name, ", ".join(self._isolation_lookup))
              )

      def set_client_encoding(self, connection, client_encoding):
          """Issue ``SET CLIENT_ENCODING`` on a DBAPI connection.

          :param connection: DBAPI connection, or a wrapper exposing the
           DBAPI connection as ``.connection``.
          :param client_encoding: encoding name to set.  Embedded single
           quotes are doubled so the value cannot terminate the SQL string
           literal it is placed in.
          """
          # adjust for ConnectionFairy possibly being present
          if hasattr(connection, 'connection'):
              connection = connection.connection

          cursor = connection.cursor()
          try:
              cursor.execute(
                  "SET CLIENT_ENCODING TO '" +
                  client_encoding.replace("'", "''") + "'")
              cursor.execute("COMMIT")
          finally:
              # release the cursor even if a statement fails
              cursor.close()

      def do_begin_twophase(self, connection, xid):
          """Begin a two-phase transaction identified by ``xid``."""
          connection.connection.tpc_begin((0, xid, ''))

      def do_prepare_twophase(self, connection, xid):
          """Prepare the current two-phase transaction (``xid`` is unused)."""
          connection.connection.tpc_prepare()

      def do_rollback_twophase(
              self, connection, xid, is_prepared=True, recover=False):
          """Roll back the two-phase transaction identified by ``xid``.

          ``is_prepared`` and ``recover`` are accepted but not consulted.
          """
          connection.connection.tpc_rollback((0, xid, ''))

      def do_commit_twophase(
              self, connection, xid, is_prepared=True, recover=False):
          """Commit the two-phase transaction identified by ``xid``.

          ``is_prepared`` and ``recover`` are accepted but not consulted.
          """
          connection.connection.tpc_commit((0, xid, ''))

      def do_recover_twophase(self, connection):
          """Return element ``[1]`` of each row reported by ``tpc_recover()``.

          Presumably these are the global transaction ids, mirroring the
          ``(0, xid, '')`` triples used by the other two-phase hooks.
          """
          return [row[1] for row in connection.connection.tpc_recover()]

      def on_connect(self):
          """Return a per-connection setup callable, or ``None``.

          The callable applies the configured client encoding first and the
          isolation level second; each step is included only when the
          corresponding setting is not ``None``.
          """
          fns = []

          if self.client_encoding is not None:
              def apply_client_encoding(conn):
                  self.set_client_encoding(conn, self.client_encoding)
              fns.append(apply_client_encoding)

          if self.isolation_level is not None:
              def apply_isolation_level(conn):
                  self.set_isolation_level(conn, self.isolation_level)
              fns.append(apply_isolation_level)

          if fns:
              def on_connect(conn):
                  for fn in fns:
                      fn(conn)
              return on_connect
          else:
              return None
  # Module-level alias exposing the pg8000 dialect class under the generic
  # name ``dialect``.
  # NOTE(review): presumably the name a dialect loader looks up -- confirm.
  dialect = PGDialect_pg8000