- # postgresql/base.py
- # Copyright (C) 2005-2017 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: http://www.opensource.org/licenses/mit-license.php
- r"""
- .. dialect:: postgresql
- :name: PostgreSQL
- .. _postgresql_sequences:
- Sequences/SERIAL
- ----------------
- PostgreSQL supports sequences, and SQLAlchemy uses these as the default means
- of creating new primary key values for integer-based primary key columns. When
- creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for
- integer-based primary key columns, which generates a sequence and server side
- default corresponding to the column.
- To specify a specific named sequence to be used for primary key generation,
- use the :func:`~sqlalchemy.schema.Sequence` construct::
- Table('sometable', metadata,
- Column('id', Integer, Sequence('some_id_seq'), primary_key=True)
- )
- When SQLAlchemy issues a single INSERT statement, to fulfill the contract of
- having the "last insert identifier" available, a RETURNING clause is added to
- the INSERT statement which specifies the primary key columns should be
- returned after the statement completes. The RETURNING functionality only takes
- place if PostgreSQL 8.2 or later is in use. As a fallback approach, the
- sequence, whether specified explicitly or implicitly via ``SERIAL``, is
- executed independently beforehand, the returned value to be used in the
- subsequent insert. Note that when an
- :func:`~sqlalchemy.sql.expression.insert()` construct is executed using
- "executemany" semantics, the "last inserted identifier" functionality does not
- apply; no RETURNING clause is emitted nor is the sequence pre-executed in this
- case.
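- As an illustration, a minimal sketch of both behaviors; ``sometable`` is
- assumed to be a :class:`.Table` with a ``SERIAL`` primary key and ``engine``
- an existing :class:`.Engine`::
-     with engine.connect() as conn:
-         # single-row INSERT: RETURNING makes the new primary key
-         # available on the result
-         result = conn.execute(sometable.insert().values(name='foo'))
-         print(result.inserted_primary_key)
-         # "executemany": no RETURNING is emitted and the sequence is
-         # not pre-executed; inserted_primary_key is not available
-         conn.execute(sometable.insert(), [{'name': 'bar'}, {'name': 'bat'}])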
- To disable the usage of RETURNING by default, specify the flag
- ``implicit_returning=False`` to :func:`.create_engine`.
- .. _postgresql_isolation_level:
- Transaction Isolation Level
- ---------------------------
- All PostgreSQL dialects support setting of transaction isolation level
- both via a dialect-specific parameter
- :paramref:`.create_engine.isolation_level` accepted by :func:`.create_engine`,
- as well as the :paramref:`.Connection.execution_options.isolation_level`
- argument as passed to :meth:`.Connection.execution_options`.
- When using a non-psycopg2 dialect, this feature works by issuing the command
- ``SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL <level>`` for
- each new connection. For the special AUTOCOMMIT isolation level,
- DBAPI-specific techniques are used.
- To set isolation level using :func:`.create_engine`::
- engine = create_engine(
- "postgresql+pg8000://scott:tiger@localhost/test",
- isolation_level="READ UNCOMMITTED"
- )
- To set using per-connection execution options::
- connection = engine.connect()
- connection = connection.execution_options(
- isolation_level="READ COMMITTED"
- )
- Valid values for ``isolation_level`` include:
- * ``READ COMMITTED``
- * ``READ UNCOMMITTED``
- * ``REPEATABLE READ``
- * ``SERIALIZABLE``
- * ``AUTOCOMMIT`` - on psycopg2 / pg8000 only
- .. seealso::
- :ref:`psycopg2_isolation_level`
- :ref:`pg8000_isolation_level`
- .. _postgresql_schema_reflection:
- Remote-Schema Table Introspection and PostgreSQL search_path
- ------------------------------------------------------------
- The PostgreSQL dialect can reflect tables from any schema. The
- :paramref:`.Table.schema` argument, or alternatively the
- :paramref:`.MetaData.reflect.schema` argument determines which schema will
- be searched for the table or tables. The reflected :class:`.Table` objects
- will in all cases retain this ``.schema`` attribute as was specified.
- However, with regards to tables which these :class:`.Table` objects refer to
- via foreign key constraint, a decision must be made as to how the ``.schema``
- is represented in those remote tables, in the case where that remote
- schema name is also a member of the current
- `PostgreSQL search path
- <http://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_.
- By default, the PostgreSQL dialect mimics the behavior encouraged by
- PostgreSQL's own ``pg_get_constraintdef()`` builtin procedure. This function
- returns a sample definition for a particular foreign key constraint,
- omitting the referenced schema name from that definition when the name is
- also in the PostgreSQL schema search path. The interaction below
- illustrates this behavior::
- test=> CREATE TABLE test_schema.referred(id INTEGER PRIMARY KEY);
- CREATE TABLE
- test=> CREATE TABLE referring(
- test(> id INTEGER PRIMARY KEY,
- test(> referred_id INTEGER REFERENCES test_schema.referred(id));
- CREATE TABLE
- test=> SET search_path TO public, test_schema;
- test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
- test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
- test-> ON n.oid = c.relnamespace
- test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
- test-> WHERE c.relname='referring' AND r.contype = 'f'
- test-> ;
- pg_get_constraintdef
- ---------------------------------------------------
- FOREIGN KEY (referred_id) REFERENCES referred(id)
- (1 row)
- Above, we created a table ``referred`` as a member of the remote schema
- ``test_schema``; however, when we added ``test_schema`` to the
- PG ``search_path`` and then asked ``pg_get_constraintdef()`` for the
- ``FOREIGN KEY`` syntax, ``test_schema`` was not included in the output of
- the function.
- On the other hand, if we set the search path back to the typical default
- of ``public``::
- test=> SET search_path TO public;
- SET
- The same query against ``pg_get_constraintdef()`` now returns the fully
- schema-qualified name for us::
- test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
- test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
- test-> ON n.oid = c.relnamespace
- test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
- test-> WHERE c.relname='referring' AND r.contype = 'f';
- pg_get_constraintdef
- ---------------------------------------------------------------
- FOREIGN KEY (referred_id) REFERENCES test_schema.referred(id)
- (1 row)
- SQLAlchemy will by default use the return value of ``pg_get_constraintdef()``
- in order to determine the remote schema name. That is, if our ``search_path``
- were set to include ``test_schema``, and we invoked a table
- reflection process as follows::
- >>> from sqlalchemy import Table, MetaData, create_engine
- >>> engine = create_engine("postgresql://scott:tiger@localhost/test")
- >>> with engine.connect() as conn:
- ... conn.execute("SET search_path TO test_schema, public")
- ... meta = MetaData()
- ... referring = Table('referring', meta,
- ... autoload=True, autoload_with=conn)
- ...
- <sqlalchemy.engine.result.ResultProxy object at 0x101612ed0>
- The above process would deliver to the :attr:`.MetaData.tables` collection
- a ``referred`` table named **without** the schema::
- >>> meta.tables['referred'].schema is None
- True
- To alter the behavior of reflection such that the referred schema is
- maintained regardless of the ``search_path`` setting, use the
- ``postgresql_ignore_search_path`` option, which can be specified as a
- dialect-specific argument to both :class:`.Table` as well as
- :meth:`.MetaData.reflect`::
- >>> with engine.connect() as conn:
- ... conn.execute("SET search_path TO test_schema, public")
- ... meta = MetaData()
- ... referring = Table('referring', meta, autoload=True,
- ... autoload_with=conn,
- ... postgresql_ignore_search_path=True)
- ...
- <sqlalchemy.engine.result.ResultProxy object at 0x1016126d0>
- We will now have ``test_schema.referred`` stored as schema-qualified::
- >>> meta.tables['test_schema.referred'].schema
- 'test_schema'
- .. sidebar:: Best Practices for PostgreSQL Schema reflection
- The description of PostgreSQL schema reflection behavior is complex, and
- is the product of many years of dealing with widely varied use cases and
- user preferences. But in fact, there's no need to understand any of it if
- you just stick to the simplest use pattern: leave the ``search_path`` set
- to its default of ``public`` only, never refer to the name ``public`` as
- an explicit schema name otherwise, and refer to all other schema names
- explicitly when building up a :class:`.Table` object. The options
- described here are only for those users who can't, or prefer not to, stay
- within these guidelines.
- Note that **in all cases**, the "default" schema is always reflected as
- ``None``. The "default" schema on PostgreSQL is that which is returned by the
- PostgreSQL ``current_schema()`` function. On a typical PostgreSQL
- installation, this is the name ``public``. So a table that refers to another
- which is in the ``public`` (i.e. default) schema will always have the
- ``.schema`` attribute set to ``None``.
- .. versionadded:: 0.9.2 Added the ``postgresql_ignore_search_path``
- dialect-level option accepted by :class:`.Table` and
- :meth:`.MetaData.reflect`.
- .. seealso::
- `The Schema Search Path
- <http://www.postgresql.org/docs/9.0/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_
- - on the PostgreSQL website.
- INSERT/UPDATE...RETURNING
- -------------------------
- The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and
- ``DELETE..RETURNING`` syntaxes. ``INSERT..RETURNING`` is used by default
- for single-row INSERT statements in order to fetch newly generated
- primary key identifiers. To specify an explicit ``RETURNING`` clause,
- use the :meth:`._UpdateBase.returning` method on a per-statement basis::
- # INSERT..RETURNING
- result = table.insert().returning(table.c.col1, table.c.col2).\
- values(name='foo')
- print(result.fetchall())
- # UPDATE..RETURNING
- result = table.update().returning(table.c.col1, table.c.col2).\
- where(table.c.name=='foo').values(name='bar')
- print(result.fetchall())
- # DELETE..RETURNING
- result = table.delete().returning(table.c.col1, table.c.col2).\
- where(table.c.name=='foo')
- print(result.fetchall())
- .. _postgresql_insert_on_conflict:
- INSERT...ON CONFLICT (Upsert)
- ------------------------------
- Starting with version 9.5, PostgreSQL allows "upserts" (update or insert)
- of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement.
- A candidate row will only be inserted if that row does not violate
- any unique constraints. In the case of a unique constraint violation,
- a secondary action can occur which can be either "DO UPDATE", indicating
- that the data in the target row should be updated, or "DO NOTHING",
- which indicates to silently skip this row.
- Conflicts are determined using existing unique constraints and indexes. These
- constraints may be identified either using their name as stated in DDL,
- or they may be *inferred* by stating the columns and conditions that comprise
- the indexes.
- SQLAlchemy provides ``ON CONFLICT`` support via the PostgreSQL-specific
- :func:`.postgresql.dml.insert()` function, which provides
- the generative methods :meth:`~.postgresql.dml.Insert.on_conflict_do_update`
- and :meth:`~.postgresql.dml.Insert.on_conflict_do_nothing`::
- from sqlalchemy.dialects.postgresql import insert
- insert_stmt = insert(my_table).values(
- id='some_existing_id',
- data='inserted value')
- do_nothing_stmt = insert_stmt.on_conflict_do_nothing(
- index_elements=['id']
- )
- conn.execute(do_nothing_stmt)
- do_update_stmt = insert_stmt.on_conflict_do_update(
- constraint='pk_my_table',
- set_=dict(data='updated value')
- )
- conn.execute(do_update_stmt)
- Both methods supply the "target" of the conflict using either a
- named constraint or column inference:
- * The :paramref:`.Insert.on_conflict_do_update.index_elements` argument
- specifies a sequence containing string column names, :class:`.Column` objects,
- and/or SQL expression elements, which would identify a unique index::
- do_update_stmt = insert_stmt.on_conflict_do_update(
- index_elements=['id'],
- set_=dict(data='updated value')
- )
- do_update_stmt = insert_stmt.on_conflict_do_update(
- index_elements=[my_table.c.id],
- set_=dict(data='updated value')
- )
- * When using :paramref:`.Insert.on_conflict_do_update.index_elements` to
- infer an index, a partial index can be inferred by also specifying the
- :paramref:`.Insert.on_conflict_do_update.index_where` parameter::
- from sqlalchemy.dialects.postgresql import insert
- stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
- stmt = stmt.on_conflict_do_update(
- index_elements=[my_table.c.user_email],
- index_where=my_table.c.user_email.like('%@gmail.com'),
- set_=dict(data=stmt.excluded.data)
- )
- conn.execute(stmt)
- * The :paramref:`.Insert.on_conflict_do_update.constraint` argument is
- used to specify an index directly rather than inferring it. This can be
- the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX::
- do_update_stmt = insert_stmt.on_conflict_do_update(
- constraint='my_table_idx_1',
- set_=dict(data='updated value')
- )
- do_update_stmt = insert_stmt.on_conflict_do_update(
- constraint='my_table_pk',
- set_=dict(data='updated value')
- )
- * The :paramref:`.Insert.on_conflict_do_update.constraint` argument may
- also refer to a SQLAlchemy construct representing a constraint,
- e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`,
- :class:`.Index`, or :class:`.ExcludeConstraint`. In this use,
- if the constraint has a name, it is used directly. Otherwise, if the
- constraint is unnamed, then inference will be used, where the expressions
- and optional WHERE clause of the constraint will be spelled out in the
- construct. This use is especially convenient
- to refer to the named or unnamed primary key of a :class:`.Table` using the
- :attr:`.Table.primary_key` attribute::
- do_update_stmt = insert_stmt.on_conflict_do_update(
- constraint=my_table.primary_key,
- set_=dict(data='updated value')
- )
- ``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
- existing row, using any combination of new values as well as values
- from the proposed insertion. These values are specified using the
- :paramref:`.Insert.on_conflict_do_update.set_` parameter. This
- parameter accepts a dictionary which consists of direct values
- for UPDATE::
- from sqlalchemy.dialects.postgresql import insert
- stmt = insert(my_table).values(id='some_id', data='inserted value')
- do_update_stmt = stmt.on_conflict_do_update(
- index_elements=['id'],
- set_=dict(data='updated value')
- )
- conn.execute(do_update_stmt)
- .. warning::
- The :meth:`.Insert.on_conflict_do_update` method does **not** take into
- account Python-side default UPDATE values or generation functions,
- e.g. those specified using :paramref:`.Column.onupdate`.
- These values will not be exercised for an ON CONFLICT style of UPDATE,
- unless they are manually specified in the
- :paramref:`.Insert.on_conflict_do_update.set_` dictionary.
- In order to refer to the proposed insertion row, the special alias
- :attr:`~.postgresql.dml.Insert.excluded` is available as an attribute on
- the :class:`.postgresql.dml.Insert` object; this object is a
- :class:`.ColumnCollection` which contains all columns of the target
- table::
- from sqlalchemy.dialects.postgresql import insert
- stmt = insert(my_table).values(
- id='some_id',
- data='inserted value',
- author='jlh')
- do_update_stmt = stmt.on_conflict_do_update(
- index_elements=['id'],
- set_=dict(data='updated value', author=stmt.excluded.author)
- )
- conn.execute(do_update_stmt)
- The :meth:`.Insert.on_conflict_do_update` method also accepts
- a WHERE clause using the :paramref:`.Insert.on_conflict_do_update.where`
- parameter, which will limit those rows which receive an UPDATE::
- from sqlalchemy.dialects.postgresql import insert
- stmt = insert(my_table).values(
- id='some_id',
- data='inserted value',
- author='jlh')
- on_update_stmt = stmt.on_conflict_do_update(
- index_elements=['id'],
- set_=dict(data='updated value', author=stmt.excluded.author),
- where=(my_table.c.status == 2)
- )
- conn.execute(on_update_stmt)
- ``ON CONFLICT`` may also be used to skip inserting a row entirely
- if any conflict with a unique or exclusion constraint occurs; below
- this is illustrated using the
- :meth:`~.postgresql.dml.Insert.on_conflict_do_nothing` method::
- from sqlalchemy.dialects.postgresql import insert
- stmt = insert(my_table).values(id='some_id', data='inserted value')
- stmt = stmt.on_conflict_do_nothing(index_elements=['id'])
- conn.execute(stmt)
- If ``DO NOTHING`` is used without specifying any columns or constraint,
- it has the effect of skipping the INSERT for any unique or exclusion
- constraint violation which occurs::
- from sqlalchemy.dialects.postgresql import insert
- stmt = insert(my_table).values(id='some_id', data='inserted value')
- stmt = stmt.on_conflict_do_nothing()
- conn.execute(stmt)
- .. versionadded:: 1.1 Added support for PostgreSQL ON CONFLICT clauses
- .. seealso::
- `INSERT .. ON CONFLICT <http://www.postgresql.org/docs/current/static/sql-insert.html#SQL-ON-CONFLICT>`_ - in the PostgreSQL documentation.
- .. _postgresql_match:
- Full Text Search
- ----------------
- SQLAlchemy makes available the PostgreSQL ``@@`` operator via the
- :meth:`.ColumnElement.match` method on any textual column expression.
- On a PostgreSQL dialect, an expression like the following::
- select([sometable.c.text.match("search string")])
- will emit to the database::
- SELECT text @@ to_tsquery('search string') FROM table
- The PostgreSQL text search functions such as ``to_tsquery()``
- and ``to_tsvector()`` are available
- explicitly using the standard :data:`.func` construct. For example::
- select([
- func.to_tsvector('fat cats ate rats').match('cat & rat')
- ])
- Emits the equivalent of::
- SELECT to_tsvector('fat cats ate rats') @@ to_tsquery('cat & rat')
- The :class:`.postgresql.TSVECTOR` type can provide for explicit CAST::
- from sqlalchemy.dialects.postgresql import TSVECTOR
- from sqlalchemy import select, cast
- select([cast("some text", TSVECTOR)])
- produces a statement equivalent to::
- SELECT CAST('some text' AS TSVECTOR) AS anon_1
- Full Text Searches in PostgreSQL are influenced by a combination of: the
- PostgreSQL setting of ``default_text_search_config``, the ``regconfig`` used
- to build the GIN/GiST indexes, and the ``regconfig`` optionally passed in
- during a query.
- When performing a Full Text Search against a column that has a GIN or
- GiST index that is already pre-computed (which is common on full text
- searches) one may need to explicitly pass in a particular PostgreSQL
- ``regconfig`` value to ensure the query-planner utilizes the index and does
- not re-compute the column on demand.
- In order to provide for this explicit query planning, or to use different
- search strategies, the ``match`` method accepts a ``postgresql_regconfig``
- keyword argument::
- select([mytable.c.id]).where(
- mytable.c.title.match('somestring', postgresql_regconfig='english')
- )
- Emits the equivalent of::
- SELECT mytable.id FROM mytable
- WHERE mytable.title @@ to_tsquery('english', 'somestring')
- One can also specifically pass in a ``'regconfig'`` value to the
- ``to_tsvector()`` command as the initial argument::
- select([mytable.c.id]).where(
- func.to_tsvector('english', mytable.c.title )\
- .match('somestring', postgresql_regconfig='english')
- )
- produces a statement equivalent to::
- SELECT mytable.id FROM mytable
- WHERE to_tsvector('english', mytable.title) @@
- to_tsquery('english', 'somestring')
- It is recommended that you use the ``EXPLAIN ANALYZE...`` tool from
- PostgreSQL to ensure that you are generating queries with SQLAlchemy that
- take full advantage of any indexes you may have created for full text search.
- FROM ONLY ...
- ------------------------
- The dialect supports PostgreSQL's ONLY keyword for targeting only a particular
- table in an inheritance hierarchy. This can be used to produce the
- ``SELECT ... FROM ONLY``, ``UPDATE ONLY ...``, and ``DELETE FROM ONLY ...``
- syntaxes. It uses SQLAlchemy's hints mechanism::
- # SELECT ... FROM ONLY ...
- result = table.select().with_hint(table, 'ONLY', 'postgresql')
- print(result.fetchall())
- # UPDATE ONLY ...
- table.update(values=dict(foo='bar')).with_hint('ONLY',
- dialect_name='postgresql')
- # DELETE FROM ONLY ...
- table.delete().with_hint('ONLY', dialect_name='postgresql')
- .. _postgresql_indexes:
- PostgreSQL-Specific Index Options
- ---------------------------------
- Several extensions to the :class:`.Index` construct are available, specific
- to the PostgreSQL dialect.
- .. _postgresql_partial_indexes:
- Partial Indexes
- ^^^^^^^^^^^^^^^^
- Partial indexes add criterion to the index definition so that the index is
- applied to a subset of rows. These can be specified on :class:`.Index`
- using the ``postgresql_where`` keyword argument::
- Index('my_index', my_table.c.id, postgresql_where=my_table.c.value > 10)
- Operator Classes
- ^^^^^^^^^^^^^^^^^
- PostgreSQL allows the specification of an *operator class* for each column of
- an index (see
- http://www.postgresql.org/docs/8.3/interactive/indexes-opclass.html).
- The :class:`.Index` construct allows these to be specified via the
- ``postgresql_ops`` keyword argument::
- Index(
- 'my_index', my_table.c.id, my_table.c.data,
- postgresql_ops={
- 'data': 'text_pattern_ops',
- 'id': 'int4_ops'
- })
- Note that the keys in the ``postgresql_ops`` dictionary are the "key" name of
- the :class:`.Column`, i.e. the name used to access it from the ``.c``
- collection of :class:`.Table`, which can be configured to be different than
- the actual name of the column as expressed in the database.
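- For example, a sketch using a hypothetical table whose column has a
- database name different from its ``.c`` key::
-     from sqlalchemy import Column, Index, Integer, MetaData, Table, Text
-     metadata = MetaData()
-     my_table = Table(
-         'my_table', metadata,
-         Column('id', Integer, primary_key=True),
-         # database name is "DataField"; the .c key is "data"
-         Column('DataField', Text, key='data')
-     )
-     # postgresql_ops is keyed on the .c key "data", not "DataField"
-     Index(
-         'my_index', my_table.c.data,
-         postgresql_ops={'data': 'text_pattern_ops'}
-     )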
- If ``postgresql_ops`` is to be used against a complex SQL expression such
- as a function call, then to apply to the column it must be given a label
- that is identified in the dictionary by name, e.g.::
- Index(
- 'my_index', my_table.c.id,
- func.lower(my_table.c.data).label('data_lower'),
- postgresql_ops={
- 'data_lower': 'text_pattern_ops',
- 'id': 'int4_ops'
- })
- Index Types
- ^^^^^^^^^^^^
- PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well
- as the ability for users to create their own (see
- http://www.postgresql.org/docs/8.3/static/indexes-types.html). These can be
- specified on :class:`.Index` using the ``postgresql_using`` keyword argument::
- Index('my_index', my_table.c.data, postgresql_using='gin')
- The value passed to the keyword argument will be simply passed through to the
- underlying CREATE INDEX command, so it *must* be a valid index type for your
- version of PostgreSQL.
- .. _postgresql_index_storage:
- Index Storage Parameters
- ^^^^^^^^^^^^^^^^^^^^^^^^
- PostgreSQL allows storage parameters to be set on indexes. The storage
- parameters available depend on the index method used by the index. Storage
- parameters can be specified on :class:`.Index` using the ``postgresql_with``
- keyword argument::
- Index('my_index', my_table.c.data, postgresql_with={"fillfactor": 50})
- .. versionadded:: 1.0.6
- PostgreSQL allows defining the tablespace in which to create the index.
- The tablespace can be specified on :class:`.Index` using the
- ``postgresql_tablespace`` keyword argument::
- Index('my_index', my_table.c.data, postgresql_tablespace='my_tablespace')
- .. versionadded:: 1.1
- Note that the same option is available on :class:`.Table` as well.
- .. _postgresql_index_concurrently:
- Indexes with CONCURRENTLY
- ^^^^^^^^^^^^^^^^^^^^^^^^^
- The PostgreSQL index option CONCURRENTLY is supported by passing the
- flag ``postgresql_concurrently`` to the :class:`.Index` construct::
- tbl = Table('testtbl', m, Column('data', Integer))
- idx1 = Index('test_idx1', tbl.c.data, postgresql_concurrently=True)
- The above index construct will render DDL for CREATE INDEX, assuming
- PostgreSQL 8.2 or higher is detected or for a connection-less dialect, as::
- CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data)
- For DROP INDEX, assuming PostgreSQL 9.2 or higher is detected or for
- a connection-less dialect, it will emit::
- DROP INDEX CONCURRENTLY test_idx1
- .. versionadded:: 1.1 support for CONCURRENTLY on DROP INDEX. The
- CONCURRENTLY keyword is now only emitted if a high enough version
- of PostgreSQL is detected on the connection (or for a connection-less
- dialect).
- When using CONCURRENTLY, the PostgreSQL database requires that the statement
- be invoked outside of a transaction block. The Python DBAPI enforces that
- even for a single statement, a transaction is present, so to use this
- construct, the DBAPI's "autocommit" mode must be used::
- metadata = MetaData()
- table = Table(
- "foo", metadata,
- Column("id", String))
- index = Index(
- "foo_idx", table.c.id, postgresql_concurrently=True)
- with engine.connect() as conn:
- with conn.execution_options(isolation_level='AUTOCOMMIT'):
- table.create(conn)
- .. seealso::
- :ref:`postgresql_isolation_level`
- .. _postgresql_index_reflection:
- PostgreSQL Index Reflection
- ---------------------------
- The PostgreSQL database creates a UNIQUE INDEX implicitly whenever the
- UNIQUE CONSTRAINT construct is used. When inspecting a table using
- :class:`.Inspector`, the :meth:`.Inspector.get_indexes`
- and the :meth:`.Inspector.get_unique_constraints` will report on these
- two constructs distinctly; in the case of the index, the key
- ``duplicates_constraint`` will be present in the index entry if it is
- detected as mirroring a constraint. When performing reflection using
- ``Table(..., autoload=True)``, the UNIQUE INDEX is **not** returned
- in :attr:`.Table.indexes` when it is detected as mirroring a
- :class:`.UniqueConstraint` in the :attr:`.Table.constraints` collection.
- .. versionchanged:: 1.0.0 - :class:`.Table` reflection now includes
- :class:`.UniqueConstraint` objects present in the :attr:`.Table.constraints`
- collection; the PostgreSQL backend will no longer include a "mirrored"
- :class:`.Index` construct in :attr:`.Table.indexes` if it is detected
- as corresponding to a unique constraint.
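- As an illustration, assuming a table ``my_table`` that includes a UNIQUE
- CONSTRAINT, the mirrored index reported by :meth:`.Inspector.get_indexes`
- can be recognized as follows::
-     from sqlalchemy import create_engine, inspect
-     engine = create_engine("postgresql://scott:tiger@localhost/test")
-     insp = inspect(engine)
-     for idx in insp.get_indexes('my_table'):
-         # the key is present only if the index mirrors a constraint
-         if 'duplicates_constraint' in idx:
-             print(idx['name'], 'mirrors', idx['duplicates_constraint'])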
- Special Reflection Options
- --------------------------
- The :class:`.Inspector` used for the PostgreSQL backend is an instance
- of :class:`.PGInspector`, which offers additional methods::
- from sqlalchemy import create_engine, inspect
- engine = create_engine("postgresql+psycopg2://localhost/test")
- insp = inspect(engine) # will be a PGInspector
- print(insp.get_enums())
- .. autoclass:: PGInspector
- :members:
- .. _postgresql_table_options:
- PostgreSQL Table Options
- -------------------------
- Several options for CREATE TABLE are supported directly by the PostgreSQL
- dialect in conjunction with the :class:`.Table` construct:
- * ``TABLESPACE``::
- Table("some_table", metadata, ..., postgresql_tablespace='some_tablespace')
- The above option is also available on the :class:`.Index` construct.
- * ``ON COMMIT``::
- Table("some_table", metadata, ..., postgresql_on_commit='PRESERVE ROWS')
- * ``WITH OIDS``::
- Table("some_table", metadata, ..., postgresql_with_oids=True)
- * ``WITHOUT OIDS``::
- Table("some_table", metadata, ..., postgresql_with_oids=False)
- * ``INHERITS``::
- Table("some_table", metadata, ..., postgresql_inherits="some_supertable")
- Table("some_table", metadata, ..., postgresql_inherits=("t1", "t2", ...))
- .. versionadded:: 1.0.0
- .. seealso::
- `PostgreSQL CREATE TABLE options
- <http://www.postgresql.org/docs/current/static/sql-createtable.html>`_
- ARRAY Types
- -----------
- The PostgreSQL dialect supports arrays, both as multidimensional column types
- and as array literals:
- * :class:`.postgresql.ARRAY` - ARRAY datatype
- * :class:`.postgresql.array` - array literal
- * :func:`.postgresql.array_agg` - ARRAY_AGG SQL function
- * :class:`.postgresql.aggregate_order_by` - helper for PG's ORDER BY aggregate
- function syntax.
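- For example, a brief sketch of an ARRAY column and an array literal; the
- names shown are illustrative only::
-     from sqlalchemy import Column, Integer, select
-     from sqlalchemy.dialects.postgresql import ARRAY, array
-     Column('pieces', ARRAY(Integer, dimensions=2))
-     # an ARRAY literal, rendering ARRAY[1, 2, 3]
-     stmt = select([array([1, 2, 3])])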
- JSON Types
- ----------
- The PostgreSQL dialect supports both JSON and JSONB datatypes, including
- psycopg2's native support and support for all of PostgreSQL's special
- operators:
- * :class:`.postgresql.JSON`
- * :class:`.postgresql.JSONB`
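- For example, a minimal sketch; ``metadata`` is assumed to be an existing
- :class:`.MetaData` and the table is illustrative only::
-     from sqlalchemy import Column, Integer, Table, select
-     from sqlalchemy.dialects.postgresql import JSONB
-     data_table = Table(
-         'data_table', metadata,
-         Column('id', Integer, primary_key=True),
-         Column('data', JSONB)
-     )
-     # indexed access renders the "->" operator
-     stmt = select([data_table.c.data['some_key']])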
- HSTORE Type
- -----------
- The PostgreSQL HSTORE type as well as hstore literals are supported:
- * :class:`.postgresql.HSTORE` - HSTORE datatype
- * :class:`.postgresql.hstore` - hstore literal
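- As an illustration, assuming an existing ``metadata``; the table is
- hypothetical::
-     from sqlalchemy import Column, Integer, Table, select
-     from sqlalchemy.dialects.postgresql import HSTORE
-     my_table = Table(
-         'my_table', metadata,
-         Column('id', Integer, primary_key=True),
-         Column('tags', HSTORE)
-     )
-     # indexed access renders the "->" operator
-     stmt = select([my_table.c.tags['tag1']])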
- ENUM Types
- ----------
- PostgreSQL has an independently creatable TYPE structure which is used
- to implement an enumerated type. This approach introduces significant
- complexity on the SQLAlchemy side in terms of when this type should be
- CREATED and DROPPED. The type object is also an independently reflectable
- entity. The following sections should be consulted:
- * :class:`.postgresql.ENUM` - DDL and typing support for ENUM.
- * :meth:`.PGInspector.get_enums` - retrieve a listing of current ENUM types
- * :meth:`.postgresql.ENUM.create` , :meth:`.postgresql.ENUM.drop` - individual
- CREATE and DROP commands for ENUM.
- .. _postgresql_array_of_enum:
- Using ENUM with ARRAY
- ^^^^^^^^^^^^^^^^^^^^^
- The combination of ENUM and ARRAY is not directly supported by backend
- DBAPIs at this time. In order to send and receive an ARRAY of ENUM,
- use the following workaround type::
- import re
- import sqlalchemy as sa
- from sqlalchemy.dialects.postgresql import ARRAY
- class ArrayOfEnum(ARRAY):
-     def bind_expression(self, bindvalue):
-         return sa.cast(bindvalue, self)
-     def result_processor(self, dialect, coltype):
-         super_rp = super(ArrayOfEnum, self).result_processor(
-             dialect, coltype)
-         def handle_raw_string(value):
-             # parse the raw PG array literal, e.g. "{a,b,c}"
-             inner = re.match(r"^{(.*)}$", value).group(1)
-             return inner.split(",") if inner else []
-         def process(value):
-             if value is None:
-                 return None
-             return super_rp(handle_raw_string(value))
-         return process
- E.g.::
-     Table(
-         'mydata', metadata,
-         Column('id', Integer, primary_key=True),
-         Column('data', ArrayOfEnum(ENUM('a', 'b', 'c', name='myenum')))
-     )
- This type is not included as a built-in type as it would be incompatible
- with a DBAPI that suddenly decides to support ARRAY of ENUM directly in
- a new version.
- .. _postgresql_array_of_json:
- Using JSON/JSONB with ARRAY
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^
- Similar to using ENUM, for an ARRAY of JSON/JSONB we need to render the
- appropriate CAST; however, current psycopg2 drivers seem to handle the result
- for ARRAY of JSON automatically, so the type is simpler::
- class CastingArray(ARRAY):
- def bind_expression(self, bindvalue):
- return sa.cast(bindvalue, self)
- E.g.::
- Table(
- 'mydata', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', CastingArray(JSONB))
- )
- """
- from collections import defaultdict
- import re
- import datetime as dt
- from sqlalchemy.sql import elements
- from ... import sql, schema, exc, util
- from ...engine import default, reflection
- from ...sql import compiler, expression
- from ... import types as sqltypes
- try:
- from uuid import UUID as _python_UUID
- except ImportError:
- _python_UUID = None
- from sqlalchemy.types import INTEGER, BIGINT, SMALLINT, VARCHAR, \
- CHAR, TEXT, FLOAT, NUMERIC, \
- DATE, BOOLEAN, REAL
- AUTOCOMMIT_REGEXP = re.compile(
- r'\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER|GRANT|REVOKE|'
- 'IMPORT FOREIGN SCHEMA|REFRESH MATERIALIZED VIEW)',
- re.I | re.UNICODE)
- RESERVED_WORDS = set(
- ["all", "analyse", "analyze", "and", "any", "array", "as", "asc",
- "asymmetric", "both", "case", "cast", "check", "collate", "column",
- "constraint", "create", "current_catalog", "current_date",
- "current_role", "current_time", "current_timestamp", "current_user",
- "default", "deferrable", "desc", "distinct", "do", "else", "end",
- "except", "false", "fetch", "for", "foreign", "from", "grant", "group",
- "having", "in", "initially", "intersect", "into", "leading", "limit",
- "localtime", "localtimestamp", "new", "not", "null", "of", "off",
- "offset", "old", "on", "only", "or", "order", "placing", "primary",
- "references", "returning", "select", "session_user", "some", "symmetric",
- "table", "then", "to", "trailing", "true", "union", "unique", "user",
- "using", "variadic", "when", "where", "window", "with", "authorization",
- "between", "binary", "cross", "current_schema", "freeze", "full",
- "ilike", "inner", "is", "isnull", "join", "left", "like", "natural",
- "notnull", "outer", "over", "overlaps", "right", "similar", "verbose"
- ])
- _DECIMAL_TYPES = (1231, 1700)
- _FLOAT_TYPES = (700, 701, 1021, 1022)
- _INT_TYPES = (20, 21, 23, 26, 1005, 1007, 1016)
- class BYTEA(sqltypes.LargeBinary):
- __visit_name__ = 'BYTEA'
- class DOUBLE_PRECISION(sqltypes.Float):
- __visit_name__ = 'DOUBLE_PRECISION'
- class INET(sqltypes.TypeEngine):
- __visit_name__ = "INET"
- PGInet = INET
- class CIDR(sqltypes.TypeEngine):
- __visit_name__ = "CIDR"
- PGCidr = CIDR
- class MACADDR(sqltypes.TypeEngine):
- __visit_name__ = "MACADDR"
- PGMacAddr = MACADDR
- class OID(sqltypes.TypeEngine):
- """Provide the PostgreSQL OID type.
- .. versionadded:: 0.9.5
- """
- __visit_name__ = "OID"
- class TIMESTAMP(sqltypes.TIMESTAMP):
- def __init__(self, timezone=False, precision=None):
- super(TIMESTAMP, self).__init__(timezone=timezone)
- self.precision = precision
- class TIME(sqltypes.TIME):
- def __init__(self, timezone=False, precision=None):
- super(TIME, self).__init__(timezone=timezone)
- self.precision = precision
- class INTERVAL(sqltypes.TypeEngine):
- """PostgreSQL INTERVAL type.
- The INTERVAL type may not be supported on all DBAPIs.
- It is known to work on psycopg2 and not pg8000 or zxjdbc.
- """
- __visit_name__ = 'INTERVAL'
- def __init__(self, precision=None):
- self.precision = precision
- @classmethod
- def _adapt_from_generic_interval(cls, interval):
- return INTERVAL(precision=interval.second_precision)
- @property
- def _type_affinity(self):
- return sqltypes.Interval
- @property
- def python_type(self):
- return dt.timedelta
- PGInterval = INTERVAL
- class BIT(sqltypes.TypeEngine):
- __visit_name__ = 'BIT'
- def __init__(self, length=None, varying=False):
- if not varying:
- # BIT without VARYING defaults to length 1
- self.length = length or 1
- else:
- # but BIT VARYING can be unlimited-length, so no default
- self.length = length
- self.varying = varying
- PGBit = BIT
- class UUID(sqltypes.TypeEngine):
- """PostgreSQL UUID type.
- Represents the UUID column type, interpreting
- data either as natively returned by the DBAPI
- or as Python uuid objects.
- The UUID type may not be supported on all DBAPIs.
- It is known to work on psycopg2 and not pg8000.
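- A brief usage sketch, assuming the standard :class:`.Column` construct::
-     from sqlalchemy.dialects.postgresql import UUID
-     Column('uuid_data', UUID(as_uuid=True))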
- """
- __visit_name__ = 'UUID'
- def __init__(self, as_uuid=False):
- """Construct a UUID type.
- :param as_uuid=False: if True, values will be interpreted
- as Python uuid objects, converting to/from string via the
- DBAPI.
- """
- if as_uuid and _python_UUID is None:
- raise NotImplementedError(
- "This version of Python does not support "
- "the native UUID type."
- )
- self.as_uuid = as_uuid
- def bind_processor(self, dialect):
- if self.as_uuid:
- def process(value):
- if value is not None:
- value = util.text_type(value)
- return value
- return process
- else:
- return None
- def result_processor(self, dialect, coltype):
- if self.as_uuid:
- def process(value):
- if value is not None:
- value = _python_UUID(value)
- return value
- return process
- else:
- return None
- PGUuid = UUID
- class TSVECTOR(sqltypes.TypeEngine):
- """The :class:`.postgresql.TSVECTOR` type implements the PostgreSQL
- text search type TSVECTOR.
- It can be used to do full text queries on natural language
- documents.
- .. versionadded:: 0.9.0
- .. seealso::
- :ref:`postgresql_match`
- """
- __visit_name__ = 'TSVECTOR'
- class ENUM(sqltypes.Enum):
- """PostgreSQL ENUM type.
- This is a subclass of :class:`.types.Enum` which includes
- support for PG's ``CREATE TYPE`` and ``DROP TYPE``.
- When the builtin type :class:`.types.Enum` is used and the
- :paramref:`.Enum.native_enum` flag is left at its default of
- True, the PostgreSQL backend will use a :class:`.postgresql.ENUM`
- type as the implementation, so the special create/drop rules
- will be used.
- The create/drop behavior of ENUM is necessarily intricate, due to the
- awkward relationship the ENUM type has to the
- parent table, in that it may be "owned" by just a single table, or
- may be shared among many tables.
- When using :class:`.types.Enum` or :class:`.postgresql.ENUM`
- in an "inline" fashion, the ``CREATE TYPE`` and ``DROP TYPE`` is emitted
- corresponding to when the :meth:`.Table.create` and :meth:`.Table.drop`
- methods are called::
- table = Table('sometable', metadata,
- Column('some_enum', ENUM('a', 'b', 'c', name='myenum'))
- )
- table.create(engine) # will emit CREATE ENUM and CREATE TABLE
- table.drop(engine) # will emit DROP TABLE and DROP ENUM
- To use a common enumerated type between multiple tables, the best
- practice is to declare the :class:`.types.Enum` or
- :class:`.postgresql.ENUM` independently, and associate it with the
- :class:`.MetaData` object itself::
- my_enum = ENUM('a', 'b', 'c', name='myenum', metadata=metadata)
- t1 = Table('sometable_one', metadata,
- Column('some_enum', my_enum)
- )
- t2 = Table('sometable_two', metadata,
- Column('some_enum', my_enum)
- )
- When this pattern is used, care must still be taken at the level
- of individual table creates. Emitting CREATE TABLE without also
- specifying ``checkfirst=True`` will still cause issues::
- t1.create(engine) # will fail: no such type 'myenum'
- If we specify ``checkfirst=True``, the individual table-level create
- operation will check for the ``ENUM`` and create if not exists::
- # will check if enum exists, and emit CREATE TYPE if not
- t1.create(engine, checkfirst=True)
- When using a metadata-level ENUM type, the type will always be created
- and dropped when the metadata-wide create/drop is called::
- metadata.create_all(engine) # will emit CREATE TYPE
- metadata.drop_all(engine) # will emit DROP TYPE
- The type can also be created and dropped directly::
- my_enum.create(engine)
- my_enum.drop(engine)
- .. versionchanged:: 1.0.0 The PostgreSQL :class:`.postgresql.ENUM` type
- now behaves more strictly with regards to CREATE/DROP. A metadata-level
- ENUM type will only be created and dropped at the metadata level,
- not the table level, with the exception of
- ``table.create(checkfirst=True)``.
- The ``table.drop()`` call will now emit a DROP TYPE for a table-level
- enumerated type.
- """
- def __init__(self, *enums, **kw):
- """Construct an :class:`~.postgresql.ENUM`.
- Arguments are the same as that of
- :class:`.types.Enum`, but also including
- the following parameters.
- :param create_type: Defaults to True.
- Indicates that ``CREATE TYPE`` should be
- emitted, after optionally checking for the
- presence of the type, when the parent
- table is being created; and additionally
- that ``DROP TYPE`` is called when the table
- is dropped. When ``False``, no check
- will be performed and no ``CREATE TYPE``
- or ``DROP TYPE`` is emitted, unless
- :meth:`~.postgresql.ENUM.create`
- or :meth:`~.postgresql.ENUM.drop`
- are called directly.
- Setting to ``False`` is helpful
- when invoking a creation scheme to a SQL file
- without access to the actual database -
- the :meth:`~.postgresql.ENUM.create` and
- :meth:`~.postgresql.ENUM.drop` methods can
- be used to emit SQL to a target bind.
- .. versionadded:: 0.7.4
- """
- self.create_type = kw.pop("create_type", True)
- super(ENUM, self).__init__(*enums, **kw)
- def create(self, bind=None, checkfirst=True):
- """Emit ``CREATE TYPE`` for this
- :class:`~.postgresql.ENUM`.
- If the underlying dialect does not support
- PostgreSQL CREATE TYPE, no action is taken.
- :param bind: a connectable :class:`.Engine`,
- :class:`.Connection`, or similar object to emit
- SQL.
- :param checkfirst: if ``True``, a query against
- the PG catalog will first be performed to see
- if the type already exists before
- creating it.
- """
- if not bind.dialect.supports_native_enum:
- return
- if not checkfirst or \
- not bind.dialect.has_type(
- bind, self.name, schema=self.schema):
- bind.execute(CreateEnumType(self))
- def drop(self, bind=None, checkfirst=True):
- """Emit ``DROP TYPE`` for this
- :class:`~.postgresql.ENUM`.
- If the underlying dialect does not support
- PostgreSQL DROP TYPE, no action is taken.
- :param bind: a connectable :class:`.Engine`,
- :class:`.Connection`, or similar object to emit
- SQL.
- :param checkfirst: if ``True``, a query against
- the PG catalog will first be performed to see
- if the type actually exists before dropping.
- """
- if not bind.dialect.supports_native_enum:
- return
- if not checkfirst or \
- bind.dialect.has_type(bind, self.name, schema=self.schema):
- bind.execute(DropEnumType(self))
- def _check_for_name_in_memos(self, checkfirst, kw):
- """Look in the 'ddl runner' for 'memos', then
- note our name in that collection.
- This is to ensure a particular named enum is operated
- upon only once within any kind of create/drop
- sequence without relying upon "checkfirst".
- """
- if not self.create_type:
- return True
- if '_ddl_runner' in kw:
- ddl_runner = kw['_ddl_runner']
- if '_pg_enums' in ddl_runner.memo:
- pg_enums = ddl_runner.memo['_pg_enums']
- else:
- pg_enums = ddl_runner.memo['_pg_enums'] = set()
- present = self.name in pg_enums
- pg_enums.add(self.name)
- return present
- else:
- return False
- def _on_table_create(self, target, bind, checkfirst=False, **kw):
- if checkfirst or (
- not self.metadata and
- not kw.get('_is_metadata_operation', False)) and \
- not self._check_for_name_in_memos(checkfirst, kw):
- self.create(bind=bind, checkfirst=checkfirst)
- def _on_table_drop(self, target, bind, checkfirst=False, **kw):
- if not self.metadata and \
- not kw.get('_is_metadata_operation', False) and \
- not self._check_for_name_in_memos(checkfirst, kw):
- self.drop(bind=bind, checkfirst=checkfirst)
- def _on_metadata_create(self, target, bind, checkfirst=False, **kw):
- if not self._check_for_name_in_memos(checkfirst, kw):
- self.create(bind=bind, checkfirst=checkfirst)
- def _on_metadata_drop(self, target, bind, checkfirst=False, **kw):
- if not self._check_for_name_in_memos(checkfirst, kw):
- self.drop(bind=bind, checkfirst=checkfirst)
- colspecs = {
- sqltypes.Interval: INTERVAL,
- sqltypes.Enum: ENUM,
- }
- ischema_names = {
- 'integer': INTEGER,
- 'bigint': BIGINT,
- 'smallint': SMALLINT,
- 'character varying': VARCHAR,
- 'character': CHAR,
- '"char"': sqltypes.String,
- 'name': sqltypes.String,
- 'text': TEXT,
- 'numeric': NUMERIC,
- 'float': FLOAT,
- 'real': REAL,
- 'inet': INET,
- 'cidr': CIDR,
- 'uuid': UUID,
- 'bit': BIT,
- 'bit varying': BIT,
- 'macaddr': MACADDR,
- 'oid': OID,
- 'double precision': DOUBLE_PRECISION,
- 'timestamp': TIMESTAMP,
- 'timestamp with time zone': TIMESTAMP,
- 'timestamp without time zone': TIMESTAMP,
- 'time with time zone': TIME,
- 'time without time zone': TIME,
- 'date': DATE,
- 'time': TIME,
- 'bytea': BYTEA,
- 'boolean': BOOLEAN,
- 'interval': INTERVAL,
- 'interval year to month': INTERVAL,
- 'interval day to second': INTERVAL,
- 'tsvector': TSVECTOR
- }
- class PGCompiler(compiler.SQLCompiler):
- def visit_array(self, element, **kw):
- return "ARRAY[%s]" % self.visit_clauselist(element, **kw)
- def visit_slice(self, element, **kw):
- return "%s:%s" % (
- self.process(element.start, **kw),
- self.process(element.stop, **kw),
- )
- def visit_json_getitem_op_binary(self, binary, operator, **kw):
- kw['eager_grouping'] = True
- return self._generate_generic_binary(
- binary, " -> ", **kw
- )
- def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
- kw['eager_grouping'] = True
- return self._generate_generic_binary(
- binary, " #> ", **kw
- )
- def visit_getitem_binary(self, binary, operator, **kw):
- return "%s[%s]" % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw)
- )
- def visit_aggregate_order_by(self, element, **kw):
- return "%s ORDER BY %s" % (
- self.process(element.target, **kw),
- self.process(element.order_by, **kw)
- )
- def visit_match_op_binary(self, binary, operator, **kw):
- if "postgresql_regconfig" in binary.modifiers:
- regconfig = self.render_literal_value(
- binary.modifiers['postgresql_regconfig'],
- sqltypes.STRINGTYPE)
- if regconfig:
- return "%s @@ to_tsquery(%s, %s)" % (
- self.process(binary.left, **kw),
- regconfig,
- self.process(binary.right, **kw)
- )
- return "%s @@ to_tsquery(%s)" % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw)
- )
- def visit_ilike_op_binary(self, binary, operator, **kw):
- escape = binary.modifiers.get("escape", None)
- return '%s ILIKE %s' % \
- (self.process(binary.left, **kw),
- self.process(binary.right, **kw)) \
- + (
- ' ESCAPE ' +
- self.render_literal_value(escape, sqltypes.STRINGTYPE)
- if escape else ''
- )
- def visit_notilike_op_binary(self, binary, operator, **kw):
- escape = binary.modifiers.get("escape", None)
- return '%s NOT ILIKE %s' % \
- (self.process(binary.left, **kw),
- self.process(binary.right, **kw)) \
- + (
- ' ESCAPE ' +
- self.render_literal_value(escape, sqltypes.STRINGTYPE)
- if escape else ''
- )
- def render_literal_value(self, value, type_):
- value = super(PGCompiler, self).render_literal_value(value, type_)
- if self.dialect._backslash_escapes:
- value = value.replace('\\', '\\\\')
- return value
- def visit_sequence(self, seq):
- return "nextval('%s')" % self.preparer.format_sequence(seq)
- def limit_clause(self, select, **kw):
- text = ""
- if select._limit_clause is not None:
- text += " \n LIMIT " + self.process(select._limit_clause, **kw)
- if select._offset_clause is not None:
- if select._limit_clause is None:
- text += " \n LIMIT ALL"
- text += " OFFSET " + self.process(select._offset_clause, **kw)
- return text
- def format_from_hint_text(self, sqltext, table, hint, iscrud):
- if hint.upper() != 'ONLY':
- raise exc.CompileError("Unrecognized hint: %r" % hint)
- return "ONLY " + sqltext
- def get_select_precolumns(self, select, **kw):
- if select._distinct is not False:
- if select._distinct is True:
- return "DISTINCT "
- elif isinstance(select._distinct, (list, tuple)):
- return "DISTINCT ON (" + ', '.join(
- [self.process(col, **kw) for col in select._distinct]
- ) + ") "
- else:
- return "DISTINCT ON (" + \
- self.process(select._distinct, **kw) + ") "
- else:
- return ""
- def for_update_clause(self, select, **kw):
- if select._for_update_arg.read:
- if select._for_update_arg.key_share:
- tmp = " FOR KEY SHARE"
- else:
- tmp = " FOR SHARE"
- elif select._for_update_arg.key_share:
- tmp = " FOR NO KEY UPDATE"
- else:
- tmp = " FOR UPDATE"
- if select._for_update_arg.of:
- tables = util.OrderedSet(
- c.table if isinstance(c, expression.ColumnClause)
- else c for c in select._for_update_arg.of)
- tmp += " OF " + ", ".join(
- self.process(table, ashint=True, use_schema=False, **kw)
- for table in tables
- )
- if select._for_update_arg.nowait:
- tmp += " NOWAIT"
- if select._for_update_arg.skip_locked:
- tmp += " SKIP LOCKED"
- return tmp
- def returning_clause(self, stmt, returning_cols):
- columns = [
- self._label_select_column(None, c, True, False, {})
- for c in expression._select_iterables(returning_cols)
- ]
- return 'RETURNING ' + ', '.join(columns)
- def visit_substring_func(self, func, **kw):
- s = self.process(func.clauses.clauses[0], **kw)
- start = self.process(func.clauses.clauses[1], **kw)
- if len(func.clauses.clauses) > 2:
- length = self.process(func.clauses.clauses[2], **kw)
- return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length)
- else:
- return "SUBSTRING(%s FROM %s)" % (s, start)
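- # Usage sketch, with `t` as a placeholder table (bind placeholders
- # elided):
- #
- #     from sqlalchemy import func
- #     func.substring(t.c.title, 2, 3)  # SUBSTRING(title FROM ... FOR ...)
- #     func.substring(t.c.title, 2)     # SUBSTRING(title FROM ...)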
- def _on_conflict_target(self, clause, **kw):
- if clause.constraint_target is not None:
- target_text = 'ON CONSTRAINT %s' % clause.constraint_target
- elif clause.inferred_target_elements is not None:
- target_text = '(%s)' % ', '.join(
- (self.preparer.quote(c)
- if isinstance(c, util.string_types)
- else
- self.process(c, include_table=False, use_schema=False))
- for c in clause.inferred_target_elements
- )
- if clause.inferred_target_whereclause is not None:
- target_text += ' WHERE %s' % \
- self.process(
- clause.inferred_target_whereclause,
- include_table=False,
- use_schema=False
- )
- else:
- target_text = ''
- return target_text
- def visit_on_conflict_do_nothing(self, on_conflict, **kw):
- target_text = self._on_conflict_target(on_conflict, **kw)
- if target_text:
- return "ON CONFLICT %s DO NOTHING" % target_text
- else:
- return "ON CONFLICT DO NOTHING"
- def visit_on_conflict_do_update(self, on_conflict, **kw):
- clause = on_conflict
- target_text = self._on_conflict_target(on_conflict, **kw)
- action_set_ops = []
- set_parameters = dict(clause.update_values_to_set)
- # build one "col = value" assignment string per matched column
- cols = self.statement.table.c
- for c in cols:
- col_key = c.key
- if col_key in set_parameters:
- value = set_parameters.pop(col_key)
- if elements._is_literal(value):
- value = elements.BindParameter(
- None, value, type_=c.type
- )
- else:
- if isinstance(value, elements.BindParameter) and \
- value.type._isnull:
- value = value._clone()
- value.type = c.type
- value_text = self.process(value.self_group(), use_schema=False)
- key_text = (
- self.preparer.quote(col_key)
- )
- action_set_ops.append('%s = %s' % (key_text, value_text))
- # check for names that don't match columns
- if set_parameters:
- util.warn(
- "Additional column names not matching "
- "any column keys in table '%s': %s" % (
- self.statement.table.name,
- (", ".join("'%s'" % c for c in set_parameters))
- )
- )
- for k, v in set_parameters.items():
- key_text = (
- self.preparer.quote(k)
- if isinstance(k, util.string_types)
- else self.process(k, use_schema=False)
- )
- value_text = self.process(
- elements._literal_as_binds(v),
- use_schema=False
- )
- action_set_ops.append('%s = %s' % (key_text, value_text))
- action_text = ', '.join(action_set_ops)
- if clause.update_whereclause is not None:
- action_text += ' WHERE %s' % \
- self.process(
- clause.update_whereclause,
- include_table=True,
- use_schema=False
- )
- return 'ON CONFLICT %s DO UPDATE SET %s' % (target_text, action_text)
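- # Usage sketch: these ON CONFLICT visitors are driven by this dialect's
- # insert() construct; `my_table` and `conn` are placeholders:
- #
- #     from sqlalchemy.dialects.postgresql import insert
- #     stmt = insert(my_table).values(id=1, data='inserted')
- #     stmt = stmt.on_conflict_do_update(
- #         index_elements=['id'],
- #         set_=dict(data='updated'))
- #     conn.execute(stmt)
- #     # INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
- #     # ON CONFLICT (id) DO UPDATE SET data = %(param_1)s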
- class PGDDLCompiler(compiler.DDLCompiler):
- def get_column_specification(self, column, **kwargs):
- colspec = self.preparer.format_column(column)
- impl_type = column.type.dialect_impl(self.dialect)
- if isinstance(impl_type, sqltypes.TypeDecorator):
- impl_type = impl_type.impl
- if column.primary_key and \
- column is column.table._autoincrement_column and \
- (
- self.dialect.supports_smallserial or
- not isinstance(impl_type, sqltypes.SmallInteger)
- ) and (
- column.default is None or
- (
- isinstance(column.default, schema.Sequence) and
- column.default.optional
- )):
- if isinstance(impl_type, sqltypes.BigInteger):
- colspec += " BIGSERIAL"
- elif isinstance(impl_type, sqltypes.SmallInteger):
- colspec += " SMALLSERIAL"
- else:
- colspec += " SERIAL"
- else:
- colspec += " " + self.dialect.type_compiler.process(
- column.type, type_expression=column)
- default = self.get_column_default_string(column)
- if default is not None:
- colspec += " DEFAULT " + default
- if not column.nullable:
- colspec += " NOT NULL"
- return colspec
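- # Usage sketch: an autoincrement integer primary key renders as
- # SERIAL/BIGSERIAL/SMALLSERIAL; `metadata` is a placeholder:
- #
- #     Table('t', metadata, Column('id', BigInteger, primary_key=True))
- #     # CREATE TABLE t (id BIGSERIAL NOT NULL, PRIMARY KEY (id))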
- def visit_create_enum_type(self, create):
- type_ = create.element
- return "CREATE TYPE %s AS ENUM (%s)" % (
- self.preparer.format_type(type_),
- ", ".join(
- self.sql_compiler.process(sql.literal(e), literal_binds=True)
- for e in type_.enums)
- )
- def visit_drop_enum_type(self, drop):
- type_ = drop.element
- return "DROP TYPE %s" % (
- self.preparer.format_type(type_)
- )
- def visit_create_index(self, create):
- preparer = self.preparer
- index = create.element
- self._verify_index_table(index)
- text = "CREATE "
- if index.unique:
- text += "UNIQUE "
- text += "INDEX "
- if self.dialect._supports_create_index_concurrently:
- concurrently = index.dialect_options['postgresql']['concurrently']
- if concurrently:
- text += "CONCURRENTLY "
- text += "%s ON %s " % (
- self._prepared_index_name(index,
- include_schema=False),
- preparer.format_table(index.table)
- )
- using = index.dialect_options['postgresql']['using']
- if using:
- text += "USING %s " % preparer.quote(using)
- ops = index.dialect_options["postgresql"]["ops"]
- text += "(%s)" \
- % (
- ', '.join([
- self.sql_compiler.process(
- expr.self_group()
- if not isinstance(expr, expression.ColumnClause)
- else expr,
- include_table=False, literal_binds=True) +
- (
- (' ' + ops[expr.key])
- if hasattr(expr, 'key')
- and expr.key in ops else ''
- )
- for expr in index.expressions
- ])
- )
- withclause = index.dialect_options['postgresql']['with']
- if withclause:
- text += " WITH (%s)" % (', '.join(
- ['%s = %s' % storage_parameter
- for storage_parameter in withclause.items()]))
- tablespace_name = index.dialect_options['postgresql']['tablespace']
- if tablespace_name:
- text += " TABLESPACE %s" % preparer.quote(tablespace_name)
- whereclause = index.dialect_options["postgresql"]["where"]
- if whereclause is not None:
- where_compiled = self.sql_compiler.process(
- whereclause, include_table=False,
- literal_binds=True)
- text += " WHERE " + where_compiled
- return text
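- # Usage sketch of the dialect keyword arguments consumed above;
- # `t` is a placeholder table with a `data` column:
- #
- #     from sqlalchemy import Index
- #     Index('ix_t_data', t.c.data,
- #           postgresql_using='gin',
- #           postgresql_concurrently=True,
- #           postgresql_where=t.c.data.isnot(None))
- #     # CREATE INDEX CONCURRENTLY ix_t_data ON t
- #     # USING gin (data) WHERE data IS NOT NULL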
- def visit_drop_index(self, drop):
- index = drop.element
- text = "\nDROP INDEX "
- if self.dialect._supports_drop_index_concurrently:
- concurrently = index.dialect_options['postgresql']['concurrently']
- if concurrently:
- text += "CONCURRENTLY "
- text += self._prepared_index_name(index, include_schema=True)
- return text
- def visit_exclude_constraint(self, constraint, **kw):
- text = ""
- if constraint.name is not None:
- text += "CONSTRAINT %s " % \
- self.preparer.format_constraint(constraint)
- # local list renamed so it does not shadow the `elements` module
- # name used elsewhere in this file
- exclude_elements = []
- for expr, name, op in constraint._render_exprs:
- kw['include_table'] = False
- exclude_elements.append(
- "%s WITH %s" % (self.sql_compiler.process(expr, **kw), op)
- )
- text += "EXCLUDE USING %s (%s)" % (constraint.using,
- ', '.join(exclude_elements))
- if constraint.where is not None:
- text += ' WHERE (%s)' % self.sql_compiler.process(
- constraint.where,
- literal_binds=True)
- text += self.define_constraint_deferrability(constraint)
- return text
- def post_create_table(self, table):
- table_opts = []
- pg_opts = table.dialect_options['postgresql']
- inherits = pg_opts.get('inherits')
- if inherits is not None:
- if not isinstance(inherits, (list, tuple)):
- inherits = (inherits, )
- table_opts.append(
- '\n INHERITS ( ' +
- ', '.join(self.preparer.quote(name) for name in inherits) +
- ' )')
- if pg_opts['with_oids'] is True:
- table_opts.append('\n WITH OIDS')
- elif pg_opts['with_oids'] is False:
- table_opts.append('\n WITHOUT OIDS')
- if pg_opts['on_commit']:
- on_commit_options = pg_opts['on_commit'].replace("_", " ").upper()
- table_opts.append('\n ON COMMIT %s' % on_commit_options)
- if pg_opts['tablespace']:
- tablespace_name = pg_opts['tablespace']
- table_opts.append(
- '\n TABLESPACE %s' % self.preparer.quote(tablespace_name)
- )
- return ''.join(table_opts)
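- # Usage sketch: the table options consumed above map to Table keyword
- # arguments; `metadata` is a placeholder:
- #
- #     Table('child', metadata,
- #           Column('id', Integer, primary_key=True),
- #           postgresql_inherits='parent',
- #           postgresql_on_commit='preserve_rows',
- #           postgresql_tablespace='fast_ssd')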
- class PGTypeCompiler(compiler.GenericTypeCompiler):
- def visit_TSVECTOR(self, type_, **kw):
- return "TSVECTOR"
- def visit_INET(self, type_, **kw):
- return "INET"
- def visit_CIDR(self, type_, **kw):
- return "CIDR"
- def visit_MACADDR(self, type_, **kw):
- return "MACADDR"
- def visit_OID(self, type_, **kw):
- return "OID"
- def visit_FLOAT(self, type_, **kw):
- if not type_.precision:
- return "FLOAT"
- else:
- return "FLOAT(%(precision)s)" % {'precision': type_.precision}
- def visit_DOUBLE_PRECISION(self, type_, **kw):
- return "DOUBLE PRECISION"
- def visit_BIGINT(self, type_, **kw):
- return "BIGINT"
- def visit_HSTORE(self, type_, **kw):
- return "HSTORE"
- def visit_JSON(self, type_, **kw):
- return "JSON"
- def visit_JSONB(self, type_, **kw):
- return "JSONB"
- def visit_INT4RANGE(self, type_, **kw):
- return "INT4RANGE"
- def visit_INT8RANGE(self, type_, **kw):
- return "INT8RANGE"
- def visit_NUMRANGE(self, type_, **kw):
- return "NUMRANGE"
- def visit_DATERANGE(self, type_, **kw):
- return "DATERANGE"
- def visit_TSRANGE(self, type_, **kw):
- return "TSRANGE"
- def visit_TSTZRANGE(self, type_, **kw):
- return "TSTZRANGE"
- def visit_datetime(self, type_, **kw):
- return self.visit_TIMESTAMP(type_, **kw)
- def visit_enum(self, type_, **kw):
- if not type_.native_enum or not self.dialect.supports_native_enum:
- return super(PGTypeCompiler, self).visit_enum(type_, **kw)
- else:
- return self.visit_ENUM(type_, **kw)
- def visit_ENUM(self, type_, **kw):
- return self.dialect.identifier_preparer.format_type(type_)
- def visit_TIMESTAMP(self, type_, **kw):
- return "TIMESTAMP%s %s" % (
- "(%d)" % type_.precision
- if getattr(type_, 'precision', None) is not None else "",
- ("WITH" if type_.timezone else "WITHOUT") + " TIME ZONE"
- )
- def visit_TIME(self, type_, **kw):
- return "TIME%s %s" % (
- "(%d)" % type_.precision
- if getattr(type_, 'precision', None) is not None else "",
- ("WITH" if type_.timezone else "WITHOUT") + " TIME ZONE"
- )
- def visit_INTERVAL(self, type_, **kw):
- if type_.precision is not None:
- return "INTERVAL(%d)" % type_.precision
- else:
- return "INTERVAL"
- def visit_BIT(self, type_, **kw):
- if type_.varying:
- compiled = "BIT VARYING"
- if type_.length is not None:
- compiled += "(%d)" % type_.length
- else:
- compiled = "BIT(%d)" % type_.length
- return compiled
- def visit_UUID(self, type_, **kw):
- return "UUID"
- def visit_large_binary(self, type_, **kw):
- return self.visit_BYTEA(type_, **kw)
- def visit_BYTEA(self, type_, **kw):
- return "BYTEA"
- def visit_ARRAY(self, type_, **kw):
- return self.process(type_.item_type) + (
- '[]' * (type_.dimensions if type_.dimensions is not None else 1))
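- # Usage sketch: each dimension renders one more '[]' suffix:
- #
- #     from sqlalchemy.dialects.postgresql import ARRAY
- #     Column('matrix', ARRAY(Integer, dimensions=2))  # matrix INTEGER[][]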
- class PGIdentifierPreparer(compiler.IdentifierPreparer):
- reserved_words = RESERVED_WORDS
- def _unquote_identifier(self, value):
- if value[0] == self.initial_quote:
- value = value[1:-1].\
- replace(self.escape_to_quote, self.escape_quote)
- return value
- def format_type(self, type_, use_schema=True):
- if not type_.name:
- raise exc.CompileError("PostgreSQL ENUM type requires a name.")
- name = self.quote(type_.name)
- effective_schema = self.schema_for_object(type_)
- if not self.omit_schema and use_schema and \
- effective_schema is not None:
- name = self.quote_schema(effective_schema) + "." + name
- return name
- class PGInspector(reflection.Inspector):
- def __init__(self, conn):
- reflection.Inspector.__init__(self, conn)
- def get_table_oid(self, table_name, schema=None):
- """Return the OID for the given table name."""
- return self.dialect.get_table_oid(self.bind, table_name, schema,
- info_cache=self.info_cache)
- def get_enums(self, schema=None):
- """Return a list of ENUM objects.
- Each member is a dictionary containing these fields:
- * name - name of the enum
- * schema - the schema name for the enum.
- * visible - boolean, whether or not this enum is visible
- in the default search path.
- * labels - a list of string labels that apply to the enum.
- :param schema: schema name. If None, the default schema
- (typically 'public') is used. May also be set to '*' to
- indicate that enums should be loaded for all schemas.
- .. versionadded:: 1.0.0
- """
- schema = schema or self.default_schema_name
- return self.dialect._load_enums(self.bind, schema)
- def get_foreign_table_names(self, schema=None):
- """Return a list of FOREIGN TABLE names.
- Behavior is similar to that of :meth:`.Inspector.get_table_names`,
- except that the list is limited to those tables that report a
- ``relkind`` value of ``f``.
- .. versionadded:: 1.0.0
- """
- schema = schema or self.default_schema_name
- return self.dialect._get_foreign_table_names(self.bind, schema)
- def get_view_names(self, schema=None, include=('plain', 'materialized')):
- """Return all view names in `schema`.
- :param schema: Optional, retrieve names from a non-default schema.
- For special quoting, use :class:`.quoted_name`.
- :param include: specify which types of views to return. Passed
- as a string value (for a single type) or a tuple (for any number
- of types). Defaults to ``('plain', 'materialized')``.
- .. versionadded:: 1.1
- """
- return self.dialect.get_view_names(self.bind, schema,
- info_cache=self.info_cache,
- include=include)
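- # Usage sketch: inspect() returns a PGInspector for engines bound to
- # this dialect; the URL is a placeholder:
- #
- #     from sqlalchemy import create_engine, inspect
- #     engine = create_engine('postgresql://scott:tiger@localhost/test')
- #     insp = inspect(engine)
- #     insp.get_enums()
- #     insp.get_foreign_table_names()
- #     insp.get_view_names(include='materialized')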
- class CreateEnumType(schema._CreateDropBase):
- __visit_name__ = "create_enum_type"
- class DropEnumType(schema._CreateDropBase):
- __visit_name__ = "drop_enum_type"
- class PGExecutionContext(default.DefaultExecutionContext):
- def fire_sequence(self, seq, type_):
- return self._execute_scalar((
- "select nextval('%s')" %
- self.dialect.identifier_preparer.format_sequence(seq)), type_)
- def get_insert_default(self, column):
- if column.primary_key and \
- column is column.table._autoincrement_column:
- if column.server_default and column.server_default.has_argument:
- # pre-execute passive defaults on primary key columns
- return self._execute_scalar("select %s" %
- column.server_default.arg,
- column.type)
- elif (column.default is None or
- (column.default.is_sequence and
- column.default.optional)):
- # execute the sequence associated with a SERIAL primary
- # key column; for a non-primary-key SERIAL column, the ID
- # is generated server-side.
- try:
- seq_name = column._postgresql_seq_name
- except AttributeError:
- tab = column.table.name
- col = column.name
- # mimic PostgreSQL's NAMEDATALEN-based truncation of the
- # implicit "<table>_<column>_seq" name created for SERIAL
- tab = tab[0:29 + max(0, (29 - len(col)))]
- col = col[0:29 + max(0, (29 - len(tab)))]
- name = "%s_%s_seq" % (tab, col)
- column._postgresql_seq_name = seq_name = name
- if column.table is not None:
- effective_schema = self.connection.schema_for_object(
- column.table)
- else:
- effective_schema = None
- if effective_schema is not None:
- exc = "select nextval('\"%s\".\"%s\"')" % \
- (effective_schema, seq_name)
- else:
- exc = "select nextval('\"%s\"')" % \
- (seq_name, )
- return self._execute_scalar(exc, column.type)
- return super(PGExecutionContext, self).get_insert_default(column)
- def should_autocommit_text(self, statement):
- return AUTOCOMMIT_REGEXP.match(statement)
- class PGDialect(default.DefaultDialect):
- name = 'postgresql'
- supports_alter = True
- max_identifier_length = 63
- supports_sane_rowcount = True
- supports_native_enum = True
- supports_native_boolean = True
- supports_smallserial = True
- supports_sequences = True
- sequences_optional = True
- preexecute_autoincrement_sequences = True
- postfetch_lastrowid = False
- supports_default_values = True
- supports_empty_insert = False
- supports_multivalues_insert = True
- default_paramstyle = 'pyformat'
- ischema_names = ischema_names
- colspecs = colspecs
- statement_compiler = PGCompiler
- ddl_compiler = PGDDLCompiler
- type_compiler = PGTypeCompiler
- preparer = PGIdentifierPreparer
- execution_ctx_cls = PGExecutionContext
- inspector = PGInspector
- isolation_level = None
- construct_arguments = [
- (schema.Index, {
- "using": False,
- "where": None,
- "ops": {},
- "concurrently": False,
- "with": {},
- "tablespace": None
- }),
- (schema.Table, {
- "ignore_search_path": False,
- "tablespace": None,
- "with_oids": None,
- "on_commit": None,
- "inherits": None
- }),
- ]
- reflection_options = ('postgresql_ignore_search_path', )
- _backslash_escapes = True
- _supports_create_index_concurrently = True
- _supports_drop_index_concurrently = True
- def __init__(self, isolation_level=None, json_serializer=None,
- json_deserializer=None, **kwargs):
- default.DefaultDialect.__init__(self, **kwargs)
- self.isolation_level = isolation_level
- self._json_deserializer = json_deserializer
- self._json_serializer = json_serializer
- def initialize(self, connection):
- super(PGDialect, self).initialize(connection)
- self.implicit_returning = self.server_version_info > (8, 2) and \
- self.__dict__.get('implicit_returning', True)
- self.supports_native_enum = self.server_version_info >= (8, 3)
- if not self.supports_native_enum:
- self.colspecs = self.colspecs.copy()
- # pop base Enum type
- self.colspecs.pop(sqltypes.Enum, None)
- # psycopg2, others may have placed ENUM here as well
- self.colspecs.pop(ENUM, None)
- # http://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689
- self.supports_smallserial = self.server_version_info >= (9, 2)
- self._backslash_escapes = self.server_version_info < (8, 2) or \
- connection.scalar(
- "show standard_conforming_strings"
- ) == 'off'
- self._supports_create_index_concurrently = \
- self.server_version_info >= (8, 2)
- self._supports_drop_index_concurrently = \
- self.server_version_info >= (9, 2)
- def on_connect(self):
- if self.isolation_level is not None:
- def connect(conn):
- self.set_isolation_level(conn, self.isolation_level)
- return connect
- else:
- return None
- _isolation_lookup = set(['SERIALIZABLE', 'READ UNCOMMITTED',
- 'READ COMMITTED', 'REPEATABLE READ'])
- def set_isolation_level(self, connection, level):
- level = level.replace('_', ' ')
- if level not in self._isolation_lookup:
- raise exc.ArgumentError(
- "Invalid value '%s' for isolation_level. "
- "Valid isolation levels for %s are %s" %
- (level, self.name, ", ".join(self._isolation_lookup))
- )
- cursor = connection.cursor()
- cursor.execute(
- "SET SESSION CHARACTERISTICS AS TRANSACTION "
- "ISOLATION LEVEL %s" % level)
- cursor.execute("COMMIT")
- cursor.close()
- def get_isolation_level(self, connection):
- cursor = connection.cursor()
- cursor.execute('show transaction isolation level')
- val = cursor.fetchone()[0]
- cursor.close()
- return val.upper()
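- # Usage sketch: isolation level is normally configured on the engine or
- # per-connection rather than by calling these methods directly:
- #
- #     engine = create_engine(url, isolation_level='REPEATABLE READ')
- #     conn = engine.connect().execution_options(
- #         isolation_level='SERIALIZABLE')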
- def do_begin_twophase(self, connection, xid):
- self.do_begin(connection.connection)
- def do_prepare_twophase(self, connection, xid):
- connection.execute("PREPARE TRANSACTION '%s'" % xid)
- def do_rollback_twophase(self, connection, xid,
- is_prepared=True, recover=False):
- if is_prepared:
- if recover:
- # FIXME: ugly hack to get out of transaction
- # context when committing recoverable transactions.
- # We need to find a way to keep the DBAPI from
- # opening a transaction in the first place.
- connection.execute("ROLLBACK")
- connection.execute("ROLLBACK PREPARED '%s'" % xid)
- connection.execute("BEGIN")
- self.do_rollback(connection.connection)
- else:
- self.do_rollback(connection.connection)
- def do_commit_twophase(self, connection, xid,
- is_prepared=True, recover=False):
- if is_prepared:
- if recover:
- connection.execute("ROLLBACK")
- connection.execute("COMMIT PREPARED '%s'" % xid)
- connection.execute("BEGIN")
- self.do_rollback(connection.connection)
- else:
- self.do_commit(connection.connection)
- def do_recover_twophase(self, connection):
- resultset = connection.execute(
- sql.text("SELECT gid FROM pg_prepared_xacts"))
- return [row[0] for row in resultset]
- def _get_default_schema_name(self, connection):
- return connection.scalar("select current_schema()")
- def has_schema(self, connection, schema):
- query = ("select nspname from pg_namespace "
- "where lower(nspname)=:schema")
- cursor = connection.execute(
- sql.text(
- query,
- bindparams=[
- sql.bindparam(
- 'schema', util.text_type(schema.lower()),
- type_=sqltypes.Unicode)]
- )
- )
- return bool(cursor.first())
- def has_table(self, connection, table_name, schema=None):
- # seems like case gets folded in pg_class...
- if schema is None:
- cursor = connection.execute(
- sql.text(
- "select relname from pg_class c join pg_namespace n on "
- "n.oid=c.relnamespace where "
- "pg_catalog.pg_table_is_visible(c.oid) "
- "and relname=:name",
- bindparams=[
- sql.bindparam('name', util.text_type(table_name),
- type_=sqltypes.Unicode)]
- )
- )
- else:
- cursor = connection.execute(
- sql.text(
- "select relname from pg_class c join pg_namespace n on "
- "n.oid=c.relnamespace where n.nspname=:schema and "
- "relname=:name",
- bindparams=[
- sql.bindparam('name',
- util.text_type(table_name),
- type_=sqltypes.Unicode),
- sql.bindparam('schema',
- util.text_type(schema),
- type_=sqltypes.Unicode)]
- )
- )
- return bool(cursor.first())
- def has_sequence(self, connection, sequence_name, schema=None):
- if schema is None:
- cursor = connection.execute(
- sql.text(
- "SELECT relname FROM pg_class c join pg_namespace n on "
- "n.oid=c.relnamespace where relkind='S' and "
- "n.nspname=current_schema() "
- "and relname=:name",
- bindparams=[
- sql.bindparam('name', util.text_type(sequence_name),
- type_=sqltypes.Unicode)
- ]
- )
- )
- else:
- cursor = connection.execute(
- sql.text(
- "SELECT relname FROM pg_class c join pg_namespace n on "
- "n.oid=c.relnamespace where relkind='S' and "
- "n.nspname=:schema and relname=:name",
- bindparams=[
- sql.bindparam('name', util.text_type(sequence_name),
- type_=sqltypes.Unicode),
- sql.bindparam('schema',
- util.text_type(schema),
- type_=sqltypes.Unicode)
- ]
- )
- )
- return bool(cursor.first())
- def has_type(self, connection, type_name, schema=None):
- if schema is not None:
- query = """
- SELECT EXISTS (
- SELECT * FROM pg_catalog.pg_type t, pg_catalog.pg_namespace n
- WHERE t.typnamespace = n.oid
- AND t.typname = :typname
- AND n.nspname = :nspname
- )
- """
- query = sql.text(query)
- else:
- query = """
- SELECT EXISTS (
- SELECT * FROM pg_catalog.pg_type t
- WHERE t.typname = :typname
- AND pg_type_is_visible(t.oid)
- )
- """
- query = sql.text(query)
- query = query.bindparams(
- sql.bindparam('typname',
- util.text_type(type_name), type_=sqltypes.Unicode),
- )
- if schema is not None:
- query = query.bindparams(
- sql.bindparam('nspname',
- util.text_type(schema), type_=sqltypes.Unicode),
- )
- cursor = connection.execute(query)
- return bool(cursor.scalar())
- def _get_server_version_info(self, connection):
- v = connection.execute("select version()").scalar()
- m = re.match(
- r'.*(?:PostgreSQL|EnterpriseDB) '
- r'(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel)?',
- v)
- if not m:
- raise AssertionError(
- "Could not determine version from string '%s'" % v)
- return tuple([int(x) for x in m.group(1, 2, 3) if x is not None])
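- # Example: a version() string such as
- #     "PostgreSQL 9.6.2 on x86_64-pc-linux-gnu, compiled by gcc ..."
- # yields the tuple (9, 6, 2) from the regexp above.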
- @reflection.cache
- def get_table_oid(self, connection, table_name, schema=None, **kw):
- """Fetch the oid for schema.table_name.
- Several reflection methods require the table oid. The idea for using
- this method is that it can be fetched one time and cached for
- subsequent calls.
- """
- table_oid = None
- if schema is not None:
- schema_where_clause = "n.nspname = :schema"
- else:
- schema_where_clause = "pg_catalog.pg_table_is_visible(c.oid)"
- query = """
- SELECT c.oid
- FROM pg_catalog.pg_class c
- LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
- WHERE (%s)
- AND c.relname = :table_name AND c.relkind in ('r', 'v', 'm', 'f')
- """ % schema_where_clause
- # Since we're binding to unicode, table_name and schema_name must be
- # unicode.
- table_name = util.text_type(table_name)
- if schema is not None:
- schema = util.text_type(schema)
- s = sql.text(query).bindparams(table_name=sqltypes.Unicode)
- s = s.columns(oid=sqltypes.Integer)
- if schema:
- s = s.bindparams(sql.bindparam('schema', type_=sqltypes.Unicode))
- c = connection.execute(s, table_name=table_name, schema=schema)
- table_oid = c.scalar()
- if table_oid is None:
- raise exc.NoSuchTableError(table_name)
- return table_oid
- @reflection.cache
- def get_schema_names(self, connection, **kw):
- result = connection.execute(
- sql.text("SELECT nspname FROM pg_namespace "
- "WHERE nspname NOT LIKE 'pg_%' "
- "ORDER BY nspname"
- ).columns(nspname=sqltypes.Unicode))
- return [name for name, in result]
- @reflection.cache
- def get_table_names(self, connection, schema=None, **kw):
- result = connection.execute(
- sql.text("SELECT c.relname FROM pg_class c "
- "JOIN pg_namespace n ON n.oid = c.relnamespace "
- "WHERE n.nspname = :schema AND c.relkind = 'r'"
- ).columns(relname=sqltypes.Unicode),
- schema=schema if schema is not None else self.default_schema_name)
- return [name for name, in result]
- @reflection.cache
- def _get_foreign_table_names(self, connection, schema=None, **kw):
- result = connection.execute(
- sql.text("SELECT c.relname FROM pg_class c "
- "JOIN pg_namespace n ON n.oid = c.relnamespace "
- "WHERE n.nspname = :schema AND c.relkind = 'f'"
- ).columns(relname=sqltypes.Unicode),
- schema=schema if schema is not None else self.default_schema_name)
- return [name for name, in result]
- @reflection.cache
- def get_view_names(
- self, connection, schema=None,
- include=('plain', 'materialized'), **kw):
- include_kind = {'plain': 'v', 'materialized': 'm'}
- try:
- kinds = [include_kind[i] for i in util.to_list(include)]
- except KeyError:
- raise ValueError(
- "include %r unknown, needs to be a sequence containing "
- "one or both of 'plain' and 'materialized'" % (include,))
- if not kinds:
- raise ValueError(
- "empty include, needs to be a sequence containing "
- "one or both of 'plain' and 'materialized'")
- result = connection.execute(
- sql.text("SELECT c.relname FROM pg_class c "
- "JOIN pg_namespace n ON n.oid = c.relnamespace "
- "WHERE n.nspname = :schema AND c.relkind IN (%s)" %
- (", ".join("'%s'" % elem for elem in kinds))
- ).columns(relname=sqltypes.Unicode),
- schema=schema if schema is not None else self.default_schema_name)
- return [name for name, in result]
- @reflection.cache
- def get_view_definition(self, connection, view_name, schema=None, **kw):
- view_def = connection.scalar(
- sql.text("SELECT pg_get_viewdef(c.oid) view_def FROM pg_class c "
- "JOIN pg_namespace n ON n.oid = c.relnamespace "
- "WHERE n.nspname = :schema AND c.relname = :view_name "
- "AND c.relkind IN ('v', 'm')"
- ).columns(view_def=sqltypes.Unicode),
- schema=schema if schema is not None else self.default_schema_name,
- view_name=view_name)
- return view_def
- @reflection.cache
- def get_columns(self, connection, table_name, schema=None, **kw):
- table_oid = self.get_table_oid(connection, table_name, schema,
- info_cache=kw.get('info_cache'))
- SQL_COLS = """
- SELECT a.attname,
- pg_catalog.format_type(a.atttypid, a.atttypmod),
- (SELECT pg_catalog.pg_get_expr(d.adbin, d.adrelid)
- FROM pg_catalog.pg_attrdef d
- WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum
- AND a.atthasdef)
- AS DEFAULT,
- a.attnotnull, a.attnum, a.attrelid as table_oid
- FROM pg_catalog.pg_attribute a
- WHERE a.attrelid = :table_oid
- AND a.attnum > 0 AND NOT a.attisdropped
- ORDER BY a.attnum
- """
- s = sql.text(SQL_COLS,
- bindparams=[
- sql.bindparam('table_oid', type_=sqltypes.Integer)],
- typemap={
- 'attname': sqltypes.Unicode,
- 'default': sqltypes.Unicode}
- )
- c = connection.execute(s, table_oid=table_oid)
- rows = c.fetchall()
- domains = self._load_domains(connection)
- enums = dict(
- (
- "%s.%s" % (rec['schema'], rec['name'])
- if not rec['visible'] else rec['name'], rec) for rec in
- self._load_enums(connection, schema='*')
- )
- # format columns
- columns = []
- for name, format_type, default, notnull, attnum, table_oid in rows:
- column_info = self._get_column_info(
- name, format_type, default, notnull, domains, enums, schema)
- columns.append(column_info)
- return columns
- def _get_column_info(self, name, format_type, default,
- notnull, domains, enums, schema):
- # strip (*) from character varying(5), timestamp(5)
- # with time zone, geometry(POLYGON), etc.
- attype = re.sub(r'\(.*\)', '', format_type)
- # strip '[]' from integer[], etc.
- attype = attype.replace('[]', '')
- nullable = not notnull
- is_array = format_type.endswith('[]')
- charlen = re.search(r'\(([\d,]+)\)', format_type)
- if charlen:
- charlen = charlen.group(1)
- args = re.search(r'\((.*)\)', format_type)
- if args and args.group(1):
- args = tuple(re.split(r'\s*,\s*', args.group(1)))
- else:
- args = ()
- kwargs = {}
- if attype == 'numeric':
- if charlen:
- prec, scale = charlen.split(',')
- args = (int(prec), int(scale))
- else:
- args = ()
- elif attype == 'double precision':
- args = (53, )
- elif attype == 'integer':
- args = ()
- elif attype in ('timestamp with time zone',
- 'time with time zone'):
- kwargs['timezone'] = True
- if charlen:
- kwargs['precision'] = int(charlen)
- args = ()
- elif attype in ('timestamp without time zone',
- 'time without time zone', 'time'):
- kwargs['timezone'] = False
- if charlen:
- kwargs['precision'] = int(charlen)
- args = ()
- elif attype == 'bit varying':
- kwargs['varying'] = True
- if charlen:
- args = (int(charlen),)
- else:
- args = ()
- elif attype in ('interval', 'interval year to month',
- 'interval day to second'):
- if charlen:
- kwargs['precision'] = int(charlen)
- args = ()
- elif charlen:
- args = (int(charlen),)
- while True:
- if attype in self.ischema_names:
- coltype = self.ischema_names[attype]
- break
- elif attype in enums:
- enum = enums[attype]
- coltype = ENUM
- kwargs['name'] = enum['name']
- if not enum['visible']:
- kwargs['schema'] = enum['schema']
- args = tuple(enum['labels'])
- break
- elif attype in domains:
- domain = domains[attype]
- attype = domain['attype']
- # A table can't override whether the domain is nullable.
- nullable = domain['nullable']
- if domain['default'] and not default:
- # It can, however, override the default
- # value, but can't set it to null.
- default = domain['default']
- continue
- else:
- coltype = None
- break
- if coltype:
- coltype = coltype(*args, **kwargs)
- if is_array:
- coltype = self.ischema_names['_array'](coltype)
- else:
- util.warn("Did not recognize type '%s' of column '%s'" %
- (attype, name))
- coltype = sqltypes.NULLTYPE
- # adjust the default value
- autoincrement = False
- if default is not None:
- match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
- if match is not None:
- if issubclass(coltype._type_affinity, sqltypes.Integer):
- autoincrement = True
- # the default is related to a Sequence
- sch = schema
- if '.' not in match.group(2) and sch is not None:
- # unconditionally quote the schema name. this could
- # later be enhanced to obey quoting rules /
- # "quote schema"
- default = match.group(1) + \
- ('"%s"' % sch) + '.' + \
- match.group(2) + match.group(3)
- column_info = dict(name=name, type=coltype, nullable=nullable,
- default=default, autoincrement=autoincrement)
- return column_info
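- # Example: a reflected SERIAL primary key column comes back roughly as
- # (values illustrative only):
- #
- #     {'name': 'id', 'type': INTEGER(), 'nullable': False,
- #      'default': "nextval('t_id_seq'::regclass)", 'autoincrement': True}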
- @reflection.cache
- def get_pk_constraint(self, connection, table_name, schema=None, **kw):
- table_oid = self.get_table_oid(connection, table_name, schema,
- info_cache=kw.get('info_cache'))
- if self.server_version_info < (8, 4):
- PK_SQL = """
- SELECT a.attname
- FROM
- pg_class t
- join pg_index ix on t.oid = ix.indrelid
- join pg_attribute a
- on t.oid=a.attrelid AND %s
- WHERE
- t.oid = :table_oid and ix.indisprimary = 't'
- ORDER BY a.attnum
- """ % self._pg_index_any("a.attnum", "ix.indkey")
- else:
- # unnest() and generate_subscripts() both introduced in
- # version 8.4
- PK_SQL = """
- SELECT a.attname
- FROM pg_attribute a JOIN (
- SELECT unnest(ix.indkey) attnum,
- generate_subscripts(ix.indkey, 1) ord
- FROM pg_index ix
- WHERE ix.indrelid = :table_oid AND ix.indisprimary
- ) k ON a.attnum=k.attnum
- WHERE a.attrelid = :table_oid
- ORDER BY k.ord
- """
- t = sql.text(PK_SQL, typemap={'attname': sqltypes.Unicode})
- c = connection.execute(t, table_oid=table_oid)
- cols = [r[0] for r in c.fetchall()]
- PK_CONS_SQL = """
- SELECT conname
- FROM pg_catalog.pg_constraint r
- WHERE r.conrelid = :table_oid AND r.contype = 'p'
- ORDER BY 1
- """
- t = sql.text(PK_CONS_SQL, typemap={'conname': sqltypes.Unicode})
- c = connection.execute(t, table_oid=table_oid)
- name = c.scalar()
- return {'constrained_columns': cols, 'name': name}
- @reflection.cache
- def get_foreign_keys(self, connection, table_name, schema=None,
- postgresql_ignore_search_path=False, **kw):
- preparer = self.identifier_preparer
- table_oid = self.get_table_oid(connection, table_name, schema,
- info_cache=kw.get('info_cache'))
- FK_SQL = """
- SELECT r.conname,
- pg_catalog.pg_get_constraintdef(r.oid, true) as condef,
- n.nspname as conschema
- FROM pg_catalog.pg_constraint r,
- pg_namespace n,
- pg_class c
- WHERE r.conrelid = :table AND
- r.contype = 'f' AND
- c.oid = confrelid AND
- n.oid = c.relnamespace
- ORDER BY 1
- """
- # http://www.postgresql.org/docs/9.0/static/sql-createtable.html
- FK_REGEX = re.compile(
- r'FOREIGN KEY \((.*?)\) REFERENCES (?:(.*?)\.)?(.*?)\((.*?)\)'
- r'[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?'
- r'[\s]?(ON UPDATE '
- r'(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?'
- r'[\s]?(ON DELETE '
- r'(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?'
- r'[\s]?(DEFERRABLE|NOT DEFERRABLE)?'
- r'[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?'
- )
- t = sql.text(FK_SQL, typemap={
- 'conname': sqltypes.Unicode,
- 'condef': sqltypes.Unicode})
- c = connection.execute(t, table=table_oid)
- fkeys = []
- for conname, condef, conschema in c.fetchall():
- m = re.search(FK_REGEX, condef).groups()
- constrained_columns, referred_schema, \
- referred_table, referred_columns, \
- _, match, _, onupdate, _, ondelete, \
- deferrable, _, initially = m
- if deferrable is not None:
- deferrable = True if deferrable == 'DEFERRABLE' else False
- constrained_columns = [preparer._unquote_identifier(x)
- for x in re.split(
- r'\s*,\s*', constrained_columns)]
- if postgresql_ignore_search_path:
- # when ignoring search path, we use the actual schema
- # provided it isn't the "default" schema
- if conschema != self.default_schema_name:
- referred_schema = conschema
- else:
- referred_schema = schema
- elif referred_schema:
- # referred_schema is the schema that we regexp'ed from
- # pg_get_constraintdef(). If the schema is in the search
- # path, pg_get_constraintdef() will give us None.
- referred_schema = \
- preparer._unquote_identifier(referred_schema)
- elif schema is not None and schema == conschema:
- # If the actual schema matches the schema of the table
- # we're reflecting, then we will use that.
- referred_schema = schema
- referred_table = preparer._unquote_identifier(referred_table)
- referred_columns = [preparer._unquote_identifier(x)
- for x in
- re.split(r'\s*,\s*', referred_columns)]
- fkey_d = {
- 'name': conname,
- 'constrained_columns': constrained_columns,
- 'referred_schema': referred_schema,
- 'referred_table': referred_table,
- 'referred_columns': referred_columns,
- 'options': {
- 'onupdate': onupdate,
- 'ondelete': ondelete,
- 'deferrable': deferrable,
- 'initially': initially,
- 'match': match
- }
- }
- fkeys.append(fkey_d)
- return fkeys
- def _pg_index_any(self, col, compare_to):
- if self.server_version_info < (8, 1):
- # http://www.postgresql.org/message-id/10279.1124395722@sss.pgh.pa.us
- # "In CVS tip you could replace this with "attnum = ANY (indkey)".
- # Unfortunately, most array support doesn't work on int2vector in
- # pre-8.1 releases, so I think you're kinda stuck with the above
- # for now.
- # regards, tom lane"
- return "(%s)" % " OR ".join(
- "%s[%d] = %s" % (compare_to, ind, col)
- for ind in range(0, 10)
- )
- else:
- return "%s = ANY(%s)" % (col, compare_to)
- @reflection.cache
- def get_indexes(self, connection, table_name, schema, **kw):
- table_oid = self.get_table_oid(connection, table_name, schema,
- info_cache=kw.get('info_cache'))
- # cast indkey as varchar since it's an int2vector,
- # returned as a list by some drivers such as pypostgresql
- if self.server_version_info < (8, 5):
- IDX_SQL = """
- SELECT
- i.relname as relname,
- ix.indisunique, ix.indexprs, ix.indpred,
- a.attname, a.attnum, NULL, ix.indkey%s,
- %s, am.amname
- FROM
- pg_class t
- join pg_index ix on t.oid = ix.indrelid
- join pg_class i on i.oid = ix.indexrelid
- left outer join
- pg_attribute a
- on t.oid = a.attrelid and %s
- left outer join
- pg_am am
- on i.relam = am.oid
- WHERE
- t.relkind IN ('r', 'v', 'f', 'm')
- and t.oid = :table_oid
- and ix.indisprimary = 'f'
- ORDER BY
- t.relname,
- i.relname
- """ % (
- # the 8.3 cutoff here is based on observing that the
- # cast does not work in PG 8.2.4 but does in 8.3.0;
- # nothing in the PG changelogs mentions this.
- "::varchar" if self.server_version_info >= (8, 3) else "",
- "i.reloptions" if self.server_version_info >= (8, 2)
- else "NULL",
- self._pg_index_any("a.attnum", "ix.indkey")
- )
- else:
- IDX_SQL = """
- SELECT
- i.relname as relname,
- ix.indisunique, ix.indexprs, ix.indpred,
- a.attname, a.attnum, c.conrelid, ix.indkey::varchar,
- i.reloptions, am.amname
- FROM
- pg_class t
- join pg_index ix on t.oid = ix.indrelid
- join pg_class i on i.oid = ix.indexrelid
- left outer join
- pg_attribute a
- on t.oid = a.attrelid and a.attnum = ANY(ix.indkey)
- left outer join
- pg_constraint c
- on (ix.indrelid = c.conrelid and
- ix.indexrelid = c.conindid and
- c.contype in ('p', 'u', 'x'))
- left outer join
- pg_am am
- on i.relam = am.oid
- WHERE
- t.relkind IN ('r', 'v', 'f', 'm')
- and t.oid = :table_oid
- and ix.indisprimary = 'f'
- ORDER BY
- t.relname,
- i.relname
- """
- t = sql.text(IDX_SQL, typemap={
- 'relname': sqltypes.Unicode,
- 'attname': sqltypes.Unicode})
- c = connection.execute(t, table_oid=table_oid)
- indexes = defaultdict(lambda: defaultdict(dict))
- sv_idx_name = None
- for row in c.fetchall():
- (idx_name, unique, expr, prd, col,
- col_num, conrelid, idx_key, options, amname) = row
- if expr:
- if idx_name != sv_idx_name:
- util.warn(
- "Skipped unsupported reflection of "
- "expression-based index %s"
- % idx_name)
- sv_idx_name = idx_name
- continue
- if prd and idx_name != sv_idx_name:
- util.warn(
- "Predicate of partial index %s ignored during reflection"
- % idx_name)
- sv_idx_name = idx_name
- has_idx = idx_name in indexes
- index = indexes[idx_name]
- if col is not None:
- index['cols'][col_num] = col
- if not has_idx:
- index['key'] = [int(k.strip()) for k in idx_key.split()]
- index['unique'] = unique
- if conrelid is not None:
- index['duplicates_constraint'] = idx_name
- if options:
- index['options'] = dict(
- [option.split("=") for option in options])
- # it *might* be nice to include that this is 'btree' in the
- # reflection info. But we don't want an Index object
- # to have a ``postgresql_using`` in it that is just the
- # default, so for the moment leaving this out.
- if amname and amname != 'btree':
- index['amname'] = amname
- result = []
- for name, idx in indexes.items():
- entry = {
- 'name': name,
- 'unique': idx['unique'],
- 'column_names': [idx['cols'][i] for i in idx['key']]
- }
- if 'duplicates_constraint' in idx:
- entry['duplicates_constraint'] = idx['duplicates_constraint']
- if 'options' in idx:
- entry.setdefault(
- 'dialect_options', {})["postgresql_with"] = idx['options']
- if 'amname' in idx:
- entry.setdefault(
- 'dialect_options', {})["postgresql_using"] = idx['amname']
- result.append(entry)
- return result
- @reflection.cache
- def get_unique_constraints(self, connection, table_name,
- schema=None, **kw):
- table_oid = self.get_table_oid(connection, table_name, schema,
- info_cache=kw.get('info_cache'))
- UNIQUE_SQL = """
- SELECT
- cons.conname as name,
- cons.conkey as key,
- a.attnum as col_num,
- a.attname as col_name
- FROM
- pg_catalog.pg_constraint cons
- join pg_attribute a
- on cons.conrelid = a.attrelid AND
- a.attnum = ANY(cons.conkey)
- WHERE
- cons.conrelid = :table_oid AND
- cons.contype = 'u'
- """
- t = sql.text(UNIQUE_SQL, typemap={'col_name': sqltypes.Unicode})
- c = connection.execute(t, table_oid=table_oid)
- uniques = defaultdict(lambda: defaultdict(dict))
- for row in c.fetchall():
- uc = uniques[row.name]
- uc["key"] = row.key
- uc["cols"][row.col_num] = row.col_name
- return [
- {'name': name,
- 'column_names': [uc["cols"][i] for i in uc["key"]]}
- for name, uc in uniques.items()
- ]
- @reflection.cache
- def get_check_constraints(
- self, connection, table_name, schema=None, **kw):
- table_oid = self.get_table_oid(connection, table_name, schema,
- info_cache=kw.get('info_cache'))
- CHECK_SQL = """
- SELECT
- cons.conname as name,
- cons.consrc as src
- FROM
- pg_catalog.pg_constraint cons
- WHERE
- cons.conrelid = :table_oid AND
- cons.contype = 'c'
- """
- c = connection.execute(sql.text(CHECK_SQL), table_oid=table_oid)
- return [
- {'name': name,
- 'sqltext': src[1:-1]}
- for name, src in c.fetchall()
- ]
- def _load_enums(self, connection, schema=None):
- schema = schema or self.default_schema_name
- if not self.supports_native_enum:
- return {}
- # Load data types for enums:
- SQL_ENUMS = """
- SELECT t.typname as "name",
- -- no enum defaults in 8.4 at least
- -- t.typdefault as "default",
- pg_catalog.pg_type_is_visible(t.oid) as "visible",
- n.nspname as "schema",
- e.enumlabel as "label"
- FROM pg_catalog.pg_type t
- LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
- LEFT JOIN pg_catalog.pg_enum e ON t.oid = e.enumtypid
- WHERE t.typtype = 'e'
- """
- if schema != '*':
- SQL_ENUMS += "AND n.nspname = :schema "
- # e.oid gives us label order within an enum
- SQL_ENUMS += 'ORDER BY "schema", "name", e.oid'
- s = sql.text(SQL_ENUMS, typemap={
- 'attname': sqltypes.Unicode,
- 'label': sqltypes.Unicode})
- if schema != '*':
- s = s.bindparams(schema=schema)
- c = connection.execute(s)
- enums = []
- enum_by_name = {}
- for enum in c.fetchall():
- key = (enum['schema'], enum['name'])
- if key in enum_by_name:
- enum_by_name[key]['labels'].append(enum['label'])
- else:
- enum_by_name[key] = enum_rec = {
- 'name': enum['name'],
- 'schema': enum['schema'],
- 'visible': enum['visible'],
- 'labels': [enum['label']],
- }
- enums.append(enum_rec)
- return enums
- def _load_domains(self, connection):
- # Load data types for domains:
- SQL_DOMAINS = """
- SELECT t.typname as "name",
- pg_catalog.format_type(t.typbasetype, t.typtypmod) as "attype",
- not t.typnotnull as "nullable",
- t.typdefault as "default",
- pg_catalog.pg_type_is_visible(t.oid) as "visible",
- n.nspname as "schema"
- FROM pg_catalog.pg_type t
- LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
- WHERE t.typtype = 'd'
- """
- s = sql.text(SQL_DOMAINS, typemap={'attname': sqltypes.Unicode})
- c = connection.execute(s)
- domains = {}
- for domain in c.fetchall():
- # strip (30) from character varying(30)
- attype = re.search(r'([^\(]+)', domain['attype']).group(1)
- if domain['visible']:
- # 'visible' just means whether or not the domain is in a
- # schema that's on the search path -- or not overridden by
- # a schema with higher precedence. If it's not visible,
- # it will be prefixed with the schema-name when it's used.
- name = domain['name']
- else:
- name = "%s.%s" % (domain['schema'], domain['name'])
- domains[name] = {
- 'attype': attype,
- 'nullable': domain['nullable'],
- 'default': domain['default']
- }
- return domains
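- # Example: given "CREATE DOMAIN positive_int AS integer CHECK (VALUE > 0)",
- # this returns {'positive_int': {'attype': 'integer', 'nullable': True,
- # 'default': None}}, which _get_column_info() uses to resolve columns of
- # that domain to the base INTEGER type.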