base.py 82 KB


  1. # engine/base.py
  2. # Copyright (C) 2005-2017 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: http://www.opensource.org/licenses/mit-license.php
  7. from __future__ import with_statement
  8. """Defines :class:`.Connection` and :class:`.Engine`.
  9. """
  10. import sys
  11. from .. import exc, util, log, interfaces
  12. from ..sql import util as sql_util
  13. from ..sql import schema
  14. from .interfaces import Connectable, ExceptionContext
  15. from .util import _distill_params
  16. import contextlib
  17. class Connection(Connectable):
  18. """Provides high-level functionality for a wrapped DB-API connection.
  19. Provides execution support for string-based SQL statements as well as
  20. :class:`.ClauseElement`, :class:`.Compiled` and :class:`.DefaultGenerator`
  21. objects. Provides a :meth:`begin` method to return :class:`.Transaction`
  22. objects.
  23. The Connection object is **not** thread-safe. While a Connection can be
  24. shared among threads using properly synchronized access, it is still
  25. possible that the underlying DBAPI connection may not support shared
  26. access between threads. Check the DBAPI documentation for details.
  27. The Connection object represents a single dbapi connection checked out
  28. from the connection pool. In this state, the connection pool has no effect
  29. upon the connection, including its expiration or timeout state. For the
  30. connection pool to properly manage connections, connections should be
  31. returned to the connection pool (i.e. ``connection.close()``) whenever the
  32. connection is not in use.
  33. .. index::
  34. single: thread safety; Connection
  35. """
  36. schema_for_object = schema._schema_getter(None)
  37. """Return the ".schema" attribute for an object.
  38. Used for :class:`.Table`, :class:`.Sequence` and similar objects,
  39. and takes into account
  40. the :paramref:`.Connection.execution_options.schema_translate_map`
  41. parameter.
  42. .. versionadded:: 1.1
  43. .. seealso::
  44. :ref:`schema_translating`
  45. """
  46. def __init__(self, engine, connection=None, close_with_result=False,
  47. _branch_from=None, _execution_options=None,
  48. _dispatch=None,
  49. _has_events=None):
  50. """Construct a new Connection.
  51. The constructor here is not public and is only called only by an
  52. :class:`.Engine`. See :meth:`.Engine.connect` and
  53. :meth:`.Engine.contextual_connect` methods.
  54. """
  55. self.engine = engine
  56. self.dialect = engine.dialect
  57. self.__branch_from = _branch_from
  58. self.__branch = _branch_from is not None
  59. if _branch_from:
  60. self.__connection = connection
  61. self._execution_options = _execution_options
  62. self._echo = _branch_from._echo
  63. self.should_close_with_result = False
  64. self.dispatch = _dispatch
  65. self._has_events = _branch_from._has_events
  66. self.schema_for_object = _branch_from.schema_for_object
  67. else:
  68. self.__connection = connection \
  69. if connection is not None else engine.raw_connection()
  70. self.__transaction = None
  71. self.__savepoint_seq = 0
  72. self.should_close_with_result = close_with_result
  73. self.__invalid = False
  74. self.__can_reconnect = True
  75. self._echo = self.engine._should_log_info()
  76. if _has_events is None:
  77. # if _has_events is sent explicitly as False,
  78. # then don't join the dispatch of the engine; we don't
  79. # want to handle any of the engine's events in that case.
  80. self.dispatch = self.dispatch._join(engine.dispatch)
  81. self._has_events = _has_events or (
  82. _has_events is None and engine._has_events)
  83. assert not _execution_options
  84. self._execution_options = engine._execution_options
  85. if self._has_events or self.engine._has_events:
  86. self.dispatch.engine_connect(self, self.__branch)
  87. def _branch(self):
  88. """Return a new Connection which references this Connection's
  89. engine and connection; but does not have close_with_result enabled,
  90. and also whose close() method does nothing.
  91. The Core uses this very sparingly, only in the case of
  92. custom SQL default functions that are to be INSERTed as the
  93. primary key of a row where we need to get the value back, so we have
  94. to invoke it distinctly - this is a very uncommon case.
  95. Userland code accesses _branch() when the connect() or
  96. contextual_connect() methods are called. The branched connection
  97. acts as much as possible like the parent, except that it stays
  98. connected when a close() event occurs.
  99. """
  100. if self.__branch_from:
  101. return self.__branch_from._branch()
  102. else:
  103. return self.engine._connection_cls(
  104. self.engine,
  105. self.__connection,
  106. _branch_from=self,
  107. _execution_options=self._execution_options,
  108. _has_events=self._has_events,
  109. _dispatch=self.dispatch)
  110. @property
  111. def _root(self):
  112. """return the 'root' connection.
  113. Returns 'self' if this connection is not a branch, else
  114. returns the root connection from which we ultimately branched.
  115. """
  116. if self.__branch_from:
  117. return self.__branch_from
  118. else:
  119. return self
  120. def _clone(self):
  121. """Create a shallow copy of this Connection.
  122. """
  123. c = self.__class__.__new__(self.__class__)
  124. c.__dict__ = self.__dict__.copy()
  125. return c
  126. def __enter__(self):
  127. return self
  128. def __exit__(self, type, value, traceback):
  129. self.close()
  130. def execution_options(self, **opt):
  131. r""" Set non-SQL options for the connection which take effect
  132. during execution.
  133. The method returns a copy of this :class:`.Connection` which references
  134. the same underlying DBAPI connection, but also defines the given
  135. execution options which will take effect for a call to
  136. :meth:`execute`. As the new :class:`.Connection` references the same
  137. underlying resource, it's usually a good idea to ensure that the copies
  138. will be discarded immediately, which is implicit if used as in::
  139. result = connection.execution_options(stream_results=True).\
  140. execute(stmt)
  141. Note that any key/value can be passed to
  142. :meth:`.Connection.execution_options`, and it will be stored in the
  143. ``_execution_options`` dictionary of the :class:`.Connection`. It
  144. is suitable for usage by end-user schemes to communicate with
  145. event listeners, for example.
  146. The keywords that are currently recognized by SQLAlchemy itself
  147. include all those listed under :meth:`.Executable.execution_options`,
  148. as well as others that are specific to :class:`.Connection`.
  149. :param autocommit: Available on: Connection, statement.
  150. When True, a COMMIT will be invoked after execution
  151. when executed in 'autocommit' mode, i.e. when an explicit
  152. transaction is not begun on the connection. Note that DBAPI
  153. connections by default are always in a transaction - SQLAlchemy uses
  154. rules applied to different kinds of statements to determine if
  155. COMMIT will be invoked in order to provide its "autocommit" feature.
  156. Typically, all INSERT/UPDATE/DELETE statements as well as
  157. CREATE/DROP statements have autocommit behavior enabled; SELECT
  158. constructs do not. Use this option when invoking a SELECT or other
  159. specific SQL construct where COMMIT is desired (typically when
  160. calling stored procedures and such), and an explicit
  161. transaction is not in progress.
  162. :param compiled_cache: Available on: Connection.
  163. A dictionary where :class:`.Compiled` objects
  164. will be cached when the :class:`.Connection` compiles a clause
  165. expression into a :class:`.Compiled` object.
  166. It is the user's responsibility to
  167. manage the size of this dictionary, which will have keys
  168. corresponding to the dialect, clause element, the column
  169. names within the VALUES or SET clause of an INSERT or UPDATE,
  170. as well as the "batch" mode for an INSERT or UPDATE statement.
  171. The format of this dictionary is not guaranteed to stay the
  172. same in future releases.
  173. Note that the ORM makes use of its own "compiled" caches for
  174. some operations, including flush operations. The caching
  175. used by the ORM internally supersedes a cache dictionary
  176. specified here.
  177. :param isolation_level: Available on: :class:`.Connection`.
  178. Set the transaction isolation level for
  179. the lifespan of this :class:`.Connection` object (*not* the
  180. underlying DBAPI connection, for which the level is reset
  181. to its original setting upon termination of this
  182. :class:`.Connection` object).
  183. Valid values include
  184. those string values accepted by the
  185. :paramref:`.create_engine.isolation_level`
  186. parameter passed to :func:`.create_engine`. These levels are
  187. semi-database specific; see individual dialect documentation for
  188. valid levels.
  189. Note that this option necessarily affects the underlying
  190. DBAPI connection for the lifespan of the originating
  191. :class:`.Connection`, and is not per-execution. This
  192. setting is not removed until the underlying DBAPI connection
  193. is returned to the connection pool, i.e.
  194. the :meth:`.Connection.close` method is called.
  195. .. warning:: The ``isolation_level`` execution option should
  196. **not** be used when a transaction is already established, that
  197. is, the :meth:`.Connection.begin` method or similar has been
  198. called. A database cannot change the isolation level on a
  199. transaction in progress, and different DBAPIs and/or
  200. SQLAlchemy dialects may implicitly roll back or commit
  201. the transaction, or not affect the connection at all.
  202. .. versionchanged:: 0.9.9 A warning is emitted when the
  203. ``isolation_level`` execution option is used after a
  204. transaction has been started with :meth:`.Connection.begin`
  205. or similar.
  206. .. note:: The ``isolation_level`` execution option is implicitly
  207. reset if the :class:`.Connection` is invalidated, e.g. via
  208. the :meth:`.Connection.invalidate` method, or if a
  209. disconnection error occurs. The new connection produced after
  210. the invalidation will not have the isolation level re-applied
  211. to it automatically.
  212. .. seealso::
  213. :paramref:`.create_engine.isolation_level`
  214. - set per :class:`.Engine` isolation level
  215. :meth:`.Connection.get_isolation_level` - view current level
  216. :ref:`SQLite Transaction Isolation <sqlite_isolation_level>`
  217. :ref:`PostgreSQL Transaction Isolation <postgresql_isolation_level>`
  218. :ref:`MySQL Transaction Isolation <mysql_isolation_level>`
  219. :ref:`SQL Server Transaction Isolation <mssql_isolation_level>`
  220. :ref:`session_transaction_isolation` - for the ORM
  221. :param no_parameters: When ``True``, if the final parameter
  222. list or dictionary is totally empty, will invoke the
  223. statement on the cursor as ``cursor.execute(statement)``,
  224. not passing the parameter collection at all.
  225. Some DBAPIs such as psycopg2 and mysql-python consider
  226. percent signs as significant only when parameters are
  227. present; this option allows code to generate SQL
  228. containing percent signs (and possibly other characters)
  229. that is neutral regarding whether it's executed by the DBAPI
  230. or piped into a script that's later invoked by
  231. command line tools.
  232. .. versionadded:: 0.7.6
  233. :param stream_results: Available on: Connection, statement.
  234. Indicate to the dialect that results should be
  235. "streamed" and not pre-buffered, if possible. This is a limitation
  236. of many DBAPIs. The flag is currently understood only by the
  237. psycopg2, mysqldb and pymysql dialects.
  238. :param schema_translate_map: Available on: Connection, Engine.
  239. A dictionary mapping schema names to schema names, that will be
  240. applied to the :paramref:`.Table.schema` element of each
  241. :class:`.Table` encountered when SQL or DDL expression elements
  242. are compiled into strings; the resulting schema name will be
  243. converted based on presence in the map of the original name.
  244. .. versionadded:: 1.1
  245. .. seealso::
  246. :ref:`schema_translating`
  247. """
  248. c = self._clone()
  249. c._execution_options = c._execution_options.union(opt)
  250. if self._has_events or self.engine._has_events:
  251. self.dispatch.set_connection_execution_options(c, opt)
  252. self.dialect.set_connection_execution_options(c, opt)
  253. return c
  254. @property
  255. def closed(self):
  256. """Return True if this connection is closed."""
  257. return '_Connection__connection' not in self.__dict__ \
  258. and not self.__can_reconnect
  259. @property
  260. def invalidated(self):
  261. """Return True if this connection was invalidated."""
  262. return self._root.__invalid
  263. @property
  264. def connection(self):
  265. """The underlying DB-API connection managed by this Connection.
  266. .. seealso::
  267. :ref:`dbapi_connections`
  268. """
  269. try:
  270. return self.__connection
  271. except AttributeError:
  272. try:
  273. return self._revalidate_connection()
  274. except BaseException as e:
  275. self._handle_dbapi_exception(e, None, None, None, None)
  276. def get_isolation_level(self):
  277. """Return the current isolation level assigned to this
  278. :class:`.Connection`.
  279. This will typically be the default isolation level as determined
  280. by the dialect, unless if the
  281. :paramref:`.Connection.execution_options.isolation_level`
  282. feature has been used to alter the isolation level on a
  283. per-:class:`.Connection` basis.
  284. This attribute will typically perform a live SQL operation in order
  285. to procure the current isolation level, so the value returned is the
  286. actual level on the underlying DBAPI connection regardless of how
  287. this state was set. Compare to the
  288. :attr:`.Connection.default_isolation_level` accessor
  289. which returns the dialect-level setting without performing a SQL
  290. query.
  291. .. versionadded:: 0.9.9
  292. .. seealso::
  293. :attr:`.Connection.default_isolation_level` - view default level
  294. :paramref:`.create_engine.isolation_level`
  295. - set per :class:`.Engine` isolation level
  296. :paramref:`.Connection.execution_options.isolation_level`
  297. - set per :class:`.Connection` isolation level
  298. """
  299. try:
  300. return self.dialect.get_isolation_level(self.connection)
  301. except BaseException as e:
  302. self._handle_dbapi_exception(e, None, None, None, None)
  303. @property
  304. def default_isolation_level(self):
  305. """The default isolation level assigned to this :class:`.Connection`.
  306. This is the isolation level setting that the :class:`.Connection`
  307. has when first procured via the :meth:`.Engine.connect` method.
  308. This level stays in place until the
  309. :paramref:`.Connection.execution_options.isolation_level` is used
  310. to change the setting on a per-:class:`.Connection` basis.
  311. Unlike :meth:`.Connection.get_isolation_level`, this attribute is set
  312. ahead of time from the first connection procured by the dialect,
  313. so SQL query is not invoked when this accessor is called.
  314. .. versionadded:: 0.9.9
  315. .. seealso::
  316. :meth:`.Connection.get_isolation_level` - view current level
  317. :paramref:`.create_engine.isolation_level`
  318. - set per :class:`.Engine` isolation level
  319. :paramref:`.Connection.execution_options.isolation_level`
  320. - set per :class:`.Connection` isolation level
  321. """
  322. return self.dialect.default_isolation_level
  323. def _revalidate_connection(self):
  324. if self.__branch_from:
  325. return self.__branch_from._revalidate_connection()
  326. if self.__can_reconnect and self.__invalid:
  327. if self.__transaction is not None:
  328. raise exc.InvalidRequestError(
  329. "Can't reconnect until invalid "
  330. "transaction is rolled back")
  331. self.__connection = self.engine.raw_connection(_connection=self)
  332. self.__invalid = False
  333. return self.__connection
  334. raise exc.ResourceClosedError("This Connection is closed")
  335. @property
  336. def _connection_is_valid(self):
  337. # use getattr() for is_valid to support exceptions raised in
  338. # dialect initializer, where the connection is not wrapped in
  339. # _ConnectionFairy
  340. return getattr(self.__connection, 'is_valid', False)
  341. @property
  342. def _still_open_and_connection_is_valid(self):
  343. return \
  344. not self.closed and \
  345. not self.invalidated and \
  346. getattr(self.__connection, 'is_valid', False)
  347. @property
  348. def info(self):
  349. """Info dictionary associated with the underlying DBAPI connection
  350. referred to by this :class:`.Connection`, allowing user-defined
  351. data to be associated with the connection.
  352. The data here will follow along with the DBAPI connection including
  353. after it is returned to the connection pool and used again
  354. in subsequent instances of :class:`.Connection`.
  355. """
  356. return self.connection.info
  357. def connect(self):
  358. """Returns a branched version of this :class:`.Connection`.
  359. The :meth:`.Connection.close` method on the returned
  360. :class:`.Connection` can be called and this
  361. :class:`.Connection` will remain open.
  362. This method provides usage symmetry with
  363. :meth:`.Engine.connect`, including for usage
  364. with context managers.
  365. """
  366. return self._branch()
  367. def contextual_connect(self, **kwargs):
  368. """Returns a branched version of this :class:`.Connection`.
  369. The :meth:`.Connection.close` method on the returned
  370. :class:`.Connection` can be called and this
  371. :class:`.Connection` will remain open.
  372. This method provides usage symmetry with
  373. :meth:`.Engine.contextual_connect`, including for usage
  374. with context managers.
  375. """
  376. return self._branch()
  377. def invalidate(self, exception=None):
  378. """Invalidate the underlying DBAPI connection associated with
  379. this :class:`.Connection`.
  380. The underlying DBAPI connection is literally closed (if
  381. possible), and is discarded. Its source connection pool will
  382. typically lazily create a new connection to replace it.
  383. Upon the next use (where "use" typically means using the
  384. :meth:`.Connection.execute` method or similar),
  385. this :class:`.Connection` will attempt to
  386. procure a new DBAPI connection using the services of the
  387. :class:`.Pool` as a source of connectivity (e.g. a "reconnection").
  388. If a transaction was in progress (e.g. the
  389. :meth:`.Connection.begin` method has been called) when
  390. :meth:`.Connection.invalidate` method is called, at the DBAPI
  391. level all state associated with this transaction is lost, as
  392. the DBAPI connection is closed. The :class:`.Connection`
  393. will not allow a reconnection to proceed until the
  394. :class:`.Transaction` object is ended, by calling the
  395. :meth:`.Transaction.rollback` method; until that point, any attempt at
  396. continuing to use the :class:`.Connection` will raise an
  397. :class:`~sqlalchemy.exc.InvalidRequestError`.
  398. This is to prevent applications from accidentally
  399. continuing an ongoing transactional operations despite the
  400. fact that the transaction has been lost due to an
  401. invalidation.
  402. The :meth:`.Connection.invalidate` method, just like auto-invalidation,
  403. will at the connection pool level invoke the
  404. :meth:`.PoolEvents.invalidate` event.
  405. .. seealso::
  406. :ref:`pool_connection_invalidation`
  407. """
  408. if self.invalidated:
  409. return
  410. if self.closed:
  411. raise exc.ResourceClosedError("This Connection is closed")
  412. if self._root._connection_is_valid:
  413. self._root.__connection.invalidate(exception)
  414. del self._root.__connection
  415. self._root.__invalid = True
  def detach(self):
      """Detach the underlying DB-API connection from its connection pool.

      E.g.::

          with engine.connect() as conn:
              conn.detach()
              conn.execute("SET search_path TO schema1, schema2")

              # work with connection

          # connection is fully closed (since we used "with:", can
          # also call .close())

      This :class:`.Connection` instance remains usable.  When it is
      closed (or exited from a context manager context as above), the
      DB-API connection is literally closed rather than returned to
      its originating pool.

      Use this method to insulate the rest of an application from a
      modified state on a connection (such as a transaction isolation
      level or similar).

      """
      pooled = self.__connection
      pooled.detach()
  434. def begin(self):
  435. """Begin a transaction and return a transaction handle.
  436. The returned object is an instance of :class:`.Transaction`.
  437. This object represents the "scope" of the transaction,
  438. which completes when either the :meth:`.Transaction.rollback`
  439. or :meth:`.Transaction.commit` method is called.
  440. Nested calls to :meth:`.begin` on the same :class:`.Connection`
  441. will return new :class:`.Transaction` objects that represent
  442. an emulated transaction within the scope of the enclosing
  443. transaction, that is::
  444. trans = conn.begin() # outermost transaction
  445. trans2 = conn.begin() # "nested"
  446. trans2.commit() # does nothing
  447. trans.commit() # actually commits
  448. Calls to :meth:`.Transaction.commit` only have an effect
  449. when invoked via the outermost :class:`.Transaction` object, though the
  450. :meth:`.Transaction.rollback` method of any of the
  451. :class:`.Transaction` objects will roll back the
  452. transaction.
  453. See also:
  454. :meth:`.Connection.begin_nested` - use a SAVEPOINT
  455. :meth:`.Connection.begin_twophase` - use a two phase /XID transaction
  456. :meth:`.Engine.begin` - context manager available from
  457. :class:`.Engine`.
  458. """
  459. if self.__branch_from:
  460. return self.__branch_from.begin()
  461. if self.__transaction is None:
  462. self.__transaction = RootTransaction(self)
  463. return self.__transaction
  464. else:
  465. return Transaction(self, self.__transaction)
  466. def begin_nested(self):
  467. """Begin a nested transaction and return a transaction handle.
  468. The returned object is an instance of :class:`.NestedTransaction`.
  469. Nested transactions require SAVEPOINT support in the
  470. underlying database. Any transaction in the hierarchy may
  471. ``commit`` and ``rollback``, however the outermost transaction
  472. still controls the overall ``commit`` or ``rollback`` of the
  473. transaction of a whole.
  474. See also :meth:`.Connection.begin`,
  475. :meth:`.Connection.begin_twophase`.
  476. """
  477. if self.__branch_from:
  478. return self.__branch_from.begin_nested()
  479. if self.__transaction is None:
  480. self.__transaction = RootTransaction(self)
  481. else:
  482. self.__transaction = NestedTransaction(self, self.__transaction)
  483. return self.__transaction
  484. def begin_twophase(self, xid=None):
  485. """Begin a two-phase or XA transaction and return a transaction
  486. handle.
  487. The returned object is an instance of :class:`.TwoPhaseTransaction`,
  488. which in addition to the methods provided by
  489. :class:`.Transaction`, also provides a
  490. :meth:`~.TwoPhaseTransaction.prepare` method.
  491. :param xid: the two phase transaction id. If not supplied, a
  492. random id will be generated.
  493. See also :meth:`.Connection.begin`,
  494. :meth:`.Connection.begin_twophase`.
  495. """
  496. if self.__branch_from:
  497. return self.__branch_from.begin_twophase(xid=xid)
  498. if self.__transaction is not None:
  499. raise exc.InvalidRequestError(
  500. "Cannot start a two phase transaction when a transaction "
  501. "is already in progress.")
  502. if xid is None:
  503. xid = self.engine.dialect.create_xid()
  504. self.__transaction = TwoPhaseTransaction(self, xid)
  505. return self.__transaction
  506. def recover_twophase(self):
  507. return self.engine.dialect.do_recover_twophase(self)
  508. def rollback_prepared(self, xid, recover=False):
  509. self.engine.dialect.do_rollback_twophase(self, xid, recover=recover)
  510. def commit_prepared(self, xid, recover=False):
  511. self.engine.dialect.do_commit_twophase(self, xid, recover=recover)
  512. def in_transaction(self):
  513. """Return True if a transaction is in progress."""
  514. return self._root.__transaction is not None
  515. def _begin_impl(self, transaction):
  516. assert not self.__branch_from
  517. if self._echo:
  518. self.engine.logger.info("BEGIN (implicit)")
  519. if self._has_events or self.engine._has_events:
  520. self.dispatch.begin(self)
  521. try:
  522. self.engine.dialect.do_begin(self.connection)
  523. if self.connection._reset_agent is None:
  524. self.connection._reset_agent = transaction
  525. except BaseException as e:
  526. self._handle_dbapi_exception(e, None, None, None, None)
  527. def _rollback_impl(self):
  528. assert not self.__branch_from
  529. if self._has_events or self.engine._has_events:
  530. self.dispatch.rollback(self)
  531. if self._still_open_and_connection_is_valid:
  532. if self._echo:
  533. self.engine.logger.info("ROLLBACK")
  534. try:
  535. self.engine.dialect.do_rollback(self.connection)
  536. except BaseException as e:
  537. self._handle_dbapi_exception(e, None, None, None, None)
  538. finally:
  539. if not self.__invalid and \
  540. self.connection._reset_agent is self.__transaction:
  541. self.connection._reset_agent = None
  542. self.__transaction = None
  543. else:
  544. self.__transaction = None
  545. def _commit_impl(self, autocommit=False):
  546. assert not self.__branch_from
  547. if self._has_events or self.engine._has_events:
  548. self.dispatch.commit(self)
  549. if self._echo:
  550. self.engine.logger.info("COMMIT")
  551. try:
  552. self.engine.dialect.do_commit(self.connection)
  553. except BaseException as e:
  554. self._handle_dbapi_exception(e, None, None, None, None)
  555. finally:
  556. if not self.__invalid and \
  557. self.connection._reset_agent is self.__transaction:
  558. self.connection._reset_agent = None
  559. self.__transaction = None
  560. def _savepoint_impl(self, name=None):
  561. assert not self.__branch_from
  562. if self._has_events or self.engine._has_events:
  563. self.dispatch.savepoint(self, name)
  564. if name is None:
  565. self.__savepoint_seq += 1
  566. name = 'sa_savepoint_%s' % self.__savepoint_seq
  567. if self._still_open_and_connection_is_valid:
  568. self.engine.dialect.do_savepoint(self, name)
  569. return name
  570. def _rollback_to_savepoint_impl(self, name, context):
  571. assert not self.__branch_from
  572. if self._has_events or self.engine._has_events:
  573. self.dispatch.rollback_savepoint(self, name, context)
  574. if self._still_open_and_connection_is_valid:
  575. self.engine.dialect.do_rollback_to_savepoint(self, name)
  576. self.__transaction = context
  577. def _release_savepoint_impl(self, name, context):
  578. assert not self.__branch_from
  579. if self._has_events or self.engine._has_events:
  580. self.dispatch.release_savepoint(self, name, context)
  581. if self._still_open_and_connection_is_valid:
  582. self.engine.dialect.do_release_savepoint(self, name)
  583. self.__transaction = context
  584. def _begin_twophase_impl(self, transaction):
  585. assert not self.__branch_from
  586. if self._echo:
  587. self.engine.logger.info("BEGIN TWOPHASE (implicit)")
  588. if self._has_events or self.engine._has_events:
  589. self.dispatch.begin_twophase(self, transaction.xid)
  590. if self._still_open_and_connection_is_valid:
  591. self.engine.dialect.do_begin_twophase(self, transaction.xid)
  592. if self.connection._reset_agent is None:
  593. self.connection._reset_agent = transaction
  594. def _prepare_twophase_impl(self, xid):
  595. assert not self.__branch_from
  596. if self._has_events or self.engine._has_events:
  597. self.dispatch.prepare_twophase(self, xid)
  598. if self._still_open_and_connection_is_valid:
  599. assert isinstance(self.__transaction, TwoPhaseTransaction)
  600. self.engine.dialect.do_prepare_twophase(self, xid)
  601. def _rollback_twophase_impl(self, xid, is_prepared):
  602. assert not self.__branch_from
  603. if self._has_events or self.engine._has_events:
  604. self.dispatch.rollback_twophase(self, xid, is_prepared)
  605. if self._still_open_and_connection_is_valid:
  606. assert isinstance(self.__transaction, TwoPhaseTransaction)
  607. try:
  608. self.engine.dialect.do_rollback_twophase(
  609. self, xid, is_prepared)
  610. finally:
  611. if self.connection._reset_agent is self.__transaction:
  612. self.connection._reset_agent = None
  613. self.__transaction = None
  614. else:
  615. self.__transaction = None
  616. def _commit_twophase_impl(self, xid, is_prepared):
  617. assert not self.__branch_from
  618. if self._has_events or self.engine._has_events:
  619. self.dispatch.commit_twophase(self, xid, is_prepared)
  620. if self._still_open_and_connection_is_valid:
  621. assert isinstance(self.__transaction, TwoPhaseTransaction)
  622. try:
  623. self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
  624. finally:
  625. if self.connection._reset_agent is self.__transaction:
  626. self.connection._reset_agent = None
  627. self.__transaction = None
  628. else:
  629. self.__transaction = None
  630. def _autorollback(self):
  631. if not self._root.in_transaction():
  632. self._root._rollback_impl()
  633. def close(self):
  634. """Close this :class:`.Connection`.
  635. This results in a release of the underlying database
  636. resources, that is, the DBAPI connection referenced
  637. internally. The DBAPI connection is typically restored
  638. back to the connection-holding :class:`.Pool` referenced
  639. by the :class:`.Engine` that produced this
  640. :class:`.Connection`. Any transactional state present on
  641. the DBAPI connection is also unconditionally released via
  642. the DBAPI connection's ``rollback()`` method, regardless
  643. of any :class:`.Transaction` object that may be
  644. outstanding with regards to this :class:`.Connection`.
  645. After :meth:`~.Connection.close` is called, the
  646. :class:`.Connection` is permanently in a closed state,
  647. and will allow no further operations.
  648. """
  649. if self.__branch_from:
  650. try:
  651. del self.__connection
  652. except AttributeError:
  653. pass
  654. finally:
  655. self.__can_reconnect = False
  656. return
  657. try:
  658. conn = self.__connection
  659. except AttributeError:
  660. pass
  661. else:
  662. conn.close()
  663. if conn._reset_agent is self.__transaction:
  664. conn._reset_agent = None
  665. # the close() process can end up invalidating us,
  666. # as the pool will call our transaction as the "reset_agent"
  667. # for rollback(), which can then cause an invalidation
  668. if not self.__invalid:
  669. del self.__connection
  670. self.__can_reconnect = False
  671. self.__transaction = None
  672. def scalar(self, object, *multiparams, **params):
  673. """Executes and returns the first column of the first row.
  674. The underlying result/cursor is closed after execution.
  675. """
  676. return self.execute(object, *multiparams, **params).scalar()
  def execute(self, object, *multiparams, **params):
      r"""Execute a SQL statement construct and return a
      :class:`.ResultProxy`.

      :param object: The statement to be executed.  May be
       one of:

       * a plain string
       * any :class:`.ClauseElement` construct that is also
         a subclass of :class:`.Executable`, such as a
         :func:`~.expression.select` construct
       * a :class:`.FunctionElement`, such as that generated
         by :data:`.func`, which is automatically wrapped in
         a SELECT statement that is then executed
       * a :class:`.DDLElement` object
       * a :class:`.DefaultGenerator` object
       * a :class:`.Compiled` object

      :param \*multiparams/\**params: bound parameter values used for
       the execution.  Typically these are one or more dictionaries
       passed positionally via \*multiparams::

           conn.execute(
               table.insert(),
               {"id":1, "value":"v1"},
               {"id":2, "value":"v2"}
           )

       ...or individual key/values given through \**params::

           conn.execute(
               table.insert(), id=1, value="v1"
           )

       When a plain SQL string is passed and the underlying DBAPI
       accepts positional bind parameters, a collection of tuples
       or individual values may be passed in \*multiparams::

           conn.execute(
               "INSERT INTO table (id, value) VALUES (?, ?)",
               (1, "v1"), (2, "v2")
           )

           conn.execute(
               "INSERT INTO table (id, value) VALUES (?, ?)",
               1, "v1"
           )

       Note above, the usage of a question mark "?" or other
       symbol is contingent upon the "paramstyle" accepted by the DBAPI
       in use, which may be any of "qmark", "named", "pyformat", "format",
       "numeric".   See `pep-249 <http://www.python.org/dev/peps/pep-0249/>`_
       for details on paramstyle.

       To execute a textual SQL statement which uses bound parameters in a
       DBAPI-agnostic way, use the :func:`~.expression.text` construct.

      :raises ObjectNotExecutableError: if ``object`` is not a string and
       has no ``_execute_on_connection`` attribute.

      """
      # plain strings take the textual execution path
      if isinstance(object, util.string_types[0]):
          return self._execute_text(object, multiparams, params)

      # everything else must know how to run itself on a connection
      try:
          run_on_connection = object._execute_on_connection
      except AttributeError:
          raise exc.ObjectNotExecutableError(object)
      return run_on_connection(self, multiparams, params)
  732. def _execute_function(self, func, multiparams, params):
  733. """Execute a sql.FunctionElement object."""
  734. return self._execute_clauseelement(func.select(),
  735. multiparams, params)
  736. def _execute_default(self, default, multiparams, params):
  737. """Execute a schema.ColumnDefault object."""
  738. if self._has_events or self.engine._has_events:
  739. for fn in self.dispatch.before_execute:
  740. default, multiparams, params = \
  741. fn(self, default, multiparams, params)
  742. try:
  743. try:
  744. conn = self.__connection
  745. except AttributeError:
  746. conn = self._revalidate_connection()
  747. dialect = self.dialect
  748. ctx = dialect.execution_ctx_cls._init_default(
  749. dialect, self, conn)
  750. except BaseException as e:
  751. self._handle_dbapi_exception(e, None, None, None, None)
  752. ret = ctx._exec_default(default, None)
  753. if self.should_close_with_result:
  754. self.close()
  755. if self._has_events or self.engine._has_events:
  756. self.dispatch.after_execute(self,
  757. default, multiparams, params, ret)
  758. return ret
  759. def _execute_ddl(self, ddl, multiparams, params):
  760. """Execute a schema.DDL object."""
  761. if self._has_events or self.engine._has_events:
  762. for fn in self.dispatch.before_execute:
  763. ddl, multiparams, params = \
  764. fn(self, ddl, multiparams, params)
  765. dialect = self.dialect
  766. compiled = ddl.compile(
  767. dialect=dialect,
  768. schema_translate_map=self.schema_for_object
  769. if not self.schema_for_object.is_default else None)
  770. ret = self._execute_context(
  771. dialect,
  772. dialect.execution_ctx_cls._init_ddl,
  773. compiled,
  774. None,
  775. compiled
  776. )
  777. if self._has_events or self.engine._has_events:
  778. self.dispatch.after_execute(self,
  779. ddl, multiparams, params, ret)
  780. return ret
  def _execute_clauseelement(self, elem, multiparams, params):
      """Execute a sql.ClauseElement object.

      The element is compiled against this connection's dialect.  When a
      ``compiled_cache`` execution option is present, the compiled form is
      looked up in and stored into that cache.
      """
      if self._has_events or self.engine._has_events:
          for listener in self.dispatch.before_execute:
              elem, multiparams, params = listener(
                  self, elem, multiparams, params)

      distilled_params = _distill_params(multiparams, params)
      # note this is usually dict but we support RowProxy
      # as well; but dict.keys() as an iterable is OK
      keys = distilled_params[0].keys() if distilled_params else []

      dialect = self.dialect

      def compile_statement():
          # single place that compiles ``elem`` for both branches below
          if self.schema_for_object.is_default:
              translate_map = None
          else:
              translate_map = self.schema_for_object
          return elem.compile(
              dialect=dialect, column_keys=keys,
              inline=len(distilled_params) > 1,
              schema_translate_map=translate_map)

      if 'compiled_cache' in self._execution_options:
          cache_key = (
              dialect, elem, tuple(sorted(keys)),
              self.schema_for_object.hash_key,
              len(distilled_params) > 1
          )
          cache = self._execution_options['compiled_cache']
          compiled_sql = cache.get(cache_key)
          if compiled_sql is None:
              compiled_sql = compile_statement()
              cache[cache_key] = compiled_sql
      else:
          compiled_sql = compile_statement()

      result = self._execute_context(
          dialect,
          dialect.execution_ctx_cls._init_compiled,
          compiled_sql,
          distilled_params,
          compiled_sql, distilled_params
      )

      if self._has_events or self.engine._has_events:
          self.dispatch.after_execute(
              self, elem, multiparams, params, result)
      return result
  def _execute_compiled(self, compiled, multiparams, params):
      """Execute a sql.Compiled object.

      The already-compiled statement is passed to ``_execute_context``
      together with the distilled parameters.
      """
      if self._has_events or self.engine._has_events:
          for listener in self.dispatch.before_execute:
              compiled, multiparams, params = listener(
                  self, compiled, multiparams, params)

      dialect = self.dialect
      distilled = _distill_params(multiparams, params)

      result = self._execute_context(
          dialect,
          dialect.execution_ctx_cls._init_compiled,
          compiled,
          distilled,
          compiled, distilled
      )

      if self._has_events or self.engine._has_events:
          self.dispatch.after_execute(
              self, compiled, multiparams, params, result)
      return result
  def _execute_text(self, statement, multiparams, params):
      """Execute a string SQL statement.

      The string is passed to ``_execute_context`` together with the
      distilled parameters, using the dialect's ``_init_statement``
      context constructor.
      """
      if self._has_events or self.engine._has_events:
          for listener in self.dispatch.before_execute:
              statement, multiparams, params = listener(
                  self, statement, multiparams, params)

      dialect = self.dialect
      distilled = _distill_params(multiparams, params)

      result = self._execute_context(
          dialect,
          dialect.execution_ctx_cls._init_statement,
          statement,
          distilled,
          statement, distilled
      )

      if self._has_events or self.engine._has_events:
          self.dispatch.after_execute(
              self, statement, multiparams, params, result)
      return result
  def _execute_context(self, dialect, constructor,
                       statement, parameters,
                       *args):
      """Create an :class:`.ExecutionContext` and execute, returning
      a :class:`.ResultProxy`.

      :param dialect: dialect handed to ``constructor``.
      :param constructor: callable invoked as
       ``constructor(dialect, self, conn, *args)`` to build the
       execution context.
      :param statement: statement as given by the caller; it is only
       used for error reporting if building the context fails, and is
       afterwards replaced by ``context.statement``.
      :param parameters: parameters as given by the caller; likewise
       replaced by ``context.parameters`` once the context exists.
      :param args: extra positional arguments for ``constructor``.
      :return: the result object obtained from the execution context.
      """
      try:
          try:
              conn = self.__connection
          except AttributeError:
              # no connection attribute present; obtain one again
              conn = self._revalidate_connection()

          context = constructor(dialect, self, conn, *args)
      except BaseException as e:
          # NOTE(review): _handle_dbapi_exception() is expected to always
          # raise; otherwise ``context`` would be unbound below.
          self._handle_dbapi_exception(
              e,
              util.text_type(statement), parameters,
              None, None)

      if context.compiled:
          context.pre_exec()

      # from here on work with the values prepared by the context
      cursor, statement, parameters = context.cursor, \
          context.statement, \
          context.parameters

      if not context.executemany:
          # single execution: use the first (only) parameter set
          parameters = parameters[0]

      if self._has_events or self.engine._has_events:
          # each listener may replace the statement and parameters
          for fn in self.dispatch.before_cursor_execute:
              statement, parameters = \
                  fn(self, cursor, statement, parameters,
                     context, context.executemany)

      if self._echo:
          self.engine.logger.info(statement)
          self.engine.logger.info(
              "%r",
              sql_util._repr_params(parameters, batches=10)
          )

      # set to True when a dialect-level event hook reports that it
      # performed the execution itself
      evt_handled = False
      try:
          if context.executemany:
              if self.dialect._has_events:
                  for fn in self.dialect.dispatch.do_executemany:
                      if fn(cursor, statement, parameters, context):
                          evt_handled = True
                          break
              if not evt_handled:
                  self.dialect.do_executemany(
                      cursor,
                      statement,
                      parameters,
                      context)
          elif not parameters and context.no_parameters:
              if self.dialect._has_events:
                  for fn in self.dialect.dispatch.do_execute_no_params:
                      if fn(cursor, statement, context):
                          evt_handled = True
                          break
              if not evt_handled:
                  self.dialect.do_execute_no_params(
                      cursor,
                      statement,
                      context)
          else:
              if self.dialect._has_events:
                  for fn in self.dialect.dispatch.do_execute:
                      if fn(cursor, statement, parameters, context):
                          evt_handled = True
                          break
              if not evt_handled:
                  self.dialect.do_execute(
                      cursor,
                      statement,
                      parameters,
                      context)
      except BaseException as e:
          self._handle_dbapi_exception(
              e,
              statement,
              parameters,
              cursor,
              context)

      if self._has_events or self.engine._has_events:
          self.dispatch.after_cursor_execute(self, cursor,
                                             statement,
                                             parameters,
                                             context,
                                             context.executemany)

      if context.compiled:
          context.post_exec()

      if context.is_crud or context.is_text:
          result = context._setup_crud_result_proxy()
      else:
          result = context.get_result_proxy()
          if result._metadata is None:
              # no result metadata: soft-close the result right away
              result._soft_close()

      # autocommit only when the root connection has no transaction
      if context.should_autocommit and self._root.__transaction is None:
          self._root._commit_impl(autocommit=True)

      # for "connectionless" execution, we have to close this
      # Connection after the statement is complete.
      if self.should_close_with_result:
          # ResultProxy already exhausted rows / has no rows.
          # close us now
          if result._soft_closed:
              self.close()
          else:
              # ResultProxy will close this Connection when no more
              # rows to fetch.
              result._autoclose_connection = True
      return result
  971. def _cursor_execute(self, cursor, statement, parameters, context=None):
  972. """Execute a statement + params on the given cursor.
  973. Adds appropriate logging and exception handling.
  974. This method is used by DefaultDialect for special-case
  975. executions, such as for sequences and column defaults.
  976. The path of statement execution in the majority of cases
  977. terminates at _execute_context().
  978. """
  979. if self._has_events or self.engine._has_events:
  980. for fn in self.dispatch.before_cursor_execute:
  981. statement, parameters = \
  982. fn(self, cursor, statement, parameters,
  983. context,
  984. False)
  985. if self._echo:
  986. self.engine.logger.info(statement)
  987. self.engine.logger.info("%r", parameters)
  988. try:
  989. for fn in () if not self.dialect._has_events \
  990. else self.dialect.dispatch.do_execute:
  991. if fn(cursor, statement, parameters, context):
  992. break
  993. else:
  994. self.dialect.do_execute(
  995. cursor,
  996. statement,
  997. parameters,
  998. context)
  999. except BaseException as e:
  1000. self._handle_dbapi_exception(
  1001. e,
  1002. statement,
  1003. parameters,
  1004. cursor,
  1005. context)
  1006. if self._has_events or self.engine._has_events:
  1007. self.dispatch.after_cursor_execute(self, cursor,
  1008. statement,
  1009. parameters,
  1010. context,
  1011. False)
  1012. def _safe_close_cursor(self, cursor):
  1013. """Close the given cursor, catching exceptions
  1014. and turning into log warnings.
  1015. """
  1016. try:
  1017. cursor.close()
  1018. except Exception:
  1019. # log the error through the connection pool's logger.
  1020. self.engine.pool.logger.error(
  1021. "Error closing cursor", exc_info=True)
  # Class-level defaults for per-instance error handling state.
  # _handle_dbapi_exception() sets these on the instance and removes them
  # again with ``del``, so that lookups fall back to the values below.
  _reentrant_error = False
  _is_disconnect = False

  def _handle_dbapi_exception(self,
                              e,
                              statement,
                              parameters,
                              cursor,
                              context):
      """Process an exception caught during execution and re-raise it.

      The exception is classified as a disconnect or not, optionally
      wrapped via ``exc.DBAPIError.instance()``, passed to the
      ``dbapi_error`` / ``handle_error`` listeners, and finally raised
      again (either the original, the wrapped form, or an exception
      supplied by a listener).  Afterwards the connection is invalidated
      if a disconnect was detected and closed if
      ``should_close_with_result`` is set.

      :param e: the exception caught by the caller.
      :param statement: statement being executed, or None.
      :param parameters: parameters in use, or None.
      :param cursor: cursor in use, or None; it is closed here when the
       error is not a disconnect.
      :param context: execution context, or None if it could not be
       created.
      """
      exc_info = sys.exc_info()

      # record the first exception seen on the context
      if context and context.exception is None:
          context.exception = e

      # true for exceptions that do not derive from Exception
      is_exit_exception = not isinstance(e, Exception)

      if not self._is_disconnect:
          # a disconnect is either a DBAPI error the dialect recognizes
          # as such, or any exit exception, while we are not closed
          self._is_disconnect = (
              isinstance(e, self.dialect.dbapi.Error) and
              not self.closed and
              self.dialect.is_disconnect(
                  e,
                  self.__connection if not self.invalidated else None,
                  cursor)
          ) or (
              is_exit_exception and not self.closed
          )

          if context:
              context.is_disconnect = self._is_disconnect

      # exit exceptions invalidate this connection only, not the pool
      invalidate_pool_on_disconnect = not is_exit_exception

      if self._reentrant_error:
          # already handling an error on this connection: raise the
          # wrapped exception straight away without running the
          # listeners / cleanup below a second time
          util.raise_from_cause(
              exc.DBAPIError.instance(statement,
                                      parameters,
                                      e,
                                      self.dialect.dbapi.Error,
                                      dialect=self.dialect),
              exc_info
          )
      self._reentrant_error = True
      try:
          # non-DBAPI error - if we already got a context,
          # or there's no string statement, don't wrap it
          should_wrap = isinstance(e, self.dialect.dbapi.Error) or \
              (statement is not None
               and context is None and not is_exit_exception)

          if should_wrap:
              sqlalchemy_exception = exc.DBAPIError.instance(
                  statement,
                  parameters,
                  e,
                  self.dialect.dbapi.Error,
                  connection_invalidated=self._is_disconnect,
                  dialect=self.dialect)
          else:
              sqlalchemy_exception = None

          # replacement exception supplied by a handle_error listener
          newraise = None

          if (self._has_events or self.engine._has_events) and \
                  not self._execution_options.get(
                      'skip_user_error_events', False):
              # legacy dbapi_error event
              if should_wrap and context:
                  self.dispatch.dbapi_error(self,
                                            cursor,
                                            statement,
                                            parameters,
                                            context,
                                            e)

              # new handle_error event
              ctx = ExceptionContextImpl(
                  e, sqlalchemy_exception, self.engine,
                  self, cursor, statement,
                  parameters, context, self._is_disconnect,
                  invalidate_pool_on_disconnect)

              for fn in self.dispatch.handle_error:
                  try:
                      # handler returns an exception;
                      # call next handler in a chain
                      per_fn = fn(ctx)
                      if per_fn is not None:
                          ctx.chained_exception = newraise = per_fn
                  except Exception as _raised:
                      # handler raises an exception - stop processing
                      newraise = _raised
                      break

              # a listener may have changed the disconnect verdict
              if self._is_disconnect != ctx.is_disconnect:
                  self._is_disconnect = ctx.is_disconnect
                  if sqlalchemy_exception:
                      sqlalchemy_exception.connection_invalidated = \
                          ctx.is_disconnect

              # set up potentially user-defined value for
              # invalidate pool.
              invalidate_pool_on_disconnect = \
                  ctx.invalidate_pool_on_disconnect

          if should_wrap and context:
              context.handle_dbapi_exception(e)

          if not self._is_disconnect:
              # the connection is considered usable: release the cursor
              # and roll back; warn_only=True is passed so a failure in
              # the rollback is reported by safe_reraise
              if cursor:
                  self._safe_close_cursor(cursor)
              with util.safe_reraise(warn_only=True):
                  self._autorollback()

          # NOTE(review): each branch below is expected to raise
          if newraise:
              util.raise_from_cause(newraise, exc_info)
          elif should_wrap:
              util.raise_from_cause(
                  sqlalchemy_exception,
                  exc_info
              )
          else:
              util.reraise(*exc_info)

      finally:
          # drop the instance-level flags so the class defaults apply
          del self._reentrant_error
          if self._is_disconnect:
              del self._is_disconnect
              if not self.invalidated:
                  dbapi_conn_wrapper = self.__connection
                  if invalidate_pool_on_disconnect:
                      self.engine.pool._invalidate(dbapi_conn_wrapper, e)
                  self.invalidate(e)
          if self.should_close_with_result:
              self.close()
  @classmethod
  def _handle_dbapi_exception_noconnection(cls, e, dialect, engine):
      """Handle an exception for which no :class:`.Connection` exists.

      DBAPI errors are wrapped via ``exc.DBAPIError.instance()``; the
      engine's ``handle_error`` listeners may supply a replacement
      exception or change the disconnect flag.  The method then raises
      the listener-supplied exception, the wrapped exception, or the
      original one, in that order of preference.
      """
      exc_info = sys.exc_info()

      is_disconnect = dialect.is_disconnect(e, None, None)
      should_wrap = isinstance(e, dialect.dbapi.Error)

      wrapped = exc.DBAPIError.instance(
          None,
          None,
          e,
          dialect.dbapi.Error,
          connection_invalidated=is_disconnect
      ) if should_wrap else None

      replacement = None

      if engine._has_events:
          err_ctx = ExceptionContextImpl(
              e, wrapped, engine, None, None, None,
              None, None, is_disconnect, True)

          for handler in engine.dispatch.handle_error:
              try:
                  # a handler may return an exception, which is
                  # chained into the context for the next handler
                  produced = handler(err_ctx)
                  if produced is not None:
                      err_ctx.chained_exception = replacement = produced
              except Exception as raised:
                  # a handler that raises ends the chain
                  replacement = raised
                  break

          # propagate a disconnect verdict changed by a handler
          if wrapped and is_disconnect != err_ctx.is_disconnect:
              is_disconnect = err_ctx.is_disconnect
              wrapped.connection_invalidated = is_disconnect

      if replacement:
          util.raise_from_cause(replacement, exc_info)
      elif should_wrap:
          util.raise_from_cause(wrapped, exc_info)
      else:
          util.reraise(*exc_info)
  def transaction(self, callable_, *args, **kwargs):
      r"""Run the given function inside a transaction.

      The function receives this :class:`.Connection` as its first
      argument, followed by the given \*args and \**kwargs, e.g.::

          def do_something(conn, x, y):
              conn.execute("some statement", {'x':x, 'y':y})

          conn.transaction(do_something, 5, 10)

      Everything the function does takes place within a single
      :class:`.Transaction`.  When the function returns normally the
      transaction is committed and the function's return value is
      returned; if an exception is raised, the transaction is rolled
      back before the exception propagates.

      .. note::

         The :meth:`.transaction` method is superseded by
         the usage of the Python ``with:`` statement, which can
         be used with :meth:`.Connection.begin`::

             with conn.begin():
                 conn.execute("some statement", {'x':5, 'y':10})

         As well as with :meth:`.Engine.begin`::

             with engine.begin() as conn:
                 conn.execute("some statement", {'x':5, 'y':10})

      See also:

          :meth:`.Engine.begin` - engine-level transactional
          context

          :meth:`.Engine.transaction` - engine-level version of
          :meth:`.Connection.transaction`

      """
      txn = self.begin()
      try:
          outcome = self.run_callable(callable_, *args, **kwargs)
          txn.commit()
      except:
          # roll back, then let safe_reraise re-raise the original error
          with util.safe_reraise():
              txn.rollback()
      else:
          return outcome
  def run_callable(self, callable_, *args, **kwargs):
      r"""Call ``callable_`` with this :class:`.Connection` as its
      first argument and return the result.

      The given \*args and \**kwargs are passed after the
      :class:`.Connection` argument.

      Together with :meth:`.Engine.run_callable`, this lets a function
      be run against either a :class:`.Connection` or an
      :class:`.Engine` without the caller needing to know which of the
      two it holds.

      """
      outcome = callable_(self, *args, **kwargs)
      return outcome
  1229. def _run_visitor(self, visitorcallable, element, **kwargs):
  1230. visitorcallable(self.dialect, self,
  1231. **kwargs).traverse_single(element)
  class ExceptionContextImpl(ExceptionContext):
      """Implement the :class:`.ExceptionContext` interface.

      Plain value object that carries the state of a failed operation
      to ``handle_error`` listeners.
      """

      def __init__(self, exception, sqlalchemy_exception,
                   engine, connection, cursor, statement, parameters,
                   context, is_disconnect, invalidate_pool_on_disconnect):
          """Store the given error state on the instance.

          :param exception: the original exception that was caught.
          :param sqlalchemy_exception: wrapped form of ``exception``,
           or None when it is not wrapped.
          :param engine: engine in use.
          :param connection: connection in use, or None.
          :param cursor: cursor in use, or None.
          :param statement: statement being executed, or None.
          :param parameters: parameters in use, or None.
          :param context: execution context, or None.
          :param is_disconnect: whether the error is treated as a
           disconnect; listeners may change the attribute.
          :param invalidate_pool_on_disconnect: whether a disconnect
           should invalidate the pool; listeners may change the
           attribute.
          """
          self.engine = engine
          self.connection = connection
          # previously accepted but dropped, so the cursor never reached
          # the listeners even when the caller supplied one
          self.cursor = cursor
          self.sqlalchemy_exception = sqlalchemy_exception
          self.original_exception = exception
          self.execution_context = context
          self.statement = statement
          self.parameters = parameters
          self.is_disconnect = is_disconnect
          self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
  class Transaction(object):
      """Represent a database transaction in progress.

      A :class:`.Transaction` is obtained by calling the
      :meth:`~.Connection.begin` method of :class:`.Connection`::

          from sqlalchemy import create_engine
          engine = create_engine("postgresql://scott:tiger@localhost/test")
          connection = engine.connect()
          trans = connection.begin()
          connection.execute("insert into x (a, b) values (1, 2)")
          trans.commit()

      The :meth:`.rollback` and :meth:`.commit` methods control the
      transaction boundaries.  The object is also a context manager,
      so the Python ``with`` statement can be used with the
      :meth:`.Connection.begin` method::

          with connection.begin():
              connection.execute("insert into x (a, b) values (1, 2)")

      The Transaction object is **not** threadsafe.

      See also:  :meth:`.Connection.begin`, :meth:`.Connection.begin_twophase`,
      :meth:`.Connection.begin_nested`.

      .. index::
        single: thread safety; Transaction
      """

      def __init__(self, connection, parent):
          self.connection = connection
          self._actual_parent = parent
          self.is_active = True

      @property
      def _parent(self):
          """The enclosing transaction, or this one if there is none."""
          parent = self._actual_parent
          return parent if parent else self

      def close(self):
          """Close this :class:`.Transaction`.

          If this transaction is the base transaction in a begin/commit
          nesting, the transaction will rollback().  Otherwise, the
          method returns.

          This is used to cancel a Transaction without affecting the scope of
          an enclosing transaction.

          """
          parent = self._parent
          if parent.is_active and parent is self:
              self.rollback()

      def rollback(self):
          """Roll back this :class:`.Transaction`.

          Does nothing when the parent transaction is no longer active.
          """
          if self._parent.is_active:
              self._do_rollback()
              self.is_active = False

      def _do_rollback(self):
          # by default, hand the rollback over to the parent transaction
          self._parent.rollback()

      def commit(self):
          """Commit this :class:`.Transaction`.

          :raises InvalidRequestError: if the parent transaction is no
           longer active.
          """
          if not self._parent.is_active:
              raise exc.InvalidRequestError("This transaction is inactive")
          self._do_commit()
          self.is_active = False

      def _do_commit(self):
          # nothing to emit here; subclasses perform the actual commit
          pass

      def __enter__(self):
          return self

      def __exit__(self, type, value, traceback):
          # leaving with an error, or already inactive: roll back
          if type is not None or not self.is_active:
              self.rollback()
              return

          try:
              self.commit()
          except:
              # failed commit: roll back, then re-raise the commit error
              with util.safe_reraise():
                  self.rollback()
  class RootTransaction(Transaction):
      """Transaction without a parent.

      Construction calls ``connection._begin_impl()``; while the
      transaction is active, commit and rollback delegate to the
      connection's ``_commit_impl()`` / ``_rollback_impl()``.
      """

      def __init__(self, connection):
          super(RootTransaction, self).__init__(connection, None)
          self.connection._begin_impl(self)

      def _do_rollback(self):
          if not self.is_active:
              return
          self.connection._rollback_impl()

      def _do_commit(self):
          if not self.is_active:
              return
          self.connection._commit_impl()
  class NestedTransaction(Transaction):
      """Represent a 'nested', or SAVEPOINT transaction.

      A new :class:`.NestedTransaction` object may be procured
      using the :meth:`.Connection.begin_nested` method.

      The interface is the same as that of :class:`.Transaction`.

      """

      def __init__(self, connection, parent):
          super(NestedTransaction, self).__init__(connection, parent)
          # value returned by the connection when creating the savepoint
          self._savepoint = self.connection._savepoint_impl()

      def _do_rollback(self):
          if not self.is_active:
              return
          self.connection._rollback_to_savepoint_impl(
              self._savepoint, self._parent)

      def _do_commit(self):
          if not self.is_active:
              return
          self.connection._release_savepoint_impl(
              self._savepoint, self._parent)
  class TwoPhaseTransaction(Transaction):
      """Represent a two-phase transaction.

      A new :class:`.TwoPhaseTransaction` object may be procured
      using the :meth:`.Connection.begin_twophase` method.

      The interface is the same as that of :class:`.Transaction`
      with the addition of the :meth:`prepare` method.

      """

      def __init__(self, connection, xid):
          super(TwoPhaseTransaction, self).__init__(connection, None)
          self._is_prepared = False
          self.xid = xid
          self.connection._begin_twophase_impl(self)

      def prepare(self):
          """Prepare this :class:`.TwoPhaseTransaction`.

          After a PREPARE, the transaction can be committed.

          :raises InvalidRequestError: if the transaction is no longer
           active.
          """
          if self._parent.is_active:
              self.connection._prepare_twophase_impl(self.xid)
              self._is_prepared = True
          else:
              raise exc.InvalidRequestError("This transaction is inactive")

      def _do_rollback(self):
          connection = self.connection
          connection._rollback_twophase_impl(self.xid, self._is_prepared)

      def _do_commit(self):
          connection = self.connection
          connection._commit_twophase_impl(self.xid, self._is_prepared)
  1368. class Engine(Connectable, log.Identified):
  1369. """
  1370. Connects a :class:`~sqlalchemy.pool.Pool` and
  1371. :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a
  1372. source of database connectivity and behavior.
  1373. An :class:`.Engine` object is instantiated publicly using the
  1374. :func:`~sqlalchemy.create_engine` function.
  1375. See also:
  1376. :doc:`/core/engines`
  1377. :ref:`connections_toplevel`
  1378. """
  # Class-level default execution options; update_execution_options()
  # rebinds this on the instance to a merged copy.
  _execution_options = util.immutabledict()
  # Class-level default of the flag read as ``engine._has_events`` before
  # event listeners are invoked.
  _has_events = False
  # Connection class associated with this engine class.
  # NOTE(review): presumably the class instantiated when connecting --
  # confirm against the connect methods.
  _connection_cls = Connection
  # Schema getter built with no translate map; documented by the string
  # that follows this assignment.
  schema_for_object = schema._schema_getter(None)
  1383. """Return the ".schema" attribute for an object.
  1384. Used for :class:`.Table`, :class:`.Sequence` and similar objects,
  1385. and takes into account
  1386. the :paramref:`.Connection.execution_options.schema_translate_map`
  1387. parameter.
  1388. .. versionadded:: 1.1
  1389. .. seealso::
  1390. :ref:`schema_translating`
  1391. """
  def __init__(self, pool, dialect, url,
               logging_name=None, echo=None, proxy=None,
               execution_options=None
               ):
      """Construct an :class:`.Engine`.

      :param pool: connection pool to use.
      :param dialect: dialect to use.
      :param url: URL this engine refers to; shown by ``repr()``.
      :param logging_name: optional name stored as ``logging_name``.
      :param echo: echo flag, assigned to ``echo`` and passed to
       ``log.instance_logger()``.
      :param proxy: optional object adapted as a listener through
       ``interfaces.ConnectionProxy._adapt_listener()``.
      :param execution_options: optional dictionary of default
       execution options, applied via
       :meth:`.Engine.update_execution_options`.
      """
      self.pool, self.url, self.dialect = pool, url, dialect

      # only set when given, so a class-level default stays in effect
      if logging_name:
          self.logging_name = logging_name

      self.echo = echo
      self.engine = self
      log.instance_logger(self, echoflag=echo)

      if proxy:
          interfaces.ConnectionProxy._adapt_listener(self, proxy)

      if execution_options:
          self.update_execution_options(**execution_options)
  def update_execution_options(self, **opt):
      r"""Update the default execution_options dictionary
      of this :class:`.Engine`.

      The keys/values given in \**opt are merged into the default
      execution options used for all connections.  The initial
      contents of this dictionary can be sent via the
      ``execution_options`` parameter to :func:`.create_engine`.

      .. seealso::

          :meth:`.Connection.execution_options`

          :meth:`.Engine.execution_options`

      """
      merged = self._execution_options.union(opt)
      self._execution_options = merged

      # let the event dispatcher, then the dialect, react to the options
      self.dispatch.set_engine_execution_options(self, opt)
      self.dialect.set_engine_execution_options(self, opt)
  def execution_options(self, **opt):
      """Return a new :class:`.Engine` that will provide
      :class:`.Connection` objects with the given execution options.

      The returned :class:`.Engine` remains related to the original
      :class:`.Engine` in that it shares the same connection pool and
      other state:

      * The :class:`.Pool` used by the new :class:`.Engine` is the
        same instance.  The :meth:`.Engine.dispose` method will replace
        the connection pool instance for the parent engine as well
        as this one.
      * Event listeners are "cascaded" - meaning, the new :class:`.Engine`
        inherits the events of the parent, and new events can be associated
        with the new :class:`.Engine` individually.
      * The logging configuration and logging_name is copied from the parent
        :class:`.Engine`.

      The intent of the :meth:`.Engine.execution_options` method is
      to implement "sharding" schemes where multiple :class:`.Engine`
      objects refer to the same connection pool, but are differentiated
      by options that would be consumed by a custom event::

          primary_engine = create_engine("mysql://")
          shard1 = primary_engine.execution_options(shard_id="shard1")
          shard2 = primary_engine.execution_options(shard_id="shard2")

      Above, the ``shard1`` engine serves as a factory for
      :class:`.Connection` objects that will contain the execution option
      ``shard_id=shard1``, and ``shard2`` will produce :class:`.Connection`
      objects that contain the execution option ``shard_id=shard2``.

      An event handler can consume the above execution option to perform
      a schema switch or other operation, given a connection.  Below
      we emit a MySQL ``use`` statement to switch databases, at the same
      time keeping track of which database we've established using the
      :attr:`.Connection.info` dictionary, which gives us a persistent
      storage space that follows the DBAPI connection::

          from sqlalchemy import event
          from sqlalchemy.engine import Engine

          shards = {"default": "base", "shard1": "db1", "shard2": "db2"}

          @event.listens_for(Engine, "before_cursor_execute")
          def _switch_shard(conn, cursor, stmt,
                  params, context, executemany):
              shard_id = conn._execution_options.get('shard_id', "default")
              current_shard = conn.info.get("current_shard", None)

              if current_shard != shard_id:
                  cursor.execute("use %s" % shards[shard_id])
                  conn.info["current_shard"] = shard_id

      .. versionadded:: 0.8

      .. seealso::

          :meth:`.Connection.execution_options` - update execution options
          on a :class:`.Connection` object.

          :meth:`.Engine.update_execution_options` - update the execution
          options for a given :class:`.Engine` in place.

      """
      return OptionEngine(self, opt)
  @property
  def name(self):
      """The ``name`` of the
      :class:`~sqlalchemy.engine.interfaces.Dialect` in use by this
      :class:`Engine`."""
      dialect = self.dialect
      return dialect.name
  @property
  def driver(self):
      """The ``driver`` of the
      :class:`~sqlalchemy.engine.interfaces.Dialect` in use by this
      :class:`Engine`."""
      dialect = self.dialect
      return dialect.driver
  # Class-level ``echo`` attribute produced by ``log.echo_property()``.
  # NOTE(review): presumably a descriptor through which assignments such
  # as ``self.echo = echo`` in __init__ reach the logging setup --
  # confirm in the log module.
  echo = log.echo_property()
  1486. def __repr__(self):
  1487. return 'Engine(%r)' % self.url
  def dispose(self):
      """Dispose of the connection pool used by this :class:`.Engine`.

      This fully closes all **currently checked in** database
      connections.  Connections that are still checked out will
      **not** be closed; however they are no longer associated with
      this :class:`.Engine`, so when they are closed individually,
      eventually the :class:`.Pool` which they are associated with will
      be garbage collected and they will be closed out fully, if
      not already closed on checkin.

      A new connection pool is created immediately after the old one has
      been disposed.   This new pool, like all SQLAlchemy connection pools,
      does not make any actual connections to the database until one is
      first requested, so as long as the :class:`.Engine` isn't used again,
      no new connections will be made.

      .. seealso::

          :ref:`engine_disposal`

      """
      old_pool = self.pool
      old_pool.dispose()
      # replace the disposed pool before announcing the disposal
      self.pool = old_pool.recreate()
      self.dispatch.engine_disposed(self)
  1508. def _execute_default(self, default):
  1509. with self.contextual_connect() as conn:
  1510. return conn._execute_default(default, (), {})
  1511. @contextlib.contextmanager
  1512. def _optional_conn_ctx_manager(self, connection=None):
  1513. if connection is None:
  1514. with self.contextual_connect() as conn:
  1515. yield conn
  1516. else:
  1517. yield connection
  1518. def _run_visitor(self, visitorcallable, element,
  1519. connection=None, **kwargs):
  1520. with self._optional_conn_ctx_manager(connection) as conn:
  1521. conn._run_visitor(visitorcallable, element, **kwargs)
  1522. class _trans_ctx(object):
  1523. def __init__(self, conn, transaction, close_with_result):
  1524. self.conn = conn
  1525. self.transaction = transaction
  1526. self.close_with_result = close_with_result
  1527. def __enter__(self):
  1528. return self.conn
  1529. def __exit__(self, type, value, traceback):
  1530. if type is not None:
  1531. self.transaction.rollback()
  1532. else:
  1533. self.transaction.commit()
  1534. if not self.close_with_result:
  1535. self.conn.close()
  1536. def begin(self, close_with_result=False):
  1537. """Return a context manager delivering a :class:`.Connection`
  1538. with a :class:`.Transaction` established.
  1539. E.g.::
  1540. with engine.begin() as conn:
  1541. conn.execute("insert into table (x, y, z) values (1, 2, 3)")
  1542. conn.execute("my_special_procedure(5)")
  1543. Upon successful operation, the :class:`.Transaction`
  1544. is committed. If an error is raised, the :class:`.Transaction`
  1545. is rolled back.
  1546. The ``close_with_result`` flag is normally ``False``, and indicates
  1547. that the :class:`.Connection` will be closed when the operation
  1548. is complete. When set to ``True``, it indicates the
  1549. :class:`.Connection` is in "single use" mode, where the
  1550. :class:`.ResultProxy` returned by the first call to
  1551. :meth:`.Connection.execute` will close the :class:`.Connection` when
  1552. that :class:`.ResultProxy` has exhausted all result rows.
  1553. .. versionadded:: 0.7.6
  1554. See also:
  1555. :meth:`.Engine.connect` - procure a :class:`.Connection` from
  1556. an :class:`.Engine`.
  1557. :meth:`.Connection.begin` - start a :class:`.Transaction`
  1558. for a particular :class:`.Connection`.
  1559. """
  1560. conn = self.contextual_connect(close_with_result=close_with_result)
  1561. try:
  1562. trans = conn.begin()
  1563. except:
  1564. with util.safe_reraise():
  1565. conn.close()
  1566. return Engine._trans_ctx(conn, trans, close_with_result)
  1567. def transaction(self, callable_, *args, **kwargs):
  1568. r"""Execute the given function within a transaction boundary.
  1569. The function is passed a :class:`.Connection` newly procured
  1570. from :meth:`.Engine.contextual_connect` as the first argument,
  1571. followed by the given \*args and \**kwargs.
  1572. e.g.::
  1573. def do_something(conn, x, y):
  1574. conn.execute("some statement", {'x':x, 'y':y})
  1575. engine.transaction(do_something, 5, 10)
  1576. The operations inside the function are all invoked within the
  1577. context of a single :class:`.Transaction`.
  1578. Upon success, the transaction is committed. If an
  1579. exception is raised, the transaction is rolled back
  1580. before propagating the exception.
  1581. .. note::
  1582. The :meth:`.transaction` method is superseded by
  1583. the usage of the Python ``with:`` statement, which can
  1584. be used with :meth:`.Engine.begin`::
  1585. with engine.begin() as conn:
  1586. conn.execute("some statement", {'x':5, 'y':10})
  1587. See also:
  1588. :meth:`.Engine.begin` - engine-level transactional
  1589. context
  1590. :meth:`.Connection.transaction` - connection-level version of
  1591. :meth:`.Engine.transaction`
  1592. """
  1593. with self.contextual_connect() as conn:
  1594. return conn.transaction(callable_, *args, **kwargs)
  1595. def run_callable(self, callable_, *args, **kwargs):
  1596. r"""Given a callable object or function, execute it, passing
  1597. a :class:`.Connection` as the first argument.
  1598. The given \*args and \**kwargs are passed subsequent
  1599. to the :class:`.Connection` argument.
  1600. This function, along with :meth:`.Connection.run_callable`,
  1601. allows a function to be run with a :class:`.Connection`
  1602. or :class:`.Engine` object without the need to know
  1603. which one is being dealt with.
  1604. """
  1605. with self.contextual_connect() as conn:
  1606. return conn.run_callable(callable_, *args, **kwargs)
  1607. def execute(self, statement, *multiparams, **params):
  1608. """Executes the given construct and returns a :class:`.ResultProxy`.
  1609. The arguments are the same as those used by
  1610. :meth:`.Connection.execute`.
  1611. Here, a :class:`.Connection` is acquired using the
  1612. :meth:`~.Engine.contextual_connect` method, and the statement executed
  1613. with that connection. The returned :class:`.ResultProxy` is flagged
  1614. such that when the :class:`.ResultProxy` is exhausted and its
  1615. underlying cursor is closed, the :class:`.Connection` created here
  1616. will also be closed, which allows its associated DBAPI connection
  1617. resource to be returned to the connection pool.
  1618. """
  1619. connection = self.contextual_connect(close_with_result=True)
  1620. return connection.execute(statement, *multiparams, **params)
  1621. def scalar(self, statement, *multiparams, **params):
  1622. return self.execute(statement, *multiparams, **params).scalar()
  1623. def _execute_clauseelement(self, elem, multiparams=None, params=None):
  1624. connection = self.contextual_connect(close_with_result=True)
  1625. return connection._execute_clauseelement(elem, multiparams, params)
  1626. def _execute_compiled(self, compiled, multiparams, params):
  1627. connection = self.contextual_connect(close_with_result=True)
  1628. return connection._execute_compiled(compiled, multiparams, params)
  1629. def connect(self, **kwargs):
  1630. """Return a new :class:`.Connection` object.
  1631. The :class:`.Connection` object is a facade that uses a DBAPI
  1632. connection internally in order to communicate with the database. This
  1633. connection is procured from the connection-holding :class:`.Pool`
  1634. referenced by this :class:`.Engine`. When the
  1635. :meth:`~.Connection.close` method of the :class:`.Connection` object
  1636. is called, the underlying DBAPI connection is then returned to the
  1637. connection pool, where it may be used again in a subsequent call to
  1638. :meth:`~.Engine.connect`.
  1639. """
  1640. return self._connection_cls(self, **kwargs)
  1641. def contextual_connect(self, close_with_result=False, **kwargs):
  1642. """Return a :class:`.Connection` object which may be part of some
  1643. ongoing context.
  1644. By default, this method does the same thing as :meth:`.Engine.connect`.
  1645. Subclasses of :class:`.Engine` may override this method
  1646. to provide contextual behavior.
  1647. :param close_with_result: When True, the first :class:`.ResultProxy`
  1648. created by the :class:`.Connection` will call the
  1649. :meth:`.Connection.close` method of that connection as soon as any
  1650. pending result rows are exhausted. This is used to supply the
  1651. "connectionless execution" behavior provided by the
  1652. :meth:`.Engine.execute` method.
  1653. """
  1654. return self._connection_cls(
  1655. self,
  1656. self._wrap_pool_connect(self.pool.connect, None),
  1657. close_with_result=close_with_result,
  1658. **kwargs)
  1659. def table_names(self, schema=None, connection=None):
  1660. """Return a list of all table names available in the database.
  1661. :param schema: Optional, retrieve names from a non-default schema.
  1662. :param connection: Optional, use a specified connection. Default is
  1663. the ``contextual_connect`` for this ``Engine``.
  1664. """
  1665. with self._optional_conn_ctx_manager(connection) as conn:
  1666. if not schema:
  1667. schema = self.dialect.default_schema_name
  1668. return self.dialect.get_table_names(conn, schema)
  1669. def has_table(self, table_name, schema=None):
  1670. """Return True if the given backend has a table of the given name.
  1671. .. seealso::
  1672. :ref:`metadata_reflection_inspector` - detailed schema inspection
  1673. using the :class:`.Inspector` interface.
  1674. :class:`.quoted_name` - used to pass quoting information along
  1675. with a schema identifier.
  1676. """
  1677. return self.run_callable(self.dialect.has_table, table_name, schema)
  1678. def _wrap_pool_connect(self, fn, connection):
  1679. dialect = self.dialect
  1680. try:
  1681. return fn()
  1682. except dialect.dbapi.Error as e:
  1683. if connection is None:
  1684. Connection._handle_dbapi_exception_noconnection(
  1685. e, dialect, self)
  1686. else:
  1687. util.reraise(*sys.exc_info())
  1688. def raw_connection(self, _connection=None):
  1689. """Return a "raw" DBAPI connection from the connection pool.
  1690. The returned object is a proxied version of the DBAPI
  1691. connection object used by the underlying driver in use.
  1692. The object will have all the same behavior as the real DBAPI
  1693. connection, except that its ``close()`` method will result in the
  1694. connection being returned to the pool, rather than being closed
  1695. for real.
  1696. This method provides direct DBAPI connection access for
  1697. special situations when the API provided by :class:`.Connection`
  1698. is not needed. When a :class:`.Connection` object is already
  1699. present, the DBAPI connection is available using
  1700. the :attr:`.Connection.connection` accessor.
  1701. .. seealso::
  1702. :ref:`dbapi_connections`
  1703. """
  1704. return self._wrap_pool_connect(
  1705. self.pool.unique_connection, _connection)
