schema.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. # testing/schema.py
  2. # Copyright (C) 2005-2017 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: http://www.opensource.org/licenses/mit-license.php
  7. from . import exclusions
  8. from .. import schema, event
  9. from . import config
  10. __all__ = 'Table', 'Column',
  11. table_options = {}
  12. def Table(*args, **kw):
  13. """A schema.Table wrapper/hook for dialect-specific tweaks."""
  14. test_opts = dict([(k, kw.pop(k)) for k in list(kw)
  15. if k.startswith('test_')])
  16. kw.update(table_options)
  17. if exclusions.against(config._current, 'mysql'):
  18. if 'mysql_engine' not in kw and 'mysql_type' not in kw:
  19. if 'test_needs_fk' in test_opts or 'test_needs_acid' in test_opts:
  20. kw['mysql_engine'] = 'InnoDB'
  21. else:
  22. kw['mysql_engine'] = 'MyISAM'
  23. # Apply some default cascading rules for self-referential foreign keys.
  24. # MySQL InnoDB has some issues around seleting self-refs too.
  25. if exclusions.against(config._current, 'firebird'):
  26. table_name = args[0]
  27. unpack = (config.db.dialect.
  28. identifier_preparer.unformat_identifiers)
  29. # Only going after ForeignKeys in Columns. May need to
  30. # expand to ForeignKeyConstraint too.
  31. fks = [fk
  32. for col in args if isinstance(col, schema.Column)
  33. for fk in col.foreign_keys]
  34. for fk in fks:
  35. # root around in raw spec
  36. ref = fk._colspec
  37. if isinstance(ref, schema.Column):
  38. name = ref.table.name
  39. else:
  40. # take just the table name: on FB there cannot be
  41. # a schema, so the first element is always the
  42. # table name, possibly followed by the field name
  43. name = unpack(ref)[0]
  44. if name == table_name:
  45. if fk.ondelete is None:
  46. fk.ondelete = 'CASCADE'
  47. if fk.onupdate is None:
  48. fk.onupdate = 'CASCADE'
  49. return schema.Table(*args, **kw)
  50. def Column(*args, **kw):
  51. """A schema.Column wrapper/hook for dialect-specific tweaks."""
  52. test_opts = dict([(k, kw.pop(k)) for k in list(kw)
  53. if k.startswith('test_')])
  54. if not config.requirements.foreign_key_ddl.enabled_for_config(config):
  55. args = [arg for arg in args if not isinstance(arg, schema.ForeignKey)]
  56. col = schema.Column(*args, **kw)
  57. if test_opts.get('test_needs_autoincrement', False) and \
  58. kw.get('primary_key', False):
  59. if col.default is None and col.server_default is None:
  60. col.autoincrement = True
  61. # allow any test suite to pick up on this
  62. col.info['test_needs_autoincrement'] = True
  63. # hardcoded rule for firebird, oracle; this should
  64. # be moved out
  65. if exclusions.against(config._current, 'firebird', 'oracle'):
  66. def add_seq(c, tbl):
  67. c._init_items(
  68. schema.Sequence(_truncate_name(
  69. config.db.dialect, tbl.name + '_' + c.name + '_seq'),
  70. optional=True)
  71. )
  72. event.listen(col, 'after_parent_attach', add_seq, propagate=True)
  73. return col
  74. def _truncate_name(dialect, name):
  75. if len(name) > dialect.max_identifier_length:
  76. return name[0:max(dialect.max_identifier_length - 6, 0)] + \
  77. "_" + hex(hash(name) % 64)[2:]
  78. else:
  79. return name