- # oracle/base.py
- # Copyright (C) 2005-2017 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: http://www.opensource.org/licenses/mit-license.php
- """
- .. dialect:: oracle
- :name: Oracle
- Oracle versions 8 through current (11g at the time of this writing) are
- supported.
- Connect Arguments
- -----------------
- The dialect supports several :func:`~sqlalchemy.create_engine()` arguments
- which affect the behavior of the dialect regardless of driver in use.
- * ``use_ansi`` - Use ANSI JOIN constructs (see the section on Oracle 8).
- Defaults to ``True``. If ``False``, Oracle-8 compatible constructs are used
- for joins.
- * ``optimize_limits`` - defaults to ``False``. see the section on
- LIMIT/OFFSET.
- * ``use_binds_for_limits`` - defaults to ``True``. see the section on
- LIMIT/OFFSET.
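- For example, a minimal sketch passing all three flags with their default
- values (the DSN shown is illustrative)::
- engine = create_engine("oracle://scott:tiger@dsn",
- use_ansi=True,
- optimize_limits=False,
- use_binds_for_limits=True)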
- Auto Increment Behavior
- -----------------------
- SQLAlchemy Table objects which include integer primary keys are usually
- assumed to have "autoincrementing" behavior, meaning they can generate their
- own primary key values upon INSERT. Since Oracle has no "autoincrement"
- feature, SQLAlchemy relies upon sequences to produce these values. With the
- Oracle dialect, *a sequence must always be explicitly specified to enable
- autoincrement*. This diverges from the majority of documentation
- examples, which assume the use of an autoincrement-capable database. To
- specify sequences, use the sqlalchemy.schema.Sequence object which is passed
- to a Column construct::
- t = Table('mytable', metadata,
- Column('id', Integer, Sequence('id_seq'), primary_key=True),
- Column(...), ...
- )
- This step is also required when using table reflection, i.e. autoload=True::
- t = Table('mytable', metadata,
- Column('id', Integer, Sequence('id_seq'), primary_key=True),
- autoload=True
- )
- Identifier Casing
- -----------------
- In Oracle, the data dictionary represents all case insensitive identifier
- names using UPPERCASE text. SQLAlchemy on the other hand considers an
- all-lower case identifier name to be case insensitive. The Oracle dialect
- converts all case insensitive identifiers to and from those two formats during
- schema level communication, such as reflection of tables and indexes. Using
- an UPPERCASE name on the SQLAlchemy side indicates a case sensitive
- identifier, and SQLAlchemy will quote the name - this will cause mismatches
- against data dictionary data received from Oracle, so unless identifier names
- have been truly created as case sensitive (i.e. using quoted names), all
- lowercase names should be used on the SQLAlchemy side.
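- For example, a brief sketch (the table and column names here are
- hypothetical)::
- # an all-lowercase name is case insensitive and matches Oracle's
- # UPPERCASE data dictionary entry for the same table
- t = Table('my_table', metadata, Column('my_col', Integer))
- # an UPPERCASE name is treated as case sensitive and will be quoted
- t_cs = Table('MY_TABLE', metadata, Column('MY_COL', Integer))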
- LIMIT/OFFSET Support
- --------------------
- Oracle has no support for the LIMIT or OFFSET keywords. SQLAlchemy uses
- a wrapped subquery approach in conjunction with ROWNUM. The exact methodology
- is taken from
- http://www.oracle.com/technetwork/issue-archive/2006/06-sep/o56asktom-086197.html .
- There are two options which affect its behavior:
- * the "FIRST ROWS()" optimization keyword is not used by default. To enable
- the usage of this optimization directive, specify ``optimize_limits=True``
- to :func:`.create_engine`.
- * the values passed for the limit/offset are sent as bound parameters. Some
- users have observed that Oracle produces a poor query plan when the values
- are sent as binds and not rendered literally. To render the limit/offset
- values literally within the SQL statement, specify
- ``use_binds_for_limits=False`` to :func:`.create_engine`.
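- As an illustrative sketch (``some_table`` is hypothetical), a statement
- such as::
- stmt = select([some_table]).order_by(some_table.c.id).limit(10).offset(20)
- is rendered for Oracle as a ROWNUM-wrapped subquery along the lines of the
- article linked above, rather than with LIMIT/OFFSET keywords.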
- Some users have reported better performance when the entirely different
- approach of a window query is used, i.e. ROW_NUMBER() OVER (ORDER BY), to
- provide LIMIT/OFFSET (note that the majority of users don't observe this).
- To suit this case the method used for LIMIT/OFFSET can be replaced entirely.
- See the recipe at
- http://www.sqlalchemy.org/trac/wiki/UsageRecipes/WindowFunctionsByDefault
- which installs a select compiler that overrides the generation of limit/offset
- with a window function.
- .. _oracle_returning:
- RETURNING Support
- -----------------
- The Oracle database supports a limited form of RETURNING, in order to retrieve
- result sets of matched rows from INSERT, UPDATE and DELETE statements.
- Oracle's RETURNING..INTO syntax only supports one row being returned, as it
- relies upon OUT parameters in order to function. In addition, supported
- DBAPIs have further limitations (see :ref:`cx_oracle_returning`).
- SQLAlchemy's "implicit returning" feature, which employs RETURNING within an
- INSERT and sometimes an UPDATE statement in order to fetch newly generated
- primary key values and other SQL defaults and expressions, is normally enabled
- on the Oracle backend. By default, "implicit returning" typically only
- fetches the value of a single ``nextval(some_seq)`` expression embedded into
- an INSERT statement, in order to increment a sequence and get the new value
- back at the same time. To disable this feature across the board,
- specify ``implicit_returning=False`` to :func:`.create_engine`::
- engine = create_engine("oracle://scott:tiger@dsn",
- implicit_returning=False)
- Implicit returning can also be disabled on a table-by-table basis as a table
- option::
- # Core Table
- my_table = Table("my_table", metadata, ..., implicit_returning=False)
- # declarative
- class MyClass(Base):
- __tablename__ = 'my_table'
- __table_args__ = {"implicit_returning": False}
- .. seealso::
- :ref:`cx_oracle_returning` - additional cx_oracle-specific restrictions on
- implicit returning.
- ON UPDATE CASCADE
- -----------------
- Oracle doesn't have native ON UPDATE CASCADE functionality. A trigger-based
- solution is available at
- http://asktom.oracle.com/tkyte/update_cascade/index.html .
- When using the SQLAlchemy ORM, the ORM has limited ability to manually issue
- cascading updates - specify ForeignKey objects using the
- "deferrable=True, initially='deferred'" keyword arguments,
- and specify "passive_updates=False" on each relationship().
- Oracle 8 Compatibility
- ----------------------
- When Oracle 8 is detected, the dialect internally configures itself to the
- following behaviors:
- * the use_ansi flag is set to False. This has the effect of converting all
- JOIN phrases into the WHERE clause, and in the case of LEFT OUTER JOIN
- makes use of Oracle's (+) operator.
- * the NVARCHAR2 and NCLOB datatypes are no longer generated as DDL when
- the :class:`~sqlalchemy.types.Unicode` is used - VARCHAR2 and CLOB are
- issued instead. This is because these types don't seem to work correctly on
- Oracle 8 even though they are available. The
- :class:`~sqlalchemy.types.NVARCHAR` and
- :class:`~sqlalchemy.dialects.oracle.NCLOB` types will always generate
- NVARCHAR2 and NCLOB.
- * the "native unicode" mode is disabled when using cx_oracle, i.e. SQLAlchemy
- encodes all Python unicode objects to "string" before passing in as bind
- parameters.
- Synonym/DBLINK Reflection
- -------------------------
- When using reflection with Table objects, the dialect can optionally search
- for tables indicated by synonyms, either in local or remote schemas or
- accessed over DBLINK, by passing the flag ``oracle_resolve_synonyms=True`` as
- a keyword argument to the :class:`.Table` construct::
- some_table = Table('some_table', autoload=True,
- autoload_with=some_engine,
- oracle_resolve_synonyms=True)
- When this flag is set, the given name (such as ``some_table`` above) will
- be searched not just in the ``ALL_TABLES`` view, but also within the
- ``ALL_SYNONYMS`` view to see if this name is actually a synonym to another
- name. If the synonym is located and refers to a DBLINK, the Oracle dialect
- knows how to locate the table's information using DBLINK syntax (e.g.
- ``@dblink``).
- ``oracle_resolve_synonyms`` is accepted wherever reflection arguments are
- accepted, including methods such as :meth:`.MetaData.reflect` and
- :meth:`.Inspector.get_columns`.
- If synonyms are not in use, this flag should be left disabled.
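- For example, a minimal sketch applying the flag during a
- :meth:`.MetaData.reflect` operation::
- metadata.reflect(bind=some_engine, oracle_resolve_synonyms=True)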
- Table names with SYSTEM/SYSAUX tablespaces
- -------------------------------------------
- The :meth:`.Inspector.get_table_names` and
- :meth:`.Inspector.get_temp_table_names`
- methods each return a list of table names for the current engine. These methods
- are also part of the reflection which occurs within an operation such as
- :meth:`.MetaData.reflect`. By default, these operations exclude the ``SYSTEM``
- and ``SYSAUX`` tablespaces. To change this, the default list of excluded
- tablespaces can be overridden at the engine level using the
- ``exclude_tablespaces`` parameter::
- # exclude SYSAUX and SOME_TABLESPACE, but not SYSTEM
- e = create_engine(
- "oracle://scott:tiger@xe",
- exclude_tablespaces=["SYSAUX", "SOME_TABLESPACE"])
- .. versionadded:: 1.1
- DateTime Compatibility
- ----------------------
- Oracle has no datatype known as ``DATETIME``; it instead has only ``DATE``,
- which can actually store a date and time value. For this reason, the Oracle
- dialect provides a type :class:`.oracle.DATE` which is a subclass of
- :class:`.DateTime`. This type has no special behavior, and is only
- present as a "marker" for this type; additionally, when a database column
- is reflected and the type is reported as ``DATE``, the time-supporting
- :class:`.oracle.DATE` type is used.
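- As an illustrative sketch (the table and column names are hypothetical),
- a reflected ``DATE`` column carries the time-capable Oracle type::
- from sqlalchemy.dialects import oracle
- t = Table('some_table', metadata, autoload=True, autoload_with=some_engine)
- assert isinstance(t.c.timestamp_col.type, oracle.DATE)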
- .. versionchanged:: 0.9.4 Added :class:`.oracle.DATE` to subclass
- :class:`.DateTime`. This is a change as previous versions
- would reflect a ``DATE`` column as :class:`.types.DATE`, which subclasses
- :class:`.Date`. The only significance here is for schemes that are
- examining the type of column for use in special Python translations or
- for migrating schemas to other database backends.
- .. _oracle_table_options:
- Oracle Table Options
- -------------------------
- The CREATE TABLE phrase supports the following options with Oracle
- in conjunction with the :class:`.Table` construct:
- * ``ON COMMIT``::
- Table(
- "some_table", metadata, ...,
- prefixes=['GLOBAL TEMPORARY'], oracle_on_commit='PRESERVE ROWS')
- .. versionadded:: 1.0.0
- * ``COMPRESS``::
- Table('mytable', metadata, Column('data', String(32)),
- oracle_compress=True)
- Table('mytable', metadata, Column('data', String(32)),
- oracle_compress=6)
- The ``oracle_compress`` parameter accepts either an integer compression
- level, or ``True`` to use the default compression level.
- .. versionadded:: 1.0.0
- .. _oracle_index_options:
- Oracle Specific Index Options
- -----------------------------
- Bitmap Indexes
- ~~~~~~~~~~~~~~
- You can specify the ``oracle_bitmap`` parameter to create a bitmap index
- instead of a B-tree index::
- Index('my_index', my_table.c.data, oracle_bitmap=True)
- Bitmap indexes cannot be unique and cannot be compressed. SQLAlchemy will not
- check for such limitations; only the database will.
- .. versionadded:: 1.0.0
- Index compression
- ~~~~~~~~~~~~~~~~~
- Oracle has a more efficient storage mode for indexes containing lots of
- repeated values. Use the ``oracle_compress`` parameter to turn on key
- compression::
- Index('my_index', my_table.c.data, oracle_compress=True)
- Index('my_index', my_table.c.data1, my_table.c.data2, unique=True,
- oracle_compress=1)
- The ``oracle_compress`` parameter accepts either an integer specifying the
- number of prefix columns to compress, or ``True`` to use the default (all
- columns for non-unique indexes, all but the last column for unique indexes).
- .. versionadded:: 1.0.0
- """
- import re
- from sqlalchemy import util, sql
- from sqlalchemy.engine import default, reflection
- from sqlalchemy.sql import compiler, visitors, expression, util as sql_util
- from sqlalchemy.sql import operators as sql_operators
- from sqlalchemy.sql.elements import quoted_name
- from sqlalchemy import types as sqltypes, schema as sa_schema
- from sqlalchemy.types import VARCHAR, NVARCHAR, CHAR, \
- BLOB, CLOB, TIMESTAMP, FLOAT
- RESERVED_WORDS = \
- set('SHARE RAW DROP BETWEEN FROM DESC OPTION PRIOR LONG THEN '
- 'DEFAULT ALTER IS INTO MINUS INTEGER NUMBER GRANT IDENTIFIED '
- 'ALL TO ORDER ON FLOAT DATE HAVING CLUSTER NOWAIT RESOURCE '
- 'ANY TABLE INDEX FOR UPDATE WHERE CHECK SMALLINT WITH DELETE '
- 'BY ASC REVOKE LIKE SIZE RENAME NOCOMPRESS NULL GROUP VALUES '
- 'AS IN VIEW EXCLUSIVE COMPRESS SYNONYM SELECT INSERT EXISTS '
- 'NOT TRIGGER ELSE CREATE INTERSECT PCTFREE DISTINCT USER '
- 'CONNECT SET MODE OF UNIQUE VARCHAR2 VARCHAR LOCK OR CHAR '
- 'DECIMAL UNION PUBLIC AND START UID COMMENT CURRENT LEVEL'.split())
- NO_ARG_FNS = set('UID CURRENT_DATE SYSDATE USER '
- 'CURRENT_TIME CURRENT_TIMESTAMP'.split())
- class RAW(sqltypes._Binary):
- __visit_name__ = 'RAW'
- OracleRaw = RAW
- class NCLOB(sqltypes.Text):
- __visit_name__ = 'NCLOB'
- class VARCHAR2(VARCHAR):
- __visit_name__ = 'VARCHAR2'
- NVARCHAR2 = NVARCHAR
- class NUMBER(sqltypes.Numeric, sqltypes.Integer):
- __visit_name__ = 'NUMBER'
- def __init__(self, precision=None, scale=None, asdecimal=None):
- if asdecimal is None:
- asdecimal = bool(scale and scale > 0)
- super(NUMBER, self).__init__(
- precision=precision, scale=scale, asdecimal=asdecimal)
- def adapt(self, impltype):
- ret = super(NUMBER, self).adapt(impltype)
- # leave a hint for the DBAPI handler
- ret._is_oracle_number = True
- return ret
- @property
- def _type_affinity(self):
- if bool(self.scale and self.scale > 0):
- return sqltypes.Numeric
- else:
- return sqltypes.Integer
- class DOUBLE_PRECISION(sqltypes.Numeric):
- __visit_name__ = 'DOUBLE_PRECISION'
- def __init__(self, precision=None, scale=None, asdecimal=None):
- if asdecimal is None:
- asdecimal = False
- super(DOUBLE_PRECISION, self).__init__(
- precision=precision, scale=scale, asdecimal=asdecimal)
- class BFILE(sqltypes.LargeBinary):
- __visit_name__ = 'BFILE'
- class LONG(sqltypes.Text):
- __visit_name__ = 'LONG'
- class DATE(sqltypes.DateTime):
- """Provide the oracle DATE type.
- This type has no special Python behavior, except that it subclasses
- :class:`.types.DateTime`; this is to suit the fact that the Oracle
- ``DATE`` type supports a time value.
- .. versionadded:: 0.9.4
- """
- __visit_name__ = 'DATE'
- def _compare_type_affinity(self, other):
- return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
- class INTERVAL(sqltypes.TypeEngine):
- __visit_name__ = 'INTERVAL'
- def __init__(self,
- day_precision=None,
- second_precision=None):
- """Construct an INTERVAL.
- Note that only DAY TO SECOND intervals are currently supported.
- This is due to a lack of support for YEAR TO MONTH intervals
- within available DBAPIs (cx_oracle and zxjdbc).
- :param day_precision: the day precision value. this is the number of
- digits to store for the day field. Defaults to "2"
- :param second_precision: the second precision value. this is the
- number of digits to store for the fractional seconds field.
- Defaults to "6".
- """
- self.day_precision = day_precision
- self.second_precision = second_precision
- @classmethod
- def _adapt_from_generic_interval(cls, interval):
- return INTERVAL(day_precision=interval.day_precision,
- second_precision=interval.second_precision)
- @property
- def _type_affinity(self):
- return sqltypes.Interval
- class ROWID(sqltypes.TypeEngine):
- """Oracle ROWID type.
- When used in a cast() or similar, generates ROWID.
- """
- __visit_name__ = 'ROWID'
- class _OracleBoolean(sqltypes.Boolean):
- def get_dbapi_type(self, dbapi):
- return dbapi.NUMBER
- colspecs = {
- sqltypes.Boolean: _OracleBoolean,
- sqltypes.Interval: INTERVAL,
- sqltypes.DateTime: DATE
- }
- ischema_names = {
- 'VARCHAR2': VARCHAR,
- 'NVARCHAR2': NVARCHAR,
- 'CHAR': CHAR,
- 'DATE': DATE,
- 'NUMBER': NUMBER,
- 'BLOB': BLOB,
- 'BFILE': BFILE,
- 'CLOB': CLOB,
- 'NCLOB': NCLOB,
- 'TIMESTAMP': TIMESTAMP,
- 'TIMESTAMP WITH TIME ZONE': TIMESTAMP,
- 'INTERVAL DAY TO SECOND': INTERVAL,
- 'RAW': RAW,
- 'FLOAT': FLOAT,
- 'DOUBLE PRECISION': DOUBLE_PRECISION,
- 'LONG': LONG,
- }
- class OracleTypeCompiler(compiler.GenericTypeCompiler):
- # Note:
- # Oracle DATE == DATETIME
- # Oracle does not allow milliseconds in DATE
- # Oracle does not support TIME columns
- def visit_datetime(self, type_, **kw):
- return self.visit_DATE(type_, **kw)
- def visit_float(self, type_, **kw):
- return self.visit_FLOAT(type_, **kw)
- def visit_unicode(self, type_, **kw):
- if self.dialect._supports_nchar:
- return self.visit_NVARCHAR2(type_, **kw)
- else:
- return self.visit_VARCHAR2(type_, **kw)
- def visit_INTERVAL(self, type_, **kw):
- return "INTERVAL DAY%s TO SECOND%s" % (
- type_.day_precision is not None and
- "(%d)" % type_.day_precision or
- "",
- type_.second_precision is not None and
- "(%d)" % type_.second_precision or
- "",
- )
- def visit_LONG(self, type_, **kw):
- return "LONG"
- def visit_TIMESTAMP(self, type_, **kw):
- if type_.timezone:
- return "TIMESTAMP WITH TIME ZONE"
- else:
- return "TIMESTAMP"
- def visit_DOUBLE_PRECISION(self, type_, **kw):
- return self._generate_numeric(type_, "DOUBLE PRECISION", **kw)
- def visit_NUMBER(self, type_, **kw):
- return self._generate_numeric(type_, "NUMBER", **kw)
- def _generate_numeric(self, type_, name, precision=None, scale=None, **kw):
- if precision is None:
- precision = type_.precision
- if scale is None:
- scale = getattr(type_, 'scale', None)
- if precision is None:
- return name
- elif scale is None:
- n = "%(name)s(%(precision)s)"
- return n % {'name': name, 'precision': precision}
- else:
- n = "%(name)s(%(precision)s, %(scale)s)"
- return n % {'name': name, 'precision': precision, 'scale': scale}
- def visit_string(self, type_, **kw):
- return self.visit_VARCHAR2(type_, **kw)
- def visit_VARCHAR2(self, type_, **kw):
- return self._visit_varchar(type_, '', '2')
- def visit_NVARCHAR2(self, type_, **kw):
- return self._visit_varchar(type_, 'N', '2')
- visit_NVARCHAR = visit_NVARCHAR2
- def visit_VARCHAR(self, type_, **kw):
- return self._visit_varchar(type_, '', '')
- def _visit_varchar(self, type_, n, num):
- if not type_.length:
- return "%(n)sVARCHAR%(two)s" % {'two': num, 'n': n}
- elif not n and self.dialect._supports_char_length:
- varchar = "VARCHAR%(two)s(%(length)s CHAR)"
- return varchar % {'length': type_.length, 'two': num}
- else:
- varchar = "%(n)sVARCHAR%(two)s(%(length)s)"
- return varchar % {'length': type_.length, 'two': num, 'n': n}
- def visit_text(self, type_, **kw):
- return self.visit_CLOB(type_, **kw)
- def visit_unicode_text(self, type_, **kw):
- if self.dialect._supports_nchar:
- return self.visit_NCLOB(type_, **kw)
- else:
- return self.visit_CLOB(type_, **kw)
- def visit_large_binary(self, type_, **kw):
- return self.visit_BLOB(type_, **kw)
- def visit_big_integer(self, type_, **kw):
- return self.visit_NUMBER(type_, precision=19, **kw)
- def visit_boolean(self, type_, **kw):
- return self.visit_SMALLINT(type_, **kw)
- def visit_RAW(self, type_, **kw):
- if type_.length:
- return "RAW(%(length)s)" % {'length': type_.length}
- else:
- return "RAW"
- def visit_ROWID(self, type_, **kw):
- return "ROWID"
- class OracleCompiler(compiler.SQLCompiler):
- """Oracle compiler modifies the lexical structure of Select
- statements to work under non-ANSI configured Oracle databases, if
- the use_ansi flag is False.
- """
- compound_keywords = util.update_copy(
- compiler.SQLCompiler.compound_keywords,
- {
- expression.CompoundSelect.EXCEPT: 'MINUS'
- }
- )
- def __init__(self, *args, **kwargs):
- self.__wheres = {}
- self._quoted_bind_names = {}
- super(OracleCompiler, self).__init__(*args, **kwargs)
- def visit_mod_binary(self, binary, operator, **kw):
- return "mod(%s, %s)" % (self.process(binary.left, **kw),
- self.process(binary.right, **kw))
- def visit_now_func(self, fn, **kw):
- return "CURRENT_TIMESTAMP"
- def visit_char_length_func(self, fn, **kw):
- return "LENGTH" + self.function_argspec(fn, **kw)
- def visit_match_op_binary(self, binary, operator, **kw):
- return "CONTAINS (%s, %s)" % (self.process(binary.left),
- self.process(binary.right))
- def visit_true(self, expr, **kw):
- return '1'
- def visit_false(self, expr, **kw):
- return '0'
- def get_cte_preamble(self, recursive):
- return "WITH"
- def get_select_hint_text(self, byfroms):
- return " ".join(
- "/*+ %s */" % text for table, text in byfroms.items()
- )
- def function_argspec(self, fn, **kw):
- if len(fn.clauses) > 0 or fn.name.upper() not in NO_ARG_FNS:
- return compiler.SQLCompiler.function_argspec(self, fn, **kw)
- else:
- return ""
- def default_from(self):
- """Called when a ``SELECT`` statement has no froms,
- and no ``FROM`` clause is to be appended.
- The Oracle compiler tacks a "FROM DUAL" to the statement.
- """
- return " FROM DUAL"
- def visit_join(self, join, **kwargs):
- if self.dialect.use_ansi:
- return compiler.SQLCompiler.visit_join(self, join, **kwargs)
- else:
- kwargs['asfrom'] = True
- if isinstance(join.right, expression.FromGrouping):
- right = join.right.element
- else:
- right = join.right
- return self.process(join.left, **kwargs) + \
- ", " + self.process(right, **kwargs)
- def _get_nonansi_join_whereclause(self, froms):
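- # With use_ansi=False, join criteria are moved into the WHERE clause;
- # columns on the outer-joined side are wrapped so they render with
- # Oracle's (+) operator.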
- clauses = []
- def visit_join(join):
- if join.isouter:
- def visit_binary(binary):
- if binary.operator == sql_operators.eq:
- if join.right.is_derived_from(binary.left.table):
- binary.left = _OuterJoinColumn(binary.left)
- elif join.right.is_derived_from(binary.right.table):
- binary.right = _OuterJoinColumn(binary.right)
- clauses.append(visitors.cloned_traverse(
- join.onclause, {}, {'binary': visit_binary}))
- else:
- clauses.append(join.onclause)
- for j in join.left, join.right:
- if isinstance(j, expression.Join):
- visit_join(j)
- elif isinstance(j, expression.FromGrouping):
- visit_join(j.element)
- for f in froms:
- if isinstance(f, expression.Join):
- visit_join(f)
- if not clauses:
- return None
- else:
- return sql.and_(*clauses)
- def visit_outer_join_column(self, vc, **kw):
- return self.process(vc.column, **kw) + "(+)"
- def visit_sequence(self, seq):
- return (self.dialect.identifier_preparer.format_sequence(seq) +
- ".nextval")
- def get_render_as_alias_suffix(self, alias_name_text):
- """Oracle doesn't like ``FROM table AS alias``"""
- return " " + alias_name_text
- def returning_clause(self, stmt, returning_cols):
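- # Oracle returns values via OUT bind parameters (RETURNING ... INTO),
- # so each returned column is paired with a corresponding outparam bind.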
- columns = []
- binds = []
- for i, column in enumerate(
- expression._select_iterables(returning_cols)):
- if column.type._has_column_expression:
- col_expr = column.type.column_expression(column)
- else:
- col_expr = column
- outparam = sql.outparam("ret_%d" % i, type_=column.type)
- self.binds[outparam.key] = outparam
- binds.append(
- self.bindparam_string(self._truncate_bindparam(outparam)))
- columns.append(
- self.process(col_expr, within_columns_clause=False))
- self._add_to_result_map(
- outparam.key, outparam.key,
- (column, getattr(column, 'name', None),
- getattr(column, 'key', None)),
- column.type
- )
- return 'RETURNING ' + ', '.join(columns) + " INTO " + ", ".join(binds)
- def _TODO_visit_compound_select(self, select):
- """Need to determine how to get ``LIMIT``/``OFFSET`` into a
- ``UNION`` for Oracle.
- """
- pass
- def visit_select(self, select, **kwargs):
- """Look for ``LIMIT`` and OFFSET in a select statement, and if
- so tries to wrap it in a subquery with ``rownum`` criterion.
- """
- if not getattr(select, '_oracle_visit', None):
- if not self.dialect.use_ansi:
- froms = self._display_froms_for_select(
- select, kwargs.get('asfrom', False))
- whereclause = self._get_nonansi_join_whereclause(froms)
- if whereclause is not None:
- select = select.where(whereclause)
- select._oracle_visit = True
- limit_clause = select._limit_clause
- offset_clause = select._offset_clause
- if limit_clause is not None or offset_clause is not None:
- # See http://www.oracle.com/technology/oramag/oracle/06-sep/\
- # o56asktom.html
- #
- # Generalized form of an Oracle pagination query:
- # select ... from (
- # select /*+ FIRST_ROWS(N) */ ...., rownum as ora_rn from
- # ( select distinct ... where ... order by ...
- # ) where ROWNUM <= :limit+:offset
- # ) where ora_rn > :offset
- # Outer select and "ROWNUM as ora_rn" can be dropped if
- # limit=0
- kwargs['select_wraps_for'] = select
- select = select._generate()
- select._oracle_visit = True
- # Wrap the middle select and add the hint
- limitselect = sql.select([c for c in select.c])
- if limit_clause is not None and \
- self.dialect.optimize_limits and \
- select._simple_int_limit:
- limitselect = limitselect.prefix_with(
- "/*+ FIRST_ROWS(%d) */" %
- select._limit)
- limitselect._oracle_visit = True
- limitselect._is_wrapper = True
- # add expressions to accommodate FOR UPDATE OF
- for_update = select._for_update_arg
- if for_update is not None and for_update.of:
- for_update = for_update._clone()
- for_update._copy_internals()
- for elem in for_update.of:
- select.append_column(elem)
- adapter = sql_util.ClauseAdapter(select)
- for_update.of = [
- adapter.traverse(elem)
- for elem in for_update.of]
- # If needed, add the limiting clause
- if limit_clause is not None:
- if not self.dialect.use_binds_for_limits:
- # use simple int limits, will raise an exception
- # if the limit isn't specified this way
- max_row = select._limit
- if offset_clause is not None:
- max_row += select._offset
- max_row = sql.literal_column("%d" % max_row)
- else:
- max_row = limit_clause
- if offset_clause is not None:
- max_row = max_row + offset_clause
- limitselect.append_whereclause(
- sql.literal_column("ROWNUM") <= max_row)
- # If needed, add the ora_rn, and wrap again with offset.
- if offset_clause is None:
- limitselect._for_update_arg = for_update
- select = limitselect
- else:
- limitselect = limitselect.column(
- sql.literal_column("ROWNUM").label("ora_rn"))
- limitselect._oracle_visit = True
- limitselect._is_wrapper = True
- offsetselect = sql.select(
- [c for c in limitselect.c if c.key != 'ora_rn'])
- offsetselect._oracle_visit = True
- offsetselect._is_wrapper = True
- if for_update is not None and for_update.of:
- for elem in for_update.of:
- if limitselect.corresponding_column(elem) is None:
- limitselect.append_column(elem)
- if not self.dialect.use_binds_for_limits:
- offset_clause = sql.literal_column(
- "%d" % select._offset)
- offsetselect.append_whereclause(
- sql.literal_column("ora_rn") > offset_clause)
- offsetselect._for_update_arg = for_update
- select = offsetselect
- return compiler.SQLCompiler.visit_select(self, select, **kwargs)
- def limit_clause(self, select, **kw):
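- # LIMIT/OFFSET are handled by the ROWNUM subquery wrapping in
- # visit_select(), so no LIMIT clause is rendered here.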
- return ""
- def for_update_clause(self, select, **kw):
- if self.is_subquery():
- return ""
- tmp = ' FOR UPDATE'
- if select._for_update_arg.of:
- tmp += ' OF ' + ', '.join(
- self.process(elem, **kw) for elem in
- select._for_update_arg.of
- )
- if select._for_update_arg.nowait:
- tmp += " NOWAIT"
- if select._for_update_arg.skip_locked:
- tmp += " SKIP LOCKED"
- return tmp
- class OracleDDLCompiler(compiler.DDLCompiler):
- def define_constraint_cascades(self, constraint):
- text = ""
- if constraint.ondelete is not None:
- text += " ON DELETE %s" % constraint.ondelete
- # oracle has no ON UPDATE CASCADE -
- # its only available via triggers
- # http://asktom.oracle.com/tkyte/update_cascade/index.html
- if constraint.onupdate is not None:
- util.warn(
- "Oracle does not contain native UPDATE CASCADE "
- "functionality - onupdates will not be rendered for foreign "
- "keys. Consider using deferrable=True, initially='deferred' "
- "or triggers.")
- return text
- def visit_create_index(self, create):
- index = create.element
- self._verify_index_table(index)
- preparer = self.preparer
- text = "CREATE "
- if index.unique:
- text += "UNIQUE "
- if index.dialect_options['oracle']['bitmap']:
- text += "BITMAP "
- text += "INDEX %s ON %s (%s)" % (
- self._prepared_index_name(index, include_schema=True),
- preparer.format_table(index.table, use_schema=True),
- ', '.join(
- self.sql_compiler.process(
- expr,
- include_table=False, literal_binds=True)
- for expr in index.expressions)
- )
- if index.dialect_options['oracle']['compress'] is not False:
- if index.dialect_options['oracle']['compress'] is True:
- text += " COMPRESS"
- else:
- text += " COMPRESS %d" % (
- index.dialect_options['oracle']['compress']
- )
- return text
- def post_create_table(self, table):
- table_opts = []
- opts = table.dialect_options['oracle']
- if opts['on_commit']:
- on_commit_options = opts['on_commit'].replace("_", " ").upper()
- table_opts.append('\n ON COMMIT %s' % on_commit_options)
- if opts['compress']:
- if opts['compress'] is True:
- table_opts.append("\n COMPRESS")
- else:
- table_opts.append("\n COMPRESS FOR %s" % (
- opts['compress']
- ))
- return ''.join(table_opts)
- class OracleIdentifierPreparer(compiler.IdentifierPreparer):
- reserved_words = set([x.lower() for x in RESERVED_WORDS])
- illegal_initial_characters = set(
- (str(dig) for dig in range(0, 10))).union(["_", "$"])
- def _bindparam_requires_quotes(self, value):
- """Return True if the given identifier requires quoting."""
- lc_value = value.lower()
- return (lc_value in self.reserved_words
- or value[0] in self.illegal_initial_characters
- or not self.legal_characters.match(util.text_type(value))
- )
- def format_savepoint(self, savepoint):
- name = savepoint.ident.lstrip('_')
- return super(
- OracleIdentifierPreparer, self).format_savepoint(savepoint, name)
- class OracleExecutionContext(default.DefaultExecutionContext):
- def fire_sequence(self, seq, type_):
- return self._execute_scalar(
- "SELECT " +
- self.dialect.identifier_preparer.format_sequence(seq) +
- ".nextval FROM DUAL", type_)
- class OracleDialect(default.DefaultDialect):
- name = 'oracle'
- supports_alter = True
- supports_unicode_statements = False
- supports_unicode_binds = False
- max_identifier_length = 30
- supports_sane_rowcount = True
- supports_sane_multi_rowcount = False
- supports_simple_order_by_label = False
- supports_sequences = True
- sequences_optional = False
- postfetch_lastrowid = False
- default_paramstyle = 'named'
- colspecs = colspecs
- ischema_names = ischema_names
- requires_name_normalize = True
- supports_default_values = False
- supports_empty_insert = False
- statement_compiler = OracleCompiler
- ddl_compiler = OracleDDLCompiler
- type_compiler = OracleTypeCompiler
- preparer = OracleIdentifierPreparer
- execution_ctx_cls = OracleExecutionContext
- reflection_options = ('oracle_resolve_synonyms', )
- construct_arguments = [
- (sa_schema.Table, {
- "resolve_synonyms": False,
- "on_commit": None,
- "compress": False
- }),
- (sa_schema.Index, {
- "bitmap": False,
- "compress": False
- })
- ]
- def __init__(self,
- use_ansi=True,
- optimize_limits=False,
- use_binds_for_limits=True,
- exclude_tablespaces=('SYSTEM', 'SYSAUX', ),
- **kwargs):
- default.DefaultDialect.__init__(self, **kwargs)
- self.use_ansi = use_ansi
- self.optimize_limits = optimize_limits
- self.use_binds_for_limits = use_binds_for_limits
- self.exclude_tablespaces = exclude_tablespaces
- def initialize(self, connection):
- super(OracleDialect, self).initialize(connection)
- self.implicit_returning = self.__dict__.get(
- 'implicit_returning',
- self.server_version_info > (10, )
- )
- if self._is_oracle_8:
- self.colspecs = self.colspecs.copy()
- self.colspecs.pop(sqltypes.Interval)
- self.use_ansi = False
- @property
- def _is_oracle_8(self):
- return self.server_version_info and \
- self.server_version_info < (9, )
- @property
- def _supports_table_compression(self):
- return self.server_version_info and \
- self.server_version_info >= (10, 1, )
- @property
- def _supports_table_compress_for(self):
- return self.server_version_info and \
- self.server_version_info >= (11, )
- @property
- def _supports_char_length(self):
- return not self._is_oracle_8
- @property
- def _supports_nchar(self):
- return not self._is_oracle_8
- def do_release_savepoint(self, connection, name):
- # Oracle does not support RELEASE SAVEPOINT
- pass
- def has_table(self, connection, table_name, schema=None):
- if not schema:
- schema = self.default_schema_name
- cursor = connection.execute(
- sql.text("SELECT table_name FROM all_tables "
- "WHERE table_name = :name AND owner = :schema_name"),
- name=self.denormalize_name(table_name),
- schema_name=self.denormalize_name(schema))
- return cursor.first() is not None
- def has_sequence(self, connection, sequence_name, schema=None):
- if not schema:
- schema = self.default_schema_name
- cursor = connection.execute(
- sql.text("SELECT sequence_name FROM all_sequences "
- "WHERE sequence_name = :name AND "
- "sequence_owner = :schema_name"),
- name=self.denormalize_name(sequence_name),
- schema_name=self.denormalize_name(schema))
- return cursor.first() is not None
- def normalize_name(self, name):
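- # Oracle stores case-insensitive names as UPPERCASE in the data
- # dictionary; convert those to SQLAlchemy's all-lowercase convention,
- # and mark names that are genuinely lowercase in the database as quoted.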
- if name is None:
- return None
- if util.py2k:
- if isinstance(name, str):
- name = name.decode(self.encoding)
- if name.upper() == name and not \
- self.identifier_preparer._requires_quotes(name.lower()):
- return name.lower()
- elif name.lower() == name:
- return quoted_name(name, quote=True)
- else:
- return name
- def denormalize_name(self, name):
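- # reverse of normalize_name(): send all-lowercase, unquoted names to
- # Oracle as UPPERCASE.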
- if name is None:
- return None
- elif name.lower() == name and not \
- self.identifier_preparer._requires_quotes(name.lower()):
- name = name.upper()
- if util.py2k:
- if not self.supports_unicode_binds:
- name = name.encode(self.encoding)
- else:
- name = unicode(name)
- return name
- def _get_default_schema_name(self, connection):
- return self.normalize_name(
- connection.execute('SELECT USER FROM DUAL').scalar())
- def _resolve_synonym(self, connection, desired_owner=None,
- desired_synonym=None, desired_table=None):
- """search for a local synonym matching the given desired owner/name.
- if desired_owner is None, attempts to locate a distinct owner.
- returns the actual name, owner, dblink name, and synonym name if
- found.
- """
- q = "SELECT owner, table_owner, table_name, db_link, "\
- "synonym_name FROM all_synonyms WHERE "
- clauses = []
- params = {}
- if desired_synonym:
- clauses.append("synonym_name = :synonym_name")
- params['synonym_name'] = desired_synonym
- if desired_owner:
- clauses.append("owner = :desired_owner")
- params['desired_owner'] = desired_owner
- if desired_table:
- clauses.append("table_name = :tname")
- params['tname'] = desired_table
- q += " AND ".join(clauses)
- result = connection.execute(sql.text(q), **params)
- if desired_owner:
- row = result.first()
- if row:
- return (row['table_name'], row['table_owner'],
- row['db_link'], row['synonym_name'])
- else:
- return None, None, None, None
- else:
- rows = result.fetchall()
- if len(rows) > 1:
- raise AssertionError(
- "There are multiple tables visible to the schema, you "
- "must specify owner")
- elif len(rows) == 1:
- row = rows[0]
- return (row['table_name'], row['table_owner'],
- row['db_link'], row['synonym_name'])
- else:
- return None, None, None, None
- @reflection.cache
- def _prepare_reflection_args(self, connection, table_name, schema=None,
- resolve_synonyms=False, dblink='', **kw):
- if resolve_synonyms:
- actual_name, owner, dblink, synonym = self._resolve_synonym(
- connection,
- desired_owner=self.denormalize_name(schema),
- desired_synonym=self.denormalize_name(table_name)
- )
- else:
- actual_name, owner, dblink, synonym = None, None, None, None
- if not actual_name:
- actual_name = self.denormalize_name(table_name)
- if dblink:
- # using user_db_links here since all_db_links appears
- # to have more restricted permissions.
- # http://docs.oracle.com/cd/B28359_01/server.111/b28310/ds_admin005.htm
- # will need to hear from more users if we are doing
- # the right thing here. See [ticket:2619]
- owner = connection.scalar(
- sql.text("SELECT username FROM user_db_links "
- "WHERE db_link=:link"), link=dblink)
- dblink = "@" + dblink
- elif not owner:
- owner = self.denormalize_name(schema or self.default_schema_name)
- return (actual_name, owner, dblink or '', synonym)
- @reflection.cache
- def get_schema_names(self, connection, **kw):
- s = "SELECT username FROM all_users ORDER BY username"
- cursor = connection.execute(s,)
- return [self.normalize_name(row[0]) for row in cursor]
- @reflection.cache
- def get_table_names(self, connection, schema=None, **kw):
- schema = self.denormalize_name(schema or self.default_schema_name)
- # note that table_names() isn't loading DBLINKed or synonym'ed tables
- if schema is None:
- schema = self.default_schema_name
- sql_str = "SELECT table_name FROM all_tables WHERE "
- if self.exclude_tablespaces:
- sql_str += (
- "nvl(tablespace_name, 'no tablespace') "
- "NOT IN (%s) AND " % (
- ', '.join(["'%s'" % ts for ts in self.exclude_tablespaces])
- )
- )
- sql_str += (
- "OWNER = :owner "
- "AND IOT_NAME IS NULL "
- "AND DURATION IS NULL")
- cursor = connection.execute(sql.text(sql_str), owner=schema)
- return [self.normalize_name(row[0]) for row in cursor]
- @reflection.cache
- def get_temp_table_names(self, connection, **kw):
- schema = self.denormalize_name(self.default_schema_name)
- sql_str = "SELECT table_name FROM all_tables WHERE "
- if self.exclude_tablespaces:
- sql_str += (
- "nvl(tablespace_name, 'no tablespace') "
- "NOT IN (%s) AND " % (
- ', '.join(["'%s'" % ts for ts in self.exclude_tablespaces])
- )
- )
- sql_str += (
- "OWNER = :owner "
- "AND IOT_NAME IS NULL "
- "AND DURATION IS NOT NULL")
- cursor = connection.execute(sql.text(sql_str), owner=schema)
- return [self.normalize_name(row[0]) for row in cursor]
- @reflection.cache
- def get_view_names(self, connection, schema=None, **kw):
- schema = self.denormalize_name(schema or self.default_schema_name)
- s = sql.text("SELECT view_name FROM all_views WHERE owner = :owner")
- cursor = connection.execute(s, owner=self.denormalize_name(schema))
- return [self.normalize_name(row[0]) for row in cursor]
- @reflection.cache
- def get_table_options(self, connection, table_name, schema=None, **kw):
- options = {}
- resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
- dblink = kw.get('dblink', '')
- info_cache = kw.get('info_cache')
- (table_name, schema, dblink, synonym) = \
- self._prepare_reflection_args(connection, table_name, schema,
- resolve_synonyms, dblink,
- info_cache=info_cache)
- params = {"table_name": table_name}
- columns = ["table_name"]
- if self._supports_table_compression:
- columns.append("compression")
- if self._supports_table_compress_for:
- columns.append("compress_for")
- text = "SELECT %(columns)s "\
- "FROM ALL_TABLES%(dblink)s "\
- "WHERE table_name = :table_name"
- if schema is not None:
- params['owner'] = schema
- text += " AND owner = :owner "
- text = text % {'dblink': dblink, 'columns': ", ".join(columns)}
- result = connection.execute(sql.text(text), **params)
- enabled = dict(DISABLED=False, ENABLED=True)
- row = result.first()
- if row:
- if "compression" in row and enabled.get(row.compression, False):
- if "compress_for" in row:
- options['oracle_compress'] = row.compress_for
- else:
- options['oracle_compress'] = True
- return options
- @reflection.cache
- def get_columns(self, connection, table_name, schema=None, **kw):
- """
- kw arguments can be:
- oracle_resolve_synonyms
- dblink
- """
- resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
- dblink = kw.get('dblink', '')
- info_cache = kw.get('info_cache')
- (table_name, schema, dblink, synonym) = \
- self._prepare_reflection_args(connection, table_name, schema,
- resolve_synonyms, dblink,
- info_cache=info_cache)
- columns = []
- if self._supports_char_length:
- char_length_col = 'char_length'
- else:
- char_length_col = 'data_length'
- params = {"table_name": table_name}
- text = "SELECT column_name, data_type, %(char_length_col)s, "\
- "data_precision, data_scale, "\
- "nullable, data_default FROM ALL_TAB_COLUMNS%(dblink)s "\
- "WHERE table_name = :table_name"
- if schema is not None:
- params['owner'] = schema
- text += " AND owner = :owner "
- text += " ORDER BY column_id"
- text = text % {'dblink': dblink, 'char_length_col': char_length_col}
- c = connection.execute(sql.text(text), **params)
- for row in c:
- (colname, orig_colname, coltype, length, precision, scale, nullable, default) = \
- (self.normalize_name(row[0]), row[0], row[1], row[2],
- row[3], row[4], row[5] == 'Y', row[6])
- if coltype == 'NUMBER':
- coltype = NUMBER(precision, scale)
- elif coltype in ('VARCHAR2', 'NVARCHAR2', 'CHAR'):
- coltype = self.ischema_names.get(coltype)(length)
- elif 'WITH TIME ZONE' in coltype:
- coltype = TIMESTAMP(timezone=True)
- else:
- coltype = re.sub(r'\(\d+\)', '', coltype)
- try:
- coltype = self.ischema_names[coltype]
- except KeyError:
- util.warn("Did not recognize type '%s' of column '%s'" %
- (coltype, colname))
- coltype = sqltypes.NULLTYPE
- cdict = {
- 'name': colname,
- 'type': coltype,
- 'nullable': nullable,
- 'default': default,
- 'autoincrement': 'auto',
- }
- if orig_colname.lower() == orig_colname:
- cdict['quote'] = True
- columns.append(cdict)
- return columns
- @reflection.cache
- def get_indexes(self, connection, table_name, schema=None,
- resolve_synonyms=False, dblink='', **kw):
- info_cache = kw.get('info_cache')
- (table_name, schema, dblink, synonym) = \
- self._prepare_reflection_args(connection, table_name, schema,
- resolve_synonyms, dblink,
- info_cache=info_cache)
- indexes = []
- params = {'table_name': table_name}
- text = \
- "SELECT a.index_name, a.column_name, "\
- "\nb.index_type, b.uniqueness, b.compression, b.prefix_length "\
- "\nFROM ALL_IND_COLUMNS%(dblink)s a, "\
- "\nALL_INDEXES%(dblink)s b "\
- "\nWHERE "\
- "\na.index_name = b.index_name "\
- "\nAND a.table_owner = b.table_owner "\
- "\nAND a.table_name = b.table_name "\
- "\nAND a.table_name = :table_name "
- if schema is not None:
- params['schema'] = schema
- text += "AND a.table_owner = :schema "
- text += "ORDER BY a.index_name, a.column_position"
- text = text % {'dblink': dblink}
- q = sql.text(text)
- rp = connection.execute(q, **params)
- indexes = []
- last_index_name = None
- pk_constraint = self.get_pk_constraint(
- connection, table_name, schema, resolve_synonyms=resolve_synonyms,
- dblink=dblink, info_cache=kw.get('info_cache'))
- pkeys = pk_constraint['constrained_columns']
- uniqueness = dict(NONUNIQUE=False, UNIQUE=True)
- enabled = dict(DISABLED=False, ENABLED=True)
- oracle_sys_col = re.compile(r'SYS_NC\d+\$', re.IGNORECASE)
- def upper_name_set(names):
- return set([i.upper() for i in names])
- pk_names = upper_name_set(pkeys)
- def remove_if_primary_key(index):
- # don't include the primary key index
- if index is not None and \
- upper_name_set(index['column_names']) == pk_names:
- indexes.pop()
- index = None
- for rset in rp:
- if rset.index_name != last_index_name:
- remove_if_primary_key(index)
- index = dict(name=self.normalize_name(rset.index_name),
- column_names=[], dialect_options={})
- indexes.append(index)
- index['unique'] = uniqueness.get(rset.uniqueness, False)
- if rset.index_type in ('BITMAP', 'FUNCTION-BASED BITMAP'):
- index['dialect_options']['oracle_bitmap'] = True
- if enabled.get(rset.compression, False):
- index['dialect_options']['oracle_compress'] = rset.prefix_length
- # filter out Oracle SYS_NC names. could also do an outer join
- # to the all_tab_columns table and check for real col names there.
- if not oracle_sys_col.match(rset.column_name):
- index['column_names'].append(
- self.normalize_name(rset.column_name))
- last_index_name = rset.index_name
- remove_if_primary_key(index)
- return indexes
- @reflection.cache
- def _get_constraint_data(self, connection, table_name, schema=None,
- dblink='', **kw):
- params = {'table_name': table_name}
- text = \
- "SELECT"\
- "\nac.constraint_name,"\
- "\nac.constraint_type,"\
- "\nloc.column_name AS local_column,"\
- "\nrem.table_name AS remote_table,"\
- "\nrem.column_name AS remote_column,"\
- "\nrem.owner AS remote_owner,"\
- "\nloc.position as loc_pos,"\
- "\nrem.position as rem_pos"\
- "\nFROM all_constraints%(dblink)s ac,"\
- "\nall_cons_columns%(dblink)s loc,"\
- "\nall_cons_columns%(dblink)s rem"\
- "\nWHERE ac.table_name = :table_name"\
- "\nAND ac.constraint_type IN ('R','P')"
- if schema is not None:
- params['owner'] = schema
- text += "\nAND ac.owner = :owner"
- text += \
- "\nAND ac.owner = loc.owner"\
- "\nAND ac.constraint_name = loc.constraint_name"\
- "\nAND ac.r_owner = rem.owner(+)"\
- "\nAND ac.r_constraint_name = rem.constraint_name(+)"\
- "\nAND (rem.position IS NULL or loc.position=rem.position)"\
- "\nORDER BY ac.constraint_name, loc.position"
- text = text % {'dblink': dblink}
- rp = connection.execute(sql.text(text), **params)
- constraint_data = rp.fetchall()
- return constraint_data
- @reflection.cache
- def get_pk_constraint(self, connection, table_name, schema=None, **kw):
- resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
- dblink = kw.get('dblink', '')
- info_cache = kw.get('info_cache')
- (table_name, schema, dblink, synonym) = \
- self._prepare_reflection_args(connection, table_name, schema,
- resolve_synonyms, dblink,
- info_cache=info_cache)
- pkeys = []
- constraint_name = None
- constraint_data = self._get_constraint_data(
- connection, table_name, schema, dblink,
- info_cache=kw.get('info_cache'))
- for row in constraint_data:
- (cons_name, cons_type, local_column, remote_table, remote_column, remote_owner) = \
- row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]])
- if cons_type == 'P':
- if constraint_name is None:
- constraint_name = self.normalize_name(cons_name)
- pkeys.append(local_column)
- return {'constrained_columns': pkeys, 'name': constraint_name}
- @reflection.cache
- def get_foreign_keys(self, connection, table_name, schema=None, **kw):
- """
- kw arguments can be:
- oracle_resolve_synonyms
- dblink
- """
- requested_schema = schema # to check later on
- resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
- dblink = kw.get('dblink', '')
- info_cache = kw.get('info_cache')
- (table_name, schema, dblink, synonym) = \
- self._prepare_reflection_args(connection, table_name, schema,
- resolve_synonyms, dblink,
- info_cache=info_cache)
- constraint_data = self._get_constraint_data(
- connection, table_name, schema, dblink,
- info_cache=kw.get('info_cache'))
- def fkey_rec():
- return {
- 'name': None,
- 'constrained_columns': [],
- 'referred_schema': None,
- 'referred_table': None,
- 'referred_columns': []
- }
- fkeys = util.defaultdict(fkey_rec)
- for row in constraint_data:
- (cons_name, cons_type, local_column, remote_table, remote_column, remote_owner) = \
- row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]])
- if cons_type == 'R':
- if remote_table is None:
- # ticket 363
- util.warn(
- ("Got 'None' querying 'table_name' from "
- "all_cons_columns%(dblink)s - does the user have "
- "proper rights to the table?") % {'dblink': dblink})
- continue
- rec = fkeys[cons_name]
- rec['name'] = cons_name
- local_cols, remote_cols = rec[
- 'constrained_columns'], rec['referred_columns']
- if not rec['referred_table']:
- if resolve_synonyms:
- ref_remote_name, ref_remote_owner, ref_dblink, ref_synonym = \
- self._resolve_synonym(
- connection,
- desired_owner=self.denormalize_name(
- remote_owner),
- desired_table=self.denormalize_name(
- remote_table)
- )
- if ref_synonym:
- remote_table = self.normalize_name(ref_synonym)
- remote_owner = self.normalize_name(
- ref_remote_owner)
- rec['referred_table'] = remote_table
- if requested_schema is not None or \
- self.denormalize_name(remote_owner) != schema:
- rec['referred_schema'] = remote_owner
- local_cols.append(local_column)
- remote_cols.append(remote_column)
- return list(fkeys.values())
- @reflection.cache
- def get_view_definition(self, connection, view_name, schema=None,
- resolve_synonyms=False, dblink='', **kw):
- info_cache = kw.get('info_cache')
- (view_name, schema, dblink, synonym) = \
- self._prepare_reflection_args(connection, view_name, schema,
- resolve_synonyms, dblink,
- info_cache=info_cache)
- params = {'view_name': view_name}
- text = "SELECT text FROM all_views WHERE view_name=:view_name"
- if schema is not None:
- text += " AND owner = :schema"
- params['schema'] = schema
- rp = connection.execute(sql.text(text), **params).scalar()
- if rp:
- if util.py2k:
- rp = rp.decode(self.encoding)
- return rp
- else:
- return None
- class _OuterJoinColumn(sql.ClauseElement):
- __visit_name__ = 'outer_join_column'
- def __init__(self, column):
- self.column = column