test_reflection.py 28 KB


  1. import sqlalchemy as sa
  2. from sqlalchemy import exc as sa_exc
  3. from sqlalchemy import types as sql_types
  4. from sqlalchemy import inspect
  5. from sqlalchemy import MetaData, Integer, String
  6. from sqlalchemy.engine.reflection import Inspector
  7. from sqlalchemy.testing import engines, fixtures
  8. from sqlalchemy.testing.schema import Table, Column
  9. from sqlalchemy.testing import eq_, assert_raises_message
  10. from sqlalchemy import testing
  11. from .. import config
  12. import operator
  13. from sqlalchemy.schema import DDL, Index
  14. from sqlalchemy import event
  15. from sqlalchemy.sql.elements import quoted_name
  16. from sqlalchemy import ForeignKey
  17. metadata, users = None, None
  18. class HasTableTest(fixtures.TablesTest):
  19. __backend__ = True
  20. @classmethod
  21. def define_tables(cls, metadata):
  22. Table('test_table', metadata,
  23. Column('id', Integer, primary_key=True),
  24. Column('data', String(50))
  25. )
  26. def test_has_table(self):
  27. with config.db.begin() as conn:
  28. assert config.db.dialect.has_table(conn, "test_table")
  29. assert not config.db.dialect.has_table(conn, "nonexistent_table")
  30. class ComponentReflectionTest(fixtures.TablesTest):
  31. run_inserts = run_deletes = None
  32. __backend__ = True
  33. @classmethod
  34. def setup_bind(cls):
  35. if config.requirements.independent_connections.enabled:
  36. from sqlalchemy import pool
  37. return engines.testing_engine(
  38. options=dict(poolclass=pool.StaticPool))
  39. else:
  40. return config.db
  41. @classmethod
  42. def define_tables(cls, metadata):
  43. cls.define_reflected_tables(metadata, None)
  44. if testing.requires.schemas.enabled:
  45. cls.define_reflected_tables(metadata, testing.config.test_schema)
  46. @classmethod
  47. def define_reflected_tables(cls, metadata, schema):
  48. if schema:
  49. schema_prefix = schema + "."
  50. else:
  51. schema_prefix = ""
  52. if testing.requires.self_referential_foreign_keys.enabled:
  53. users = Table('users', metadata,
  54. Column('user_id', sa.INT, primary_key=True),
  55. Column('test1', sa.CHAR(5), nullable=False),
  56. Column('test2', sa.Float(5), nullable=False),
  57. Column('parent_user_id', sa.Integer,
  58. sa.ForeignKey('%susers.user_id' %
  59. schema_prefix)),
  60. schema=schema,
  61. test_needs_fk=True,
  62. )
  63. else:
  64. users = Table('users', metadata,
  65. Column('user_id', sa.INT, primary_key=True),
  66. Column('test1', sa.CHAR(5), nullable=False),
  67. Column('test2', sa.Float(5), nullable=False),
  68. schema=schema,
  69. test_needs_fk=True,
  70. )
  71. Table("dingalings", metadata,
  72. Column('dingaling_id', sa.Integer, primary_key=True),
  73. Column('address_id', sa.Integer,
  74. sa.ForeignKey('%semail_addresses.address_id' %
  75. schema_prefix)),
  76. Column('data', sa.String(30)),
  77. schema=schema,
  78. test_needs_fk=True,
  79. )
  80. Table('email_addresses', metadata,
  81. Column('address_id', sa.Integer),
  82. Column('remote_user_id', sa.Integer,
  83. sa.ForeignKey(users.c.user_id)),
  84. Column('email_address', sa.String(20)),
  85. sa.PrimaryKeyConstraint('address_id', name='email_ad_pk'),
  86. schema=schema,
  87. test_needs_fk=True,
  88. )
  89. if testing.requires.index_reflection.enabled:
  90. cls.define_index(metadata, users)
  91. if testing.requires.view_column_reflection.enabled:
  92. cls.define_views(metadata, schema)
  93. if not schema and testing.requires.temp_table_reflection.enabled:
  94. cls.define_temp_tables(metadata)
  95. @classmethod
  96. def define_temp_tables(cls, metadata):
  97. # cheat a bit, we should fix this with some dialect-level
  98. # temp table fixture
  99. if testing.against("oracle"):
  100. kw = {
  101. 'prefixes': ["GLOBAL TEMPORARY"],
  102. 'oracle_on_commit': 'PRESERVE ROWS'
  103. }
  104. else:
  105. kw = {
  106. 'prefixes': ["TEMPORARY"],
  107. }
  108. user_tmp = Table(
  109. "user_tmp", metadata,
  110. Column("id", sa.INT, primary_key=True),
  111. Column('name', sa.VARCHAR(50)),
  112. Column('foo', sa.INT),
  113. sa.UniqueConstraint('name', name='user_tmp_uq'),
  114. sa.Index("user_tmp_ix", "foo"),
  115. **kw
  116. )
  117. if testing.requires.view_reflection.enabled and \
  118. testing.requires.temporary_views.enabled:
  119. event.listen(
  120. user_tmp, "after_create",
  121. DDL("create temporary view user_tmp_v as "
  122. "select * from user_tmp")
  123. )
  124. event.listen(
  125. user_tmp, "before_drop",
  126. DDL("drop view user_tmp_v")
  127. )
  128. @classmethod
  129. def define_index(cls, metadata, users):
  130. Index("users_t_idx", users.c.test1, users.c.test2)
  131. Index("users_all_idx", users.c.user_id, users.c.test2, users.c.test1)
  132. @classmethod
  133. def define_views(cls, metadata, schema):
  134. for table_name in ('users', 'email_addresses'):
  135. fullname = table_name
  136. if schema:
  137. fullname = "%s.%s" % (schema, table_name)
  138. view_name = fullname + '_v'
  139. query = "CREATE VIEW %s AS SELECT * FROM %s" % (
  140. view_name, fullname)
  141. event.listen(
  142. metadata,
  143. "after_create",
  144. DDL(query)
  145. )
  146. event.listen(
  147. metadata,
  148. "before_drop",
  149. DDL("DROP VIEW %s" % view_name)
  150. )
  151. @testing.requires.schema_reflection
  152. def test_get_schema_names(self):
  153. insp = inspect(testing.db)
  154. self.assert_(testing.config.test_schema in insp.get_schema_names())
  155. @testing.requires.schema_reflection
  156. def test_dialect_initialize(self):
  157. engine = engines.testing_engine()
  158. assert not hasattr(engine.dialect, 'default_schema_name')
  159. inspect(engine)
  160. assert hasattr(engine.dialect, 'default_schema_name')
  161. @testing.requires.schema_reflection
  162. def test_get_default_schema_name(self):
  163. insp = inspect(testing.db)
  164. eq_(insp.default_schema_name, testing.db.dialect.default_schema_name)
  165. @testing.provide_metadata
  166. def _test_get_table_names(self, schema=None, table_type='table',
  167. order_by=None):
  168. meta = self.metadata
  169. users, addresses, dingalings = self.tables.users, \
  170. self.tables.email_addresses, self.tables.dingalings
  171. insp = inspect(meta.bind)
  172. if table_type == 'view':
  173. table_names = insp.get_view_names(schema)
  174. table_names.sort()
  175. answer = ['email_addresses_v', 'users_v']
  176. eq_(sorted(table_names), answer)
  177. else:
  178. table_names = insp.get_table_names(schema,
  179. order_by=order_by)
  180. if order_by == 'foreign_key':
  181. answer = ['users', 'email_addresses', 'dingalings']
  182. eq_(table_names, answer)
  183. else:
  184. answer = ['dingalings', 'email_addresses', 'users']
  185. eq_(sorted(table_names), answer)
  186. @testing.requires.temp_table_names
  187. def test_get_temp_table_names(self):
  188. insp = inspect(self.bind)
  189. temp_table_names = insp.get_temp_table_names()
  190. eq_(sorted(temp_table_names), ['user_tmp'])
  191. @testing.requires.view_reflection
  192. @testing.requires.temp_table_names
  193. @testing.requires.temporary_views
  194. def test_get_temp_view_names(self):
  195. insp = inspect(self.bind)
  196. temp_table_names = insp.get_temp_view_names()
  197. eq_(sorted(temp_table_names), ['user_tmp_v'])
  198. @testing.requires.table_reflection
  199. def test_get_table_names(self):
  200. self._test_get_table_names()
  201. @testing.requires.table_reflection
  202. @testing.requires.foreign_key_constraint_reflection
  203. def test_get_table_names_fks(self):
  204. self._test_get_table_names(order_by='foreign_key')
  205. @testing.requires.table_reflection
  206. @testing.requires.schemas
  207. def test_get_table_names_with_schema(self):
  208. self._test_get_table_names(testing.config.test_schema)
  209. @testing.requires.view_column_reflection
  210. def test_get_view_names(self):
  211. self._test_get_table_names(table_type='view')
  212. @testing.requires.view_column_reflection
  213. @testing.requires.schemas
  214. def test_get_view_names_with_schema(self):
  215. self._test_get_table_names(
  216. testing.config.test_schema, table_type='view')
  217. @testing.requires.table_reflection
  218. @testing.requires.view_column_reflection
  219. def test_get_tables_and_views(self):
  220. self._test_get_table_names()
  221. self._test_get_table_names(table_type='view')
  222. def _test_get_columns(self, schema=None, table_type='table'):
  223. meta = MetaData(testing.db)
  224. users, addresses, dingalings = self.tables.users, \
  225. self.tables.email_addresses, self.tables.dingalings
  226. table_names = ['users', 'email_addresses']
  227. if table_type == 'view':
  228. table_names = ['users_v', 'email_addresses_v']
  229. insp = inspect(meta.bind)
  230. for table_name, table in zip(table_names, (users,
  231. addresses)):
  232. schema_name = schema
  233. cols = insp.get_columns(table_name, schema=schema_name)
  234. self.assert_(len(cols) > 0, len(cols))
  235. # should be in order
  236. for i, col in enumerate(table.columns):
  237. eq_(col.name, cols[i]['name'])
  238. ctype = cols[i]['type'].__class__
  239. ctype_def = col.type
  240. if isinstance(ctype_def, sa.types.TypeEngine):
  241. ctype_def = ctype_def.__class__
  242. # Oracle returns Date for DateTime.
  243. if testing.against('oracle') and ctype_def \
  244. in (sql_types.Date, sql_types.DateTime):
  245. ctype_def = sql_types.Date
  246. # assert that the desired type and return type share
  247. # a base within one of the generic types.
  248. self.assert_(len(set(ctype.__mro__).
  249. intersection(ctype_def.__mro__).
  250. intersection([
  251. sql_types.Integer,
  252. sql_types.Numeric,
  253. sql_types.DateTime,
  254. sql_types.Date,
  255. sql_types.Time,
  256. sql_types.String,
  257. sql_types._Binary,
  258. ])) > 0, '%s(%s), %s(%s)' %
  259. (col.name, col.type, cols[i]['name'], ctype))
  260. if not col.primary_key:
  261. assert cols[i]['default'] is None
  262. @testing.requires.table_reflection
  263. def test_get_columns(self):
  264. self._test_get_columns()
  265. @testing.provide_metadata
  266. def _type_round_trip(self, *types):
  267. t = Table('t', self.metadata,
  268. *[
  269. Column('t%d' % i, type_)
  270. for i, type_ in enumerate(types)
  271. ]
  272. )
  273. t.create()
  274. return [
  275. c['type'] for c in
  276. inspect(self.metadata.bind).get_columns('t')
  277. ]
  278. @testing.requires.table_reflection
  279. def test_numeric_reflection(self):
  280. for typ in self._type_round_trip(
  281. sql_types.Numeric(18, 5),
  282. ):
  283. assert isinstance(typ, sql_types.Numeric)
  284. eq_(typ.precision, 18)
  285. eq_(typ.scale, 5)
  286. @testing.requires.table_reflection
  287. def test_varchar_reflection(self):
  288. typ = self._type_round_trip(sql_types.String(52))[0]
  289. assert isinstance(typ, sql_types.String)
  290. eq_(typ.length, 52)
  291. @testing.requires.table_reflection
  292. @testing.provide_metadata
  293. def test_nullable_reflection(self):
  294. t = Table('t', self.metadata,
  295. Column('a', Integer, nullable=True),
  296. Column('b', Integer, nullable=False))
  297. t.create()
  298. eq_(
  299. dict(
  300. (col['name'], col['nullable'])
  301. for col in inspect(self.metadata.bind).get_columns('t')
  302. ),
  303. {"a": True, "b": False}
  304. )
  305. @testing.requires.table_reflection
  306. @testing.requires.schemas
  307. def test_get_columns_with_schema(self):
  308. self._test_get_columns(schema=testing.config.test_schema)
  309. @testing.requires.temp_table_reflection
  310. def test_get_temp_table_columns(self):
  311. meta = MetaData(self.bind)
  312. user_tmp = self.tables.user_tmp
  313. insp = inspect(meta.bind)
  314. cols = insp.get_columns('user_tmp')
  315. self.assert_(len(cols) > 0, len(cols))
  316. for i, col in enumerate(user_tmp.columns):
  317. eq_(col.name, cols[i]['name'])
  318. @testing.requires.temp_table_reflection
  319. @testing.requires.view_column_reflection
  320. @testing.requires.temporary_views
  321. def test_get_temp_view_columns(self):
  322. insp = inspect(self.bind)
  323. cols = insp.get_columns('user_tmp_v')
  324. eq_(
  325. [col['name'] for col in cols],
  326. ['id', 'name', 'foo']
  327. )
  328. @testing.requires.view_column_reflection
  329. def test_get_view_columns(self):
  330. self._test_get_columns(table_type='view')
  331. @testing.requires.view_column_reflection
  332. @testing.requires.schemas
  333. def test_get_view_columns_with_schema(self):
  334. self._test_get_columns(
  335. schema=testing.config.test_schema, table_type='view')
  336. @testing.provide_metadata
  337. def _test_get_pk_constraint(self, schema=None):
  338. meta = self.metadata
  339. users, addresses = self.tables.users, self.tables.email_addresses
  340. insp = inspect(meta.bind)
  341. users_cons = insp.get_pk_constraint(users.name, schema=schema)
  342. users_pkeys = users_cons['constrained_columns']
  343. eq_(users_pkeys, ['user_id'])
  344. addr_cons = insp.get_pk_constraint(addresses.name, schema=schema)
  345. addr_pkeys = addr_cons['constrained_columns']
  346. eq_(addr_pkeys, ['address_id'])
  347. with testing.requires.reflects_pk_names.fail_if():
  348. eq_(addr_cons['name'], 'email_ad_pk')
  349. @testing.requires.primary_key_constraint_reflection
  350. def test_get_pk_constraint(self):
  351. self._test_get_pk_constraint()
  352. @testing.requires.table_reflection
  353. @testing.requires.primary_key_constraint_reflection
  354. @testing.requires.schemas
  355. def test_get_pk_constraint_with_schema(self):
  356. self._test_get_pk_constraint(schema=testing.config.test_schema)
  357. @testing.requires.table_reflection
  358. @testing.provide_metadata
  359. def test_deprecated_get_primary_keys(self):
  360. meta = self.metadata
  361. users = self.tables.users
  362. insp = Inspector(meta.bind)
  363. assert_raises_message(
  364. sa_exc.SADeprecationWarning,
  365. "Call to deprecated method get_primary_keys."
  366. " Use get_pk_constraint instead.",
  367. insp.get_primary_keys, users.name
  368. )
  369. @testing.provide_metadata
  370. def _test_get_foreign_keys(self, schema=None):
  371. meta = self.metadata
  372. users, addresses, dingalings = self.tables.users, \
  373. self.tables.email_addresses, self.tables.dingalings
  374. insp = inspect(meta.bind)
  375. expected_schema = schema
  376. # users
  377. if testing.requires.self_referential_foreign_keys.enabled:
  378. users_fkeys = insp.get_foreign_keys(users.name,
  379. schema=schema)
  380. fkey1 = users_fkeys[0]
  381. with testing.requires.named_constraints.fail_if():
  382. self.assert_(fkey1['name'] is not None)
  383. eq_(fkey1['referred_schema'], expected_schema)
  384. eq_(fkey1['referred_table'], users.name)
  385. eq_(fkey1['referred_columns'], ['user_id', ])
  386. if testing.requires.self_referential_foreign_keys.enabled:
  387. eq_(fkey1['constrained_columns'], ['parent_user_id'])
  388. # addresses
  389. addr_fkeys = insp.get_foreign_keys(addresses.name,
  390. schema=schema)
  391. fkey1 = addr_fkeys[0]
  392. with testing.requires.named_constraints.fail_if():
  393. self.assert_(fkey1['name'] is not None)
  394. eq_(fkey1['referred_schema'], expected_schema)
  395. eq_(fkey1['referred_table'], users.name)
  396. eq_(fkey1['referred_columns'], ['user_id', ])
  397. eq_(fkey1['constrained_columns'], ['remote_user_id'])
  398. @testing.requires.foreign_key_constraint_reflection
  399. def test_get_foreign_keys(self):
  400. self._test_get_foreign_keys()
  401. @testing.requires.foreign_key_constraint_reflection
  402. @testing.requires.schemas
  403. def test_get_foreign_keys_with_schema(self):
  404. self._test_get_foreign_keys(schema=testing.config.test_schema)
  405. @testing.requires.foreign_key_constraint_option_reflection
  406. @testing.provide_metadata
  407. def test_get_foreign_key_options(self):
  408. meta = self.metadata
  409. Table(
  410. 'x', meta,
  411. Column('id', Integer, primary_key=True),
  412. test_needs_fk=True
  413. )
  414. Table('table', meta,
  415. Column('id', Integer, primary_key=True),
  416. Column('x_id', Integer, sa.ForeignKey('x.id', name='xid')),
  417. Column('test', String(10)),
  418. test_needs_fk=True)
  419. Table('user', meta,
  420. Column('id', Integer, primary_key=True),
  421. Column('name', String(50), nullable=False),
  422. Column('tid', Integer),
  423. sa.ForeignKeyConstraint(
  424. ['tid'], ['table.id'],
  425. name='myfk',
  426. onupdate="SET NULL", ondelete="CASCADE"),
  427. test_needs_fk=True)
  428. meta.create_all()
  429. insp = inspect(meta.bind)
  430. # test 'options' is always present for a backend
  431. # that can reflect these, since alembic looks for this
  432. opts = insp.get_foreign_keys('table')[0]['options']
  433. eq_(
  434. dict(
  435. (k, opts[k])
  436. for k in opts if opts[k]
  437. ),
  438. {}
  439. )
  440. opts = insp.get_foreign_keys('user')[0]['options']
  441. eq_(
  442. dict(
  443. (k, opts[k])
  444. for k in opts if opts[k]
  445. ),
  446. {'onupdate': 'SET NULL', 'ondelete': 'CASCADE'}
  447. )
  448. @testing.provide_metadata
  449. def _test_get_indexes(self, schema=None):
  450. meta = self.metadata
  451. users, addresses, dingalings = self.tables.users, \
  452. self.tables.email_addresses, self.tables.dingalings
  453. # The database may decide to create indexes for foreign keys, etc.
  454. # so there may be more indexes than expected.
  455. insp = inspect(meta.bind)
  456. indexes = insp.get_indexes('users', schema=schema)
  457. expected_indexes = [
  458. {'unique': False,
  459. 'column_names': ['test1', 'test2'],
  460. 'name': 'users_t_idx'},
  461. {'unique': False,
  462. 'column_names': ['user_id', 'test2', 'test1'],
  463. 'name': 'users_all_idx'}
  464. ]
  465. index_names = [d['name'] for d in indexes]
  466. for e_index in expected_indexes:
  467. assert e_index['name'] in index_names
  468. index = indexes[index_names.index(e_index['name'])]
  469. for key in e_index:
  470. eq_(e_index[key], index[key])
  471. @testing.requires.index_reflection
  472. def test_get_indexes(self):
  473. self._test_get_indexes()
  474. @testing.requires.index_reflection
  475. @testing.requires.schemas
  476. def test_get_indexes_with_schema(self):
  477. self._test_get_indexes(schema=testing.config.test_schema)
  478. @testing.requires.unique_constraint_reflection
  479. def test_get_unique_constraints(self):
  480. self._test_get_unique_constraints()
  481. @testing.requires.temp_table_reflection
  482. @testing.requires.unique_constraint_reflection
  483. def test_get_temp_table_unique_constraints(self):
  484. insp = inspect(self.bind)
  485. reflected = insp.get_unique_constraints('user_tmp')
  486. for refl in reflected:
  487. # Different dialects handle duplicate index and constraints
  488. # differently, so ignore this flag
  489. refl.pop('duplicates_index', None)
  490. eq_(reflected, [{'column_names': ['name'], 'name': 'user_tmp_uq'}])
  491. @testing.requires.temp_table_reflection
  492. def test_get_temp_table_indexes(self):
  493. insp = inspect(self.bind)
  494. indexes = insp.get_indexes('user_tmp')
  495. for ind in indexes:
  496. ind.pop('dialect_options', None)
  497. eq_(
  498. # TODO: we need to add better filtering for indexes/uq constraints
  499. # that are doubled up
  500. [idx for idx in indexes if idx['name'] == 'user_tmp_ix'],
  501. [{'unique': False, 'column_names': ['foo'], 'name': 'user_tmp_ix'}]
  502. )
  503. @testing.requires.unique_constraint_reflection
  504. @testing.requires.schemas
  505. def test_get_unique_constraints_with_schema(self):
  506. self._test_get_unique_constraints(schema=testing.config.test_schema)
  507. @testing.provide_metadata
  508. def _test_get_unique_constraints(self, schema=None):
  509. # SQLite dialect needs to parse the names of the constraints
  510. # separately from what it gets from PRAGMA index_list(), and
  511. # then matches them up. so same set of column_names in two
  512. # constraints will confuse it. Perhaps we should no longer
  513. # bother with index_list() here since we have the whole
  514. # CREATE TABLE?
  515. uniques = sorted(
  516. [
  517. {'name': 'unique_a', 'column_names': ['a']},
  518. {'name': 'unique_a_b_c', 'column_names': ['a', 'b', 'c']},
  519. {'name': 'unique_c_a_b', 'column_names': ['c', 'a', 'b']},
  520. {'name': 'unique_asc_key', 'column_names': ['asc', 'key']},
  521. {'name': 'i.have.dots', 'column_names': ['b']},
  522. {'name': 'i have spaces', 'column_names': ['c']},
  523. ],
  524. key=operator.itemgetter('name')
  525. )
  526. orig_meta = self.metadata
  527. table = Table(
  528. 'testtbl', orig_meta,
  529. Column('a', sa.String(20)),
  530. Column('b', sa.String(30)),
  531. Column('c', sa.Integer),
  532. # reserved identifiers
  533. Column('asc', sa.String(30)),
  534. Column('key', sa.String(30)),
  535. schema=schema
  536. )
  537. for uc in uniques:
  538. table.append_constraint(
  539. sa.UniqueConstraint(*uc['column_names'], name=uc['name'])
  540. )
  541. orig_meta.create_all()
  542. inspector = inspect(orig_meta.bind)
  543. reflected = sorted(
  544. inspector.get_unique_constraints('testtbl', schema=schema),
  545. key=operator.itemgetter('name')
  546. )
  547. for orig, refl in zip(uniques, reflected):
  548. # Different dialects handle duplicate index and constraints
  549. # differently, so ignore this flag
  550. refl.pop('duplicates_index', None)
  551. eq_(orig, refl)
  552. @testing.provide_metadata
  553. def _test_get_view_definition(self, schema=None):
  554. meta = self.metadata
  555. users, addresses, dingalings = self.tables.users, \
  556. self.tables.email_addresses, self.tables.dingalings
  557. view_name1 = 'users_v'
  558. view_name2 = 'email_addresses_v'
  559. insp = inspect(meta.bind)
  560. v1 = insp.get_view_definition(view_name1, schema=schema)
  561. self.assert_(v1)
  562. v2 = insp.get_view_definition(view_name2, schema=schema)
  563. self.assert_(v2)
  564. @testing.requires.view_reflection
  565. def test_get_view_definition(self):
  566. self._test_get_view_definition()
  567. @testing.requires.view_reflection
  568. @testing.requires.schemas
  569. def test_get_view_definition_with_schema(self):
  570. self._test_get_view_definition(schema=testing.config.test_schema)
  571. @testing.only_on("postgresql", "PG specific feature")
  572. @testing.provide_metadata
  573. def _test_get_table_oid(self, table_name, schema=None):
  574. meta = self.metadata
  575. users, addresses, dingalings = self.tables.users, \
  576. self.tables.email_addresses, self.tables.dingalings
  577. insp = inspect(meta.bind)
  578. oid = insp.get_table_oid(table_name, schema)
  579. self.assert_(isinstance(oid, int))
  580. def test_get_table_oid(self):
  581. self._test_get_table_oid('users')
  582. @testing.requires.schemas
  583. def test_get_table_oid_with_schema(self):
  584. self._test_get_table_oid('users', schema=testing.config.test_schema)
  585. @testing.requires.table_reflection
  586. @testing.provide_metadata
  587. def test_autoincrement_col(self):
  588. """test that 'autoincrement' is reflected according to sqla's policy.
  589. Don't mark this test as unsupported for any backend !
  590. (technically it fails with MySQL InnoDB since "id" comes before "id2")
  591. A backend is better off not returning "autoincrement" at all,
  592. instead of potentially returning "False" for an auto-incrementing
  593. primary key column.
  594. """
  595. meta = self.metadata
  596. insp = inspect(meta.bind)
  597. for tname, cname in [
  598. ('users', 'user_id'),
  599. ('email_addresses', 'address_id'),
  600. ('dingalings', 'dingaling_id'),
  601. ]:
  602. cols = insp.get_columns(tname)
  603. id_ = dict((c['name'], c) for c in cols)[cname]
  604. assert id_.get('autoincrement', True)
  605. class NormalizedNameTest(fixtures.TablesTest):
  606. __requires__ = 'denormalized_names',
  607. __backend__ = True
  608. @classmethod
  609. def define_tables(cls, metadata):
  610. Table(
  611. quoted_name('t1', quote=True), metadata,
  612. Column('id', Integer, primary_key=True),
  613. )
  614. Table(
  615. quoted_name('t2', quote=True), metadata,
  616. Column('id', Integer, primary_key=True),
  617. Column('t1id', ForeignKey('t1.id'))
  618. )
  619. def test_reflect_lowercase_forced_tables(self):
  620. m2 = MetaData(testing.db)
  621. t2_ref = Table(quoted_name('t2', quote=True), m2, autoload=True)
  622. t1_ref = m2.tables['t1']
  623. assert t2_ref.c.t1id.references(t1_ref.c.id)
  624. m3 = MetaData(testing.db)
  625. m3.reflect(only=lambda name, m: name.lower() in ('t1', 't2'))
  626. assert m3.tables['t2'].c.t1id.references(m3.tables['t1'].c.id)
  627. def test_get_table_names(self):
  628. tablenames = [
  629. t for t in inspect(testing.db).get_table_names()
  630. if t.lower() in ("t1", "t2")]
  631. eq_(tablenames[0].upper(), tablenames[0].lower())
  632. eq_(tablenames[1].upper(), tablenames[1].lower())
  633. __all__ = ('ComponentReflectionTest', 'HasTableTest', 'NormalizedNameTest')