123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821 |
- # sybase/base.py
- # Copyright (C) 2010-2017 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- # get_select_precolumns(), limit_clause() implementation
- # copyright (C) 2007 Fisch Asset Management
- # AG http://www.fam.ch, with coding by Alexander Houben
- # alexander.houben@thor-solutions.ch
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: http://www.opensource.org/licenses/mit-license.php
- """
- .. dialect:: sybase
- :name: Sybase
- .. note::
- The Sybase dialect functions on current SQLAlchemy versions
- but is not regularly tested, and may have many issues and
- caveats not currently handled.
- """
- import operator
- import re
- from sqlalchemy.sql import compiler, expression, text, bindparam
- from sqlalchemy.engine import default, base, reflection
- from sqlalchemy import types as sqltypes
- from sqlalchemy.sql import operators as sql_operators
- from sqlalchemy import schema as sa_schema
- from sqlalchemy import util, sql, exc
- from sqlalchemy.types import CHAR, VARCHAR, TIME, NCHAR, NVARCHAR,\
- TEXT, DATE, DATETIME, FLOAT, NUMERIC,\
- BIGINT, INT, INTEGER, SMALLINT, BINARY,\
- VARBINARY, DECIMAL, TIMESTAMP, Unicode,\
- UnicodeText, REAL
- RESERVED_WORDS = set([
- "add", "all", "alter", "and",
- "any", "as", "asc", "backup",
- "begin", "between", "bigint", "binary",
- "bit", "bottom", "break", "by",
- "call", "capability", "cascade", "case",
- "cast", "char", "char_convert", "character",
- "check", "checkpoint", "close", "comment",
- "commit", "connect", "constraint", "contains",
- "continue", "convert", "create", "cross",
- "cube", "current", "current_timestamp", "current_user",
- "cursor", "date", "dbspace", "deallocate",
- "dec", "decimal", "declare", "default",
- "delete", "deleting", "desc", "distinct",
- "do", "double", "drop", "dynamic",
- "else", "elseif", "encrypted", "end",
- "endif", "escape", "except", "exception",
- "exec", "execute", "existing", "exists",
- "externlogin", "fetch", "first", "float",
- "for", "force", "foreign", "forward",
- "from", "full", "goto", "grant",
- "group", "having", "holdlock", "identified",
- "if", "in", "index", "index_lparen",
- "inner", "inout", "insensitive", "insert",
- "inserting", "install", "instead", "int",
- "integer", "integrated", "intersect", "into",
- "iq", "is", "isolation", "join",
- "key", "lateral", "left", "like",
- "lock", "login", "long", "match",
- "membership", "message", "mode", "modify",
- "natural", "new", "no", "noholdlock",
- "not", "notify", "null", "numeric",
- "of", "off", "on", "open",
- "option", "options", "or", "order",
- "others", "out", "outer", "over",
- "passthrough", "precision", "prepare", "primary",
- "print", "privileges", "proc", "procedure",
- "publication", "raiserror", "readtext", "real",
- "reference", "references", "release", "remote",
- "remove", "rename", "reorganize", "resource",
- "restore", "restrict", "return", "revoke",
- "right", "rollback", "rollup", "save",
- "savepoint", "scroll", "select", "sensitive",
- "session", "set", "setuser", "share",
- "smallint", "some", "sqlcode", "sqlstate",
- "start", "stop", "subtrans", "subtransaction",
- "synchronize", "syntax_error", "table", "temporary",
- "then", "time", "timestamp", "tinyint",
- "to", "top", "tran", "trigger",
- "truncate", "tsequal", "unbounded", "union",
- "unique", "unknown", "unsigned", "update",
- "updating", "user", "using", "validate",
- "values", "varbinary", "varchar", "variable",
- "varying", "view", "wait", "waitfor",
- "when", "where", "while", "window",
- "with", "with_cube", "with_lparen", "with_rollup",
- "within", "work", "writetext",
- ])
- class _SybaseUnitypeMixin(object):
- """these types appear to return a buffer object."""
- def result_processor(self, dialect, coltype):
- def process(value):
- if value is not None:
- return str(value) # decode("ucs-2")
- else:
- return None
- return process
- class UNICHAR(_SybaseUnitypeMixin, sqltypes.Unicode):
- __visit_name__ = 'UNICHAR'
- class UNIVARCHAR(_SybaseUnitypeMixin, sqltypes.Unicode):
- __visit_name__ = 'UNIVARCHAR'
- class UNITEXT(_SybaseUnitypeMixin, sqltypes.UnicodeText):
- __visit_name__ = 'UNITEXT'
- class TINYINT(sqltypes.Integer):
- __visit_name__ = 'TINYINT'
- class BIT(sqltypes.TypeEngine):
- __visit_name__ = 'BIT'
- class MONEY(sqltypes.TypeEngine):
- __visit_name__ = "MONEY"
- class SMALLMONEY(sqltypes.TypeEngine):
- __visit_name__ = "SMALLMONEY"
- class UNIQUEIDENTIFIER(sqltypes.TypeEngine):
- __visit_name__ = "UNIQUEIDENTIFIER"
- class IMAGE(sqltypes.LargeBinary):
- __visit_name__ = 'IMAGE'
- class SybaseTypeCompiler(compiler.GenericTypeCompiler):
- def visit_large_binary(self, type_, **kw):
- return self.visit_IMAGE(type_)
- def visit_boolean(self, type_, **kw):
- return self.visit_BIT(type_)
- def visit_unicode(self, type_, **kw):
- return self.visit_NVARCHAR(type_)
- def visit_UNICHAR(self, type_, **kw):
- return "UNICHAR(%d)" % type_.length
- def visit_UNIVARCHAR(self, type_, **kw):
- return "UNIVARCHAR(%d)" % type_.length
- def visit_UNITEXT(self, type_, **kw):
- return "UNITEXT"
- def visit_TINYINT(self, type_, **kw):
- return "TINYINT"
- def visit_IMAGE(self, type_, **kw):
- return "IMAGE"
- def visit_BIT(self, type_, **kw):
- return "BIT"
- def visit_MONEY(self, type_, **kw):
- return "MONEY"
- def visit_SMALLMONEY(self, type_, **kw):
- return "SMALLMONEY"
- def visit_UNIQUEIDENTIFIER(self, type_, **kw):
- return "UNIQUEIDENTIFIER"
- ischema_names = {
- 'bigint': BIGINT,
- 'int': INTEGER,
- 'integer': INTEGER,
- 'smallint': SMALLINT,
- 'tinyint': TINYINT,
- 'unsigned bigint': BIGINT, # TODO: unsigned flags
- 'unsigned int': INTEGER, # TODO: unsigned flags
- 'unsigned smallint': SMALLINT, # TODO: unsigned flags
- 'numeric': NUMERIC,
- 'decimal': DECIMAL,
- 'dec': DECIMAL,
- 'float': FLOAT,
- 'double': NUMERIC, # TODO
- 'double precision': NUMERIC, # TODO
- 'real': REAL,
- 'smallmoney': SMALLMONEY,
- 'money': MONEY,
- 'smalldatetime': DATETIME,
- 'datetime': DATETIME,
- 'date': DATE,
- 'time': TIME,
- 'char': CHAR,
- 'character': CHAR,
- 'varchar': VARCHAR,
- 'character varying': VARCHAR,
- 'char varying': VARCHAR,
- 'unichar': UNICHAR,
- 'unicode character': UNIVARCHAR,
- 'nchar': NCHAR,
- 'national char': NCHAR,
- 'national character': NCHAR,
- 'nvarchar': NVARCHAR,
- 'nchar varying': NVARCHAR,
- 'national char varying': NVARCHAR,
- 'national character varying': NVARCHAR,
- 'text': TEXT,
- 'unitext': UNITEXT,
- 'binary': BINARY,
- 'varbinary': VARBINARY,
- 'image': IMAGE,
- 'bit': BIT,
- # not in documentation for ASE 15.7
- 'long varchar': TEXT, # TODO
- 'timestamp': TIMESTAMP,
- 'uniqueidentifier': UNIQUEIDENTIFIER,
- }
- class SybaseInspector(reflection.Inspector):
- def __init__(self, conn):
- reflection.Inspector.__init__(self, conn)
- def get_table_id(self, table_name, schema=None):
- """Return the table id from `table_name` and `schema`."""
- return self.dialect.get_table_id(self.bind, table_name, schema,
- info_cache=self.info_cache)
- class SybaseExecutionContext(default.DefaultExecutionContext):
- _enable_identity_insert = False
- def set_ddl_autocommit(self, connection, value):
- """Must be implemented by subclasses to accommodate DDL executions.
- "connection" is the raw unwrapped DBAPI connection. "value"
- is True or False. when True, the connection should be configured
- such that a DDL can take place subsequently. when False,
- a DDL has taken place and the connection should be resumed
- into non-autocommit mode.
- """
- raise NotImplementedError()
- def pre_exec(self):
- if self.isinsert:
- tbl = self.compiled.statement.table
- seq_column = tbl._autoincrement_column
- insert_has_sequence = seq_column is not None
- if insert_has_sequence:
- self._enable_identity_insert = \
- seq_column.key in self.compiled_parameters[0]
- else:
- self._enable_identity_insert = False
- if self._enable_identity_insert:
- self.cursor.execute(
- "SET IDENTITY_INSERT %s ON" %
- self.dialect.identifier_preparer.format_table(tbl))
- if self.isddl:
- # TODO: to enhance this, we can detect "ddl in tran" on the
- # database settings. this error message should be improved to
- # include a note about that.
- if not self.should_autocommit:
- raise exc.InvalidRequestError(
- "The Sybase dialect only supports "
- "DDL in 'autocommit' mode at this time.")
- self.root_connection.engine.logger.info(
- "AUTOCOMMIT (Assuming no Sybase 'ddl in tran')")
- self.set_ddl_autocommit(
- self.root_connection.connection.connection,
- True)
- def post_exec(self):
- if self.isddl:
- self.set_ddl_autocommit(self.root_connection, False)
- if self._enable_identity_insert:
- self.cursor.execute(
- "SET IDENTITY_INSERT %s OFF" %
- self.dialect.identifier_preparer.
- format_table(self.compiled.statement.table)
- )
- def get_lastrowid(self):
- cursor = self.create_cursor()
- cursor.execute("SELECT @@identity AS lastrowid")
- lastrowid = cursor.fetchone()[0]
- cursor.close()
- return lastrowid
- class SybaseSQLCompiler(compiler.SQLCompiler):
- ansi_bind_rules = True
- extract_map = util.update_copy(
- compiler.SQLCompiler.extract_map,
- {
- 'doy': 'dayofyear',
- 'dow': 'weekday',
- 'milliseconds': 'millisecond'
- })
- def get_select_precolumns(self, select, **kw):
- s = select._distinct and "DISTINCT " or ""
- # TODO: don't think Sybase supports
- # bind params for FIRST / TOP
- limit = select._limit
- if limit:
- # if select._limit == 1:
- # s += "FIRST "
- # else:
- # s += "TOP %s " % (select._limit,)
- s += "TOP %s " % (limit,)
- offset = select._offset
- if offset:
- raise NotImplementedError("Sybase ASE does not support OFFSET")
- return s
- def get_from_hint_text(self, table, text):
- return text
- def limit_clause(self, select, **kw):
- # Limit in sybase is after the select keyword
- return ""
- def visit_extract(self, extract, **kw):
- field = self.extract_map.get(extract.field, extract.field)
- return 'DATEPART("%s", %s)' % (
- field, self.process(extract.expr, **kw))
- def visit_now_func(self, fn, **kw):
- return "GETDATE()"
- def for_update_clause(self, select):
- # "FOR UPDATE" is only allowed on "DECLARE CURSOR"
- # which SQLAlchemy doesn't use
- return ''
- def order_by_clause(self, select, **kw):
- kw['literal_binds'] = True
- order_by = self.process(select._order_by_clause, **kw)
- # SybaseSQL only allows ORDER BY in subqueries if there is a LIMIT
- if order_by and (not self.is_subquery() or select._limit):
- return " ORDER BY " + order_by
- else:
- return ""
- class SybaseDDLCompiler(compiler.DDLCompiler):
- def get_column_specification(self, column, **kwargs):
- colspec = self.preparer.format_column(column) + " " + \
- self.dialect.type_compiler.process(
- column.type, type_expression=column)
- if column.table is None:
- raise exc.CompileError(
- "The Sybase dialect requires Table-bound "
- "columns in order to generate DDL")
- seq_col = column.table._autoincrement_column
- # install a IDENTITY Sequence if we have an implicit IDENTITY column
- if seq_col is column:
- sequence = isinstance(column.default, sa_schema.Sequence) \
- and column.default
- if sequence:
- start, increment = sequence.start or 1, \
- sequence.increment or 1
- else:
- start, increment = 1, 1
- if (start, increment) == (1, 1):
- colspec += " IDENTITY"
- else:
- # TODO: need correct syntax for this
- colspec += " IDENTITY(%s,%s)" % (start, increment)
- else:
- default = self.get_column_default_string(column)
- if default is not None:
- colspec += " DEFAULT " + default
- if column.nullable is not None:
- if not column.nullable or column.primary_key:
- colspec += " NOT NULL"
- else:
- colspec += " NULL"
- return colspec
- def visit_drop_index(self, drop):
- index = drop.element
- return "\nDROP INDEX %s.%s" % (
- self.preparer.quote_identifier(index.table.name),
- self._prepared_index_name(drop.element,
- include_schema=False)
- )
- class SybaseIdentifierPreparer(compiler.IdentifierPreparer):
- reserved_words = RESERVED_WORDS
- class SybaseDialect(default.DefaultDialect):
- name = 'sybase'
- supports_unicode_statements = False
- supports_sane_rowcount = False
- supports_sane_multi_rowcount = False
- supports_native_boolean = False
- supports_unicode_binds = False
- postfetch_lastrowid = True
- colspecs = {}
- ischema_names = ischema_names
- type_compiler = SybaseTypeCompiler
- statement_compiler = SybaseSQLCompiler
- ddl_compiler = SybaseDDLCompiler
- preparer = SybaseIdentifierPreparer
- inspector = SybaseInspector
- construct_arguments = []
- def _get_default_schema_name(self, connection):
- return connection.scalar(
- text("SELECT user_name() as user_name",
- typemap={'user_name': Unicode})
- )
- def initialize(self, connection):
- super(SybaseDialect, self).initialize(connection)
- if self.server_version_info is not None and\
- self.server_version_info < (15, ):
- self.max_identifier_length = 30
- else:
- self.max_identifier_length = 255
- def get_table_id(self, connection, table_name, schema=None, **kw):
- """Fetch the id for schema.table_name.
- Several reflection methods require the table id. The idea for using
- this method is that it can be fetched one time and cached for
- subsequent calls.
- """
- table_id = None
- if schema is None:
- schema = self.default_schema_name
- TABLEID_SQL = text("""
- SELECT o.id AS id
- FROM sysobjects o JOIN sysusers u ON o.uid=u.uid
- WHERE u.name = :schema_name
- AND o.name = :table_name
- AND o.type in ('U', 'V')
- """)
- if util.py2k:
- if isinstance(schema, unicode):
- schema = schema.encode("ascii")
- if isinstance(table_name, unicode):
- table_name = table_name.encode("ascii")
- result = connection.execute(TABLEID_SQL,
- schema_name=schema,
- table_name=table_name)
- table_id = result.scalar()
- if table_id is None:
- raise exc.NoSuchTableError(table_name)
- return table_id
- @reflection.cache
- def get_columns(self, connection, table_name, schema=None, **kw):
- table_id = self.get_table_id(connection, table_name, schema,
- info_cache=kw.get("info_cache"))
- COLUMN_SQL = text("""
- SELECT col.name AS name,
- t.name AS type,
- (col.status & 8) AS nullable,
- (col.status & 128) AS autoincrement,
- com.text AS 'default',
- col.prec AS precision,
- col.scale AS scale,
- col.length AS length
- FROM systypes t, syscolumns col LEFT OUTER JOIN syscomments com ON
- col.cdefault = com.id
- WHERE col.usertype = t.usertype
- AND col.id = :table_id
- ORDER BY col.colid
- """)
- results = connection.execute(COLUMN_SQL, table_id=table_id)
- columns = []
- for (name, type_, nullable, autoincrement, default, precision, scale,
- length) in results:
- col_info = self._get_column_info(name, type_, bool(nullable),
- bool(autoincrement),
- default, precision, scale,
- length)
- columns.append(col_info)
- return columns
- def _get_column_info(self, name, type_, nullable, autoincrement, default,
- precision, scale, length):
- coltype = self.ischema_names.get(type_, None)
- kwargs = {}
- if coltype in (NUMERIC, DECIMAL):
- args = (precision, scale)
- elif coltype == FLOAT:
- args = (precision,)
- elif coltype in (CHAR, VARCHAR, UNICHAR, UNIVARCHAR, NCHAR, NVARCHAR):
- args = (length,)
- else:
- args = ()
- if coltype:
- coltype = coltype(*args, **kwargs)
- # is this necessary
- # if is_array:
- # coltype = ARRAY(coltype)
- else:
- util.warn("Did not recognize type '%s' of column '%s'" %
- (type_, name))
- coltype = sqltypes.NULLTYPE
- if default:
- default = default.replace("DEFAULT", "").strip()
- default = re.sub("^'(.*)'$", lambda m: m.group(1), default)
- else:
- default = None
- column_info = dict(name=name, type=coltype, nullable=nullable,
- default=default, autoincrement=autoincrement)
- return column_info
- @reflection.cache
- def get_foreign_keys(self, connection, table_name, schema=None, **kw):
- table_id = self.get_table_id(connection, table_name, schema,
- info_cache=kw.get("info_cache"))
- table_cache = {}
- column_cache = {}
- foreign_keys = []
- table_cache[table_id] = {"name": table_name, "schema": schema}
- COLUMN_SQL = text("""
- SELECT c.colid AS id, c.name AS name
- FROM syscolumns c
- WHERE c.id = :table_id
- """)
- results = connection.execute(COLUMN_SQL, table_id=table_id)
- columns = {}
- for col in results:
- columns[col["id"]] = col["name"]
- column_cache[table_id] = columns
- REFCONSTRAINT_SQL = text("""
- SELECT o.name AS name, r.reftabid AS reftable_id,
- r.keycnt AS 'count',
- r.fokey1 AS fokey1, r.fokey2 AS fokey2, r.fokey3 AS fokey3,
- r.fokey4 AS fokey4, r.fokey5 AS fokey5, r.fokey6 AS fokey6,
- r.fokey7 AS fokey7, r.fokey1 AS fokey8, r.fokey9 AS fokey9,
- r.fokey10 AS fokey10, r.fokey11 AS fokey11, r.fokey12 AS fokey12,
- r.fokey13 AS fokey13, r.fokey14 AS fokey14, r.fokey15 AS fokey15,
- r.fokey16 AS fokey16,
- r.refkey1 AS refkey1, r.refkey2 AS refkey2, r.refkey3 AS refkey3,
- r.refkey4 AS refkey4, r.refkey5 AS refkey5, r.refkey6 AS refkey6,
- r.refkey7 AS refkey7, r.refkey1 AS refkey8, r.refkey9 AS refkey9,
- r.refkey10 AS refkey10, r.refkey11 AS refkey11,
- r.refkey12 AS refkey12, r.refkey13 AS refkey13,
- r.refkey14 AS refkey14, r.refkey15 AS refkey15,
- r.refkey16 AS refkey16
- FROM sysreferences r JOIN sysobjects o on r.tableid = o.id
- WHERE r.tableid = :table_id
- """)
- referential_constraints = connection.execute(
- REFCONSTRAINT_SQL, table_id=table_id).fetchall()
- REFTABLE_SQL = text("""
- SELECT o.name AS name, u.name AS 'schema'
- FROM sysobjects o JOIN sysusers u ON o.uid = u.uid
- WHERE o.id = :table_id
- """)
- for r in referential_constraints:
- reftable_id = r["reftable_id"]
- if reftable_id not in table_cache:
- c = connection.execute(REFTABLE_SQL, table_id=reftable_id)
- reftable = c.fetchone()
- c.close()
- table_info = {"name": reftable["name"], "schema": None}
- if (schema is not None or
- reftable["schema"] != self.default_schema_name):
- table_info["schema"] = reftable["schema"]
- table_cache[reftable_id] = table_info
- results = connection.execute(COLUMN_SQL, table_id=reftable_id)
- reftable_columns = {}
- for col in results:
- reftable_columns[col["id"]] = col["name"]
- column_cache[reftable_id] = reftable_columns
- reftable = table_cache[reftable_id]
- reftable_columns = column_cache[reftable_id]
- constrained_columns = []
- referred_columns = []
- for i in range(1, r["count"] + 1):
- constrained_columns.append(columns[r["fokey%i" % i]])
- referred_columns.append(reftable_columns[r["refkey%i" % i]])
- fk_info = {
- "constrained_columns": constrained_columns,
- "referred_schema": reftable["schema"],
- "referred_table": reftable["name"],
- "referred_columns": referred_columns,
- "name": r["name"]
- }
- foreign_keys.append(fk_info)
- return foreign_keys
- @reflection.cache
- def get_indexes(self, connection, table_name, schema=None, **kw):
- table_id = self.get_table_id(connection, table_name, schema,
- info_cache=kw.get("info_cache"))
- INDEX_SQL = text("""
- SELECT object_name(i.id) AS table_name,
- i.keycnt AS 'count',
- i.name AS name,
- (i.status & 0x2) AS 'unique',
- index_col(object_name(i.id), i.indid, 1) AS col_1,
- index_col(object_name(i.id), i.indid, 2) AS col_2,
- index_col(object_name(i.id), i.indid, 3) AS col_3,
- index_col(object_name(i.id), i.indid, 4) AS col_4,
- index_col(object_name(i.id), i.indid, 5) AS col_5,
- index_col(object_name(i.id), i.indid, 6) AS col_6,
- index_col(object_name(i.id), i.indid, 7) AS col_7,
- index_col(object_name(i.id), i.indid, 8) AS col_8,
- index_col(object_name(i.id), i.indid, 9) AS col_9,
- index_col(object_name(i.id), i.indid, 10) AS col_10,
- index_col(object_name(i.id), i.indid, 11) AS col_11,
- index_col(object_name(i.id), i.indid, 12) AS col_12,
- index_col(object_name(i.id), i.indid, 13) AS col_13,
- index_col(object_name(i.id), i.indid, 14) AS col_14,
- index_col(object_name(i.id), i.indid, 15) AS col_15,
- index_col(object_name(i.id), i.indid, 16) AS col_16
- FROM sysindexes i, sysobjects o
- WHERE o.id = i.id
- AND o.id = :table_id
- AND (i.status & 2048) = 0
- AND i.indid BETWEEN 1 AND 254
- """)
- results = connection.execute(INDEX_SQL, table_id=table_id)
- indexes = []
- for r in results:
- column_names = []
- for i in range(1, r["count"]):
- column_names.append(r["col_%i" % (i,)])
- index_info = {"name": r["name"],
- "unique": bool(r["unique"]),
- "column_names": column_names}
- indexes.append(index_info)
- return indexes
- @reflection.cache
- def get_pk_constraint(self, connection, table_name, schema=None, **kw):
- table_id = self.get_table_id(connection, table_name, schema,
- info_cache=kw.get("info_cache"))
- PK_SQL = text("""
- SELECT object_name(i.id) AS table_name,
- i.keycnt AS 'count',
- i.name AS name,
- index_col(object_name(i.id), i.indid, 1) AS pk_1,
- index_col(object_name(i.id), i.indid, 2) AS pk_2,
- index_col(object_name(i.id), i.indid, 3) AS pk_3,
- index_col(object_name(i.id), i.indid, 4) AS pk_4,
- index_col(object_name(i.id), i.indid, 5) AS pk_5,
- index_col(object_name(i.id), i.indid, 6) AS pk_6,
- index_col(object_name(i.id), i.indid, 7) AS pk_7,
- index_col(object_name(i.id), i.indid, 8) AS pk_8,
- index_col(object_name(i.id), i.indid, 9) AS pk_9,
- index_col(object_name(i.id), i.indid, 10) AS pk_10,
- index_col(object_name(i.id), i.indid, 11) AS pk_11,
- index_col(object_name(i.id), i.indid, 12) AS pk_12,
- index_col(object_name(i.id), i.indid, 13) AS pk_13,
- index_col(object_name(i.id), i.indid, 14) AS pk_14,
- index_col(object_name(i.id), i.indid, 15) AS pk_15,
- index_col(object_name(i.id), i.indid, 16) AS pk_16
- FROM sysindexes i, sysobjects o
- WHERE o.id = i.id
- AND o.id = :table_id
- AND (i.status & 2048) = 2048
- AND i.indid BETWEEN 1 AND 254
- """)
- results = connection.execute(PK_SQL, table_id=table_id)
- pks = results.fetchone()
- results.close()
- constrained_columns = []
- if pks:
- for i in range(1, pks["count"] + 1):
- constrained_columns.append(pks["pk_%i" % (i,)])
- return {"constrained_columns": constrained_columns,
- "name": pks["name"]}
- else:
- return {"constrained_columns": [], "name": None}
- @reflection.cache
- def get_schema_names(self, connection, **kw):
- SCHEMA_SQL = text("SELECT u.name AS name FROM sysusers u")
- schemas = connection.execute(SCHEMA_SQL)
- return [s["name"] for s in schemas]
- @reflection.cache
- def get_table_names(self, connection, schema=None, **kw):
- if schema is None:
- schema = self.default_schema_name
- TABLE_SQL = text("""
- SELECT o.name AS name
- FROM sysobjects o JOIN sysusers u ON o.uid = u.uid
- WHERE u.name = :schema_name
- AND o.type = 'U'
- """)
- if util.py2k:
- if isinstance(schema, unicode):
- schema = schema.encode("ascii")
- tables = connection.execute(TABLE_SQL, schema_name=schema)
- return [t["name"] for t in tables]
- @reflection.cache
- def get_view_definition(self, connection, view_name, schema=None, **kw):
- if schema is None:
- schema = self.default_schema_name
- VIEW_DEF_SQL = text("""
- SELECT c.text
- FROM syscomments c JOIN sysobjects o ON c.id = o.id
- WHERE o.name = :view_name
- AND o.type = 'V'
- """)
- if util.py2k:
- if isinstance(view_name, unicode):
- view_name = view_name.encode("ascii")
- view = connection.execute(VIEW_DEF_SQL, view_name=view_name)
- return view.scalar()
- @reflection.cache
- def get_view_names(self, connection, schema=None, **kw):
- if schema is None:
- schema = self.default_schema_name
- VIEW_SQL = text("""
- SELECT o.name AS name
- FROM sysobjects o JOIN sysusers u ON o.uid = u.uid
- WHERE u.name = :schema_name
- AND o.type = 'V'
- """)
- if util.py2k:
- if isinstance(schema, unicode):
- schema = schema.encode("ascii")
- views = connection.execute(VIEW_SQL, schema_name=schema)
- return [v["name"] for v in views]
- def has_table(self, connection, table_name, schema=None):
- try:
- self.get_table_id(connection, table_name, schema)
- except exc.NoSuchTableError:
- return False
- else:
- return True
|