123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746 |
- import sqlalchemy as sa
- from sqlalchemy import exc as sa_exc
- from sqlalchemy import types as sql_types
- from sqlalchemy import inspect
- from sqlalchemy import MetaData, Integer, String
- from sqlalchemy.engine.reflection import Inspector
- from sqlalchemy.testing import engines, fixtures
- from sqlalchemy.testing.schema import Table, Column
- from sqlalchemy.testing import eq_, assert_raises_message
- from sqlalchemy import testing
- from .. import config
- import operator
- from sqlalchemy.schema import DDL, Index
- from sqlalchemy import event
- from sqlalchemy.sql.elements import quoted_name
- from sqlalchemy import ForeignKey
- metadata, users = None, None
- class HasTableTest(fixtures.TablesTest):
- __backend__ = True
- @classmethod
- def define_tables(cls, metadata):
- Table('test_table', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', String(50))
- )
- def test_has_table(self):
- with config.db.begin() as conn:
- assert config.db.dialect.has_table(conn, "test_table")
- assert not config.db.dialect.has_table(conn, "nonexistent_table")
- class ComponentReflectionTest(fixtures.TablesTest):
- run_inserts = run_deletes = None
- __backend__ = True
- @classmethod
- def setup_bind(cls):
- if config.requirements.independent_connections.enabled:
- from sqlalchemy import pool
- return engines.testing_engine(
- options=dict(poolclass=pool.StaticPool))
- else:
- return config.db
- @classmethod
- def define_tables(cls, metadata):
- cls.define_reflected_tables(metadata, None)
- if testing.requires.schemas.enabled:
- cls.define_reflected_tables(metadata, testing.config.test_schema)
- @classmethod
- def define_reflected_tables(cls, metadata, schema):
- if schema:
- schema_prefix = schema + "."
- else:
- schema_prefix = ""
- if testing.requires.self_referential_foreign_keys.enabled:
- users = Table('users', metadata,
- Column('user_id', sa.INT, primary_key=True),
- Column('test1', sa.CHAR(5), nullable=False),
- Column('test2', sa.Float(5), nullable=False),
- Column('parent_user_id', sa.Integer,
- sa.ForeignKey('%susers.user_id' %
- schema_prefix)),
- schema=schema,
- test_needs_fk=True,
- )
- else:
- users = Table('users', metadata,
- Column('user_id', sa.INT, primary_key=True),
- Column('test1', sa.CHAR(5), nullable=False),
- Column('test2', sa.Float(5), nullable=False),
- schema=schema,
- test_needs_fk=True,
- )
- Table("dingalings", metadata,
- Column('dingaling_id', sa.Integer, primary_key=True),
- Column('address_id', sa.Integer,
- sa.ForeignKey('%semail_addresses.address_id' %
- schema_prefix)),
- Column('data', sa.String(30)),
- schema=schema,
- test_needs_fk=True,
- )
- Table('email_addresses', metadata,
- Column('address_id', sa.Integer),
- Column('remote_user_id', sa.Integer,
- sa.ForeignKey(users.c.user_id)),
- Column('email_address', sa.String(20)),
- sa.PrimaryKeyConstraint('address_id', name='email_ad_pk'),
- schema=schema,
- test_needs_fk=True,
- )
- if testing.requires.index_reflection.enabled:
- cls.define_index(metadata, users)
- if testing.requires.view_column_reflection.enabled:
- cls.define_views(metadata, schema)
- if not schema and testing.requires.temp_table_reflection.enabled:
- cls.define_temp_tables(metadata)
- @classmethod
- def define_temp_tables(cls, metadata):
- # cheat a bit, we should fix this with some dialect-level
- # temp table fixture
- if testing.against("oracle"):
- kw = {
- 'prefixes': ["GLOBAL TEMPORARY"],
- 'oracle_on_commit': 'PRESERVE ROWS'
- }
- else:
- kw = {
- 'prefixes': ["TEMPORARY"],
- }
- user_tmp = Table(
- "user_tmp", metadata,
- Column("id", sa.INT, primary_key=True),
- Column('name', sa.VARCHAR(50)),
- Column('foo', sa.INT),
- sa.UniqueConstraint('name', name='user_tmp_uq'),
- sa.Index("user_tmp_ix", "foo"),
- **kw
- )
- if testing.requires.view_reflection.enabled and \
- testing.requires.temporary_views.enabled:
- event.listen(
- user_tmp, "after_create",
- DDL("create temporary view user_tmp_v as "
- "select * from user_tmp")
- )
- event.listen(
- user_tmp, "before_drop",
- DDL("drop view user_tmp_v")
- )
- @classmethod
- def define_index(cls, metadata, users):
- Index("users_t_idx", users.c.test1, users.c.test2)
- Index("users_all_idx", users.c.user_id, users.c.test2, users.c.test1)
- @classmethod
- def define_views(cls, metadata, schema):
- for table_name in ('users', 'email_addresses'):
- fullname = table_name
- if schema:
- fullname = "%s.%s" % (schema, table_name)
- view_name = fullname + '_v'
- query = "CREATE VIEW %s AS SELECT * FROM %s" % (
- view_name, fullname)
- event.listen(
- metadata,
- "after_create",
- DDL(query)
- )
- event.listen(
- metadata,
- "before_drop",
- DDL("DROP VIEW %s" % view_name)
- )
- @testing.requires.schema_reflection
- def test_get_schema_names(self):
- insp = inspect(testing.db)
- self.assert_(testing.config.test_schema in insp.get_schema_names())
- @testing.requires.schema_reflection
- def test_dialect_initialize(self):
- engine = engines.testing_engine()
- assert not hasattr(engine.dialect, 'default_schema_name')
- inspect(engine)
- assert hasattr(engine.dialect, 'default_schema_name')
- @testing.requires.schema_reflection
- def test_get_default_schema_name(self):
- insp = inspect(testing.db)
- eq_(insp.default_schema_name, testing.db.dialect.default_schema_name)
- @testing.provide_metadata
- def _test_get_table_names(self, schema=None, table_type='table',
- order_by=None):
- meta = self.metadata
- users, addresses, dingalings = self.tables.users, \
- self.tables.email_addresses, self.tables.dingalings
- insp = inspect(meta.bind)
- if table_type == 'view':
- table_names = insp.get_view_names(schema)
- table_names.sort()
- answer = ['email_addresses_v', 'users_v']
- eq_(sorted(table_names), answer)
- else:
- table_names = insp.get_table_names(schema,
- order_by=order_by)
- if order_by == 'foreign_key':
- answer = ['users', 'email_addresses', 'dingalings']
- eq_(table_names, answer)
- else:
- answer = ['dingalings', 'email_addresses', 'users']
- eq_(sorted(table_names), answer)
- @testing.requires.temp_table_names
- def test_get_temp_table_names(self):
- insp = inspect(self.bind)
- temp_table_names = insp.get_temp_table_names()
- eq_(sorted(temp_table_names), ['user_tmp'])
- @testing.requires.view_reflection
- @testing.requires.temp_table_names
- @testing.requires.temporary_views
- def test_get_temp_view_names(self):
- insp = inspect(self.bind)
- temp_table_names = insp.get_temp_view_names()
- eq_(sorted(temp_table_names), ['user_tmp_v'])
- @testing.requires.table_reflection
- def test_get_table_names(self):
- self._test_get_table_names()
- @testing.requires.table_reflection
- @testing.requires.foreign_key_constraint_reflection
- def test_get_table_names_fks(self):
- self._test_get_table_names(order_by='foreign_key')
- @testing.requires.table_reflection
- @testing.requires.schemas
- def test_get_table_names_with_schema(self):
- self._test_get_table_names(testing.config.test_schema)
- @testing.requires.view_column_reflection
- def test_get_view_names(self):
- self._test_get_table_names(table_type='view')
- @testing.requires.view_column_reflection
- @testing.requires.schemas
- def test_get_view_names_with_schema(self):
- self._test_get_table_names(
- testing.config.test_schema, table_type='view')
- @testing.requires.table_reflection
- @testing.requires.view_column_reflection
- def test_get_tables_and_views(self):
- self._test_get_table_names()
- self._test_get_table_names(table_type='view')
- def _test_get_columns(self, schema=None, table_type='table'):
- meta = MetaData(testing.db)
- users, addresses, dingalings = self.tables.users, \
- self.tables.email_addresses, self.tables.dingalings
- table_names = ['users', 'email_addresses']
- if table_type == 'view':
- table_names = ['users_v', 'email_addresses_v']
- insp = inspect(meta.bind)
- for table_name, table in zip(table_names, (users,
- addresses)):
- schema_name = schema
- cols = insp.get_columns(table_name, schema=schema_name)
- self.assert_(len(cols) > 0, len(cols))
- # should be in order
- for i, col in enumerate(table.columns):
- eq_(col.name, cols[i]['name'])
- ctype = cols[i]['type'].__class__
- ctype_def = col.type
- if isinstance(ctype_def, sa.types.TypeEngine):
- ctype_def = ctype_def.__class__
- # Oracle returns Date for DateTime.
- if testing.against('oracle') and ctype_def \
- in (sql_types.Date, sql_types.DateTime):
- ctype_def = sql_types.Date
- # assert that the desired type and return type share
- # a base within one of the generic types.
- self.assert_(len(set(ctype.__mro__).
- intersection(ctype_def.__mro__).
- intersection([
- sql_types.Integer,
- sql_types.Numeric,
- sql_types.DateTime,
- sql_types.Date,
- sql_types.Time,
- sql_types.String,
- sql_types._Binary,
- ])) > 0, '%s(%s), %s(%s)' %
- (col.name, col.type, cols[i]['name'], ctype))
- if not col.primary_key:
- assert cols[i]['default'] is None
- @testing.requires.table_reflection
- def test_get_columns(self):
- self._test_get_columns()
- @testing.provide_metadata
- def _type_round_trip(self, *types):
- t = Table('t', self.metadata,
- *[
- Column('t%d' % i, type_)
- for i, type_ in enumerate(types)
- ]
- )
- t.create()
- return [
- c['type'] for c in
- inspect(self.metadata.bind).get_columns('t')
- ]
- @testing.requires.table_reflection
- def test_numeric_reflection(self):
- for typ in self._type_round_trip(
- sql_types.Numeric(18, 5),
- ):
- assert isinstance(typ, sql_types.Numeric)
- eq_(typ.precision, 18)
- eq_(typ.scale, 5)
- @testing.requires.table_reflection
- def test_varchar_reflection(self):
- typ = self._type_round_trip(sql_types.String(52))[0]
- assert isinstance(typ, sql_types.String)
- eq_(typ.length, 52)
- @testing.requires.table_reflection
- @testing.provide_metadata
- def test_nullable_reflection(self):
- t = Table('t', self.metadata,
- Column('a', Integer, nullable=True),
- Column('b', Integer, nullable=False))
- t.create()
- eq_(
- dict(
- (col['name'], col['nullable'])
- for col in inspect(self.metadata.bind).get_columns('t')
- ),
- {"a": True, "b": False}
- )
- @testing.requires.table_reflection
- @testing.requires.schemas
- def test_get_columns_with_schema(self):
- self._test_get_columns(schema=testing.config.test_schema)
- @testing.requires.temp_table_reflection
- def test_get_temp_table_columns(self):
- meta = MetaData(self.bind)
- user_tmp = self.tables.user_tmp
- insp = inspect(meta.bind)
- cols = insp.get_columns('user_tmp')
- self.assert_(len(cols) > 0, len(cols))
- for i, col in enumerate(user_tmp.columns):
- eq_(col.name, cols[i]['name'])
- @testing.requires.temp_table_reflection
- @testing.requires.view_column_reflection
- @testing.requires.temporary_views
- def test_get_temp_view_columns(self):
- insp = inspect(self.bind)
- cols = insp.get_columns('user_tmp_v')
- eq_(
- [col['name'] for col in cols],
- ['id', 'name', 'foo']
- )
- @testing.requires.view_column_reflection
- def test_get_view_columns(self):
- self._test_get_columns(table_type='view')
- @testing.requires.view_column_reflection
- @testing.requires.schemas
- def test_get_view_columns_with_schema(self):
- self._test_get_columns(
- schema=testing.config.test_schema, table_type='view')
- @testing.provide_metadata
- def _test_get_pk_constraint(self, schema=None):
- meta = self.metadata
- users, addresses = self.tables.users, self.tables.email_addresses
- insp = inspect(meta.bind)
- users_cons = insp.get_pk_constraint(users.name, schema=schema)
- users_pkeys = users_cons['constrained_columns']
- eq_(users_pkeys, ['user_id'])
- addr_cons = insp.get_pk_constraint(addresses.name, schema=schema)
- addr_pkeys = addr_cons['constrained_columns']
- eq_(addr_pkeys, ['address_id'])
- with testing.requires.reflects_pk_names.fail_if():
- eq_(addr_cons['name'], 'email_ad_pk')
- @testing.requires.primary_key_constraint_reflection
- def test_get_pk_constraint(self):
- self._test_get_pk_constraint()
- @testing.requires.table_reflection
- @testing.requires.primary_key_constraint_reflection
- @testing.requires.schemas
- def test_get_pk_constraint_with_schema(self):
- self._test_get_pk_constraint(schema=testing.config.test_schema)
- @testing.requires.table_reflection
- @testing.provide_metadata
- def test_deprecated_get_primary_keys(self):
- meta = self.metadata
- users = self.tables.users
- insp = Inspector(meta.bind)
- assert_raises_message(
- sa_exc.SADeprecationWarning,
- "Call to deprecated method get_primary_keys."
- " Use get_pk_constraint instead.",
- insp.get_primary_keys, users.name
- )
- @testing.provide_metadata
- def _test_get_foreign_keys(self, schema=None):
- meta = self.metadata
- users, addresses, dingalings = self.tables.users, \
- self.tables.email_addresses, self.tables.dingalings
- insp = inspect(meta.bind)
- expected_schema = schema
- # users
- if testing.requires.self_referential_foreign_keys.enabled:
- users_fkeys = insp.get_foreign_keys(users.name,
- schema=schema)
- fkey1 = users_fkeys[0]
- with testing.requires.named_constraints.fail_if():
- self.assert_(fkey1['name'] is not None)
- eq_(fkey1['referred_schema'], expected_schema)
- eq_(fkey1['referred_table'], users.name)
- eq_(fkey1['referred_columns'], ['user_id', ])
- if testing.requires.self_referential_foreign_keys.enabled:
- eq_(fkey1['constrained_columns'], ['parent_user_id'])
- # addresses
- addr_fkeys = insp.get_foreign_keys(addresses.name,
- schema=schema)
- fkey1 = addr_fkeys[0]
- with testing.requires.named_constraints.fail_if():
- self.assert_(fkey1['name'] is not None)
- eq_(fkey1['referred_schema'], expected_schema)
- eq_(fkey1['referred_table'], users.name)
- eq_(fkey1['referred_columns'], ['user_id', ])
- eq_(fkey1['constrained_columns'], ['remote_user_id'])
- @testing.requires.foreign_key_constraint_reflection
- def test_get_foreign_keys(self):
- self._test_get_foreign_keys()
- @testing.requires.foreign_key_constraint_reflection
- @testing.requires.schemas
- def test_get_foreign_keys_with_schema(self):
- self._test_get_foreign_keys(schema=testing.config.test_schema)
- @testing.requires.foreign_key_constraint_option_reflection
- @testing.provide_metadata
- def test_get_foreign_key_options(self):
- meta = self.metadata
- Table(
- 'x', meta,
- Column('id', Integer, primary_key=True),
- test_needs_fk=True
- )
- Table('table', meta,
- Column('id', Integer, primary_key=True),
- Column('x_id', Integer, sa.ForeignKey('x.id', name='xid')),
- Column('test', String(10)),
- test_needs_fk=True)
- Table('user', meta,
- Column('id', Integer, primary_key=True),
- Column('name', String(50), nullable=False),
- Column('tid', Integer),
- sa.ForeignKeyConstraint(
- ['tid'], ['table.id'],
- name='myfk',
- onupdate="SET NULL", ondelete="CASCADE"),
- test_needs_fk=True)
- meta.create_all()
- insp = inspect(meta.bind)
- # test 'options' is always present for a backend
- # that can reflect these, since alembic looks for this
- opts = insp.get_foreign_keys('table')[0]['options']
- eq_(
- dict(
- (k, opts[k])
- for k in opts if opts[k]
- ),
- {}
- )
- opts = insp.get_foreign_keys('user')[0]['options']
- eq_(
- dict(
- (k, opts[k])
- for k in opts if opts[k]
- ),
- {'onupdate': 'SET NULL', 'ondelete': 'CASCADE'}
- )
- @testing.provide_metadata
- def _test_get_indexes(self, schema=None):
- meta = self.metadata
- users, addresses, dingalings = self.tables.users, \
- self.tables.email_addresses, self.tables.dingalings
- # The database may decide to create indexes for foreign keys, etc.
- # so there may be more indexes than expected.
- insp = inspect(meta.bind)
- indexes = insp.get_indexes('users', schema=schema)
- expected_indexes = [
- {'unique': False,
- 'column_names': ['test1', 'test2'],
- 'name': 'users_t_idx'},
- {'unique': False,
- 'column_names': ['user_id', 'test2', 'test1'],
- 'name': 'users_all_idx'}
- ]
- index_names = [d['name'] for d in indexes]
- for e_index in expected_indexes:
- assert e_index['name'] in index_names
- index = indexes[index_names.index(e_index['name'])]
- for key in e_index:
- eq_(e_index[key], index[key])
- @testing.requires.index_reflection
- def test_get_indexes(self):
- self._test_get_indexes()
- @testing.requires.index_reflection
- @testing.requires.schemas
- def test_get_indexes_with_schema(self):
- self._test_get_indexes(schema=testing.config.test_schema)
- @testing.requires.unique_constraint_reflection
- def test_get_unique_constraints(self):
- self._test_get_unique_constraints()
- @testing.requires.temp_table_reflection
- @testing.requires.unique_constraint_reflection
- def test_get_temp_table_unique_constraints(self):
- insp = inspect(self.bind)
- reflected = insp.get_unique_constraints('user_tmp')
- for refl in reflected:
- # Different dialects handle duplicate index and constraints
- # differently, so ignore this flag
- refl.pop('duplicates_index', None)
- eq_(reflected, [{'column_names': ['name'], 'name': 'user_tmp_uq'}])
- @testing.requires.temp_table_reflection
- def test_get_temp_table_indexes(self):
- insp = inspect(self.bind)
- indexes = insp.get_indexes('user_tmp')
- for ind in indexes:
- ind.pop('dialect_options', None)
- eq_(
- # TODO: we need to add better filtering for indexes/uq constraints
- # that are doubled up
- [idx for idx in indexes if idx['name'] == 'user_tmp_ix'],
- [{'unique': False, 'column_names': ['foo'], 'name': 'user_tmp_ix'}]
- )
- @testing.requires.unique_constraint_reflection
- @testing.requires.schemas
- def test_get_unique_constraints_with_schema(self):
- self._test_get_unique_constraints(schema=testing.config.test_schema)
- @testing.provide_metadata
- def _test_get_unique_constraints(self, schema=None):
- # SQLite dialect needs to parse the names of the constraints
- # separately from what it gets from PRAGMA index_list(), and
- # then matches them up. so same set of column_names in two
- # constraints will confuse it. Perhaps we should no longer
- # bother with index_list() here since we have the whole
- # CREATE TABLE?
- uniques = sorted(
- [
- {'name': 'unique_a', 'column_names': ['a']},
- {'name': 'unique_a_b_c', 'column_names': ['a', 'b', 'c']},
- {'name': 'unique_c_a_b', 'column_names': ['c', 'a', 'b']},
- {'name': 'unique_asc_key', 'column_names': ['asc', 'key']},
- {'name': 'i.have.dots', 'column_names': ['b']},
- {'name': 'i have spaces', 'column_names': ['c']},
- ],
- key=operator.itemgetter('name')
- )
- orig_meta = self.metadata
- table = Table(
- 'testtbl', orig_meta,
- Column('a', sa.String(20)),
- Column('b', sa.String(30)),
- Column('c', sa.Integer),
- # reserved identifiers
- Column('asc', sa.String(30)),
- Column('key', sa.String(30)),
- schema=schema
- )
- for uc in uniques:
- table.append_constraint(
- sa.UniqueConstraint(*uc['column_names'], name=uc['name'])
- )
- orig_meta.create_all()
- inspector = inspect(orig_meta.bind)
- reflected = sorted(
- inspector.get_unique_constraints('testtbl', schema=schema),
- key=operator.itemgetter('name')
- )
- for orig, refl in zip(uniques, reflected):
- # Different dialects handle duplicate index and constraints
- # differently, so ignore this flag
- refl.pop('duplicates_index', None)
- eq_(orig, refl)
- @testing.provide_metadata
- def _test_get_view_definition(self, schema=None):
- meta = self.metadata
- users, addresses, dingalings = self.tables.users, \
- self.tables.email_addresses, self.tables.dingalings
- view_name1 = 'users_v'
- view_name2 = 'email_addresses_v'
- insp = inspect(meta.bind)
- v1 = insp.get_view_definition(view_name1, schema=schema)
- self.assert_(v1)
- v2 = insp.get_view_definition(view_name2, schema=schema)
- self.assert_(v2)
- @testing.requires.view_reflection
- def test_get_view_definition(self):
- self._test_get_view_definition()
- @testing.requires.view_reflection
- @testing.requires.schemas
- def test_get_view_definition_with_schema(self):
- self._test_get_view_definition(schema=testing.config.test_schema)
- @testing.only_on("postgresql", "PG specific feature")
- @testing.provide_metadata
- def _test_get_table_oid(self, table_name, schema=None):
- meta = self.metadata
- users, addresses, dingalings = self.tables.users, \
- self.tables.email_addresses, self.tables.dingalings
- insp = inspect(meta.bind)
- oid = insp.get_table_oid(table_name, schema)
- self.assert_(isinstance(oid, int))
- def test_get_table_oid(self):
- self._test_get_table_oid('users')
- @testing.requires.schemas
- def test_get_table_oid_with_schema(self):
- self._test_get_table_oid('users', schema=testing.config.test_schema)
- @testing.requires.table_reflection
- @testing.provide_metadata
- def test_autoincrement_col(self):
- """test that 'autoincrement' is reflected according to sqla's policy.
- Don't mark this test as unsupported for any backend !
- (technically it fails with MySQL InnoDB since "id" comes before "id2")
- A backend is better off not returning "autoincrement" at all,
- instead of potentially returning "False" for an auto-incrementing
- primary key column.
- """
- meta = self.metadata
- insp = inspect(meta.bind)
- for tname, cname in [
- ('users', 'user_id'),
- ('email_addresses', 'address_id'),
- ('dingalings', 'dingaling_id'),
- ]:
- cols = insp.get_columns(tname)
- id_ = dict((c['name'], c) for c in cols)[cname]
- assert id_.get('autoincrement', True)
- class NormalizedNameTest(fixtures.TablesTest):
- __requires__ = 'denormalized_names',
- __backend__ = True
- @classmethod
- def define_tables(cls, metadata):
- Table(
- quoted_name('t1', quote=True), metadata,
- Column('id', Integer, primary_key=True),
- )
- Table(
- quoted_name('t2', quote=True), metadata,
- Column('id', Integer, primary_key=True),
- Column('t1id', ForeignKey('t1.id'))
- )
- def test_reflect_lowercase_forced_tables(self):
- m2 = MetaData(testing.db)
- t2_ref = Table(quoted_name('t2', quote=True), m2, autoload=True)
- t1_ref = m2.tables['t1']
- assert t2_ref.c.t1id.references(t1_ref.c.id)
- m3 = MetaData(testing.db)
- m3.reflect(only=lambda name, m: name.lower() in ('t1', 't2'))
- assert m3.tables['t2'].c.t1id.references(m3.tables['t1'].c.id)
- def test_get_table_names(self):
- tablenames = [
- t for t in inspect(testing.db).get_table_names()
- if t.lower() in ("t1", "t2")]
- eq_(tablenames[0].upper(), tablenames[0].lower())
- eq_(tablenames[1].upper(), tablenames[1].lower())
- __all__ = ('ComponentReflectionTest', 'HasTableTest', 'NormalizedNameTest')
|