class OptionEngine(Engine):
    """An :class:`.Engine` that wraps another engine and carries its own
    execution options.

    The URL, dialect, logging name and echo setting are copied from the
    proxied engine at construction time, while the connection pool is
    shared with it through the ``pool`` property.
    """

    def __init__(self, proxied, execution_options):
        """Build an engine on top of ``proxied``.

        :param proxied: the engine whose url, dialect, logging settings,
         event dispatch and pool are reused.
        :param execution_options: mapping of options, passed as keyword
         arguments to ``update_execution_options()`` after the proxied
         engine's options have been taken as the starting point.
        """
        self._proxied = proxied
        self.url = proxied.url
        self.dialect = proxied.dialect
        self.logging_name = proxied.logging_name
        self.echo = proxied.echo
        log.instance_logger(self, echoflag=self.echo)
        # Join this engine's dispatcher with the proxied engine's.
        # NOTE(review): presumably so listeners registered on the proxied
        # engine also fire for this one -- confirm against the event system.
        self.dispatch = self.dispatch._join(proxied.dispatch)
        # Start from the proxied engine's options, then apply the new ones.
        self._execution_options = proxied._execution_options
        self.update_execution_options(**execution_options)

    def _get_pool(self):
        """Return the pool of the proxied engine."""
        return self._proxied.pool

    def _set_pool(self, pool):
        """Assign ``pool`` on the proxied engine rather than on this one."""
        self._proxied.pool = pool

    # ``pool`` always reads from and writes to the proxied engine.
    pool = property(_get_pool, _set_pool)

    def _get_has_events(self):
        """Return a true value if the proxied engine has events, otherwise
        the flag stored in this instance's ``__dict__`` (default False)."""
        return self._proxied._has_events or \
            self.__dict__.get('_has_events', False)

    def _set_has_events(self, value):
        """Store the flag in this instance's ``__dict__`` only; the proxied
        engine is not modified."""
        self.__dict__['_has_events'] = value

    # Reads combine the proxied engine's flag with the local one; writes
    # only touch the local one.
    _has_events = property(_get_has_events, _set_has_events)