fixtures.py 11 KB


  1. # testing/fixtures.py
  2. # Copyright (C) 2005-2017 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: http://www.opensource.org/licenses/mit-license.php
  7. from . import config
  8. from . import assertions, schema
  9. from .util import adict
  10. from .. import util
  11. from .engines import drop_all_tables
  12. from .entities import BasicEntity, ComparableEntity
  13. import sys
  14. import sqlalchemy as sa
  15. from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta
  16. # whether or not we use unittest changes things dramatically,
  17. # as far as how py.test collection works.
  18. class TestBase(object):
  19. # A sequence of database names to always run, regardless of the
  20. # constraints below.
  21. __whitelist__ = ()
  22. # A sequence of requirement names matching testing.requires decorators
  23. __requires__ = ()
  24. # A sequence of dialect names to exclude from the test class.
  25. __unsupported_on__ = ()
  26. # If present, test class is only runnable for the *single* specified
  27. # dialect. If you need multiple, use __unsupported_on__ and invert.
  28. __only_on__ = None
  29. # A sequence of no-arg callables. If any are True, the entire testcase is
  30. # skipped.
  31. __skip_if__ = None
  32. def assert_(self, val, msg=None):
  33. assert val, msg
  34. # apparently a handful of tests are doing this....OK
  35. def setup(self):
  36. if hasattr(self, "setUp"):
  37. self.setUp()
  38. def teardown(self):
  39. if hasattr(self, "tearDown"):
  40. self.tearDown()
  41. class TablesTest(TestBase):
  42. # 'once', None
  43. run_setup_bind = 'once'
  44. # 'once', 'each', None
  45. run_define_tables = 'once'
  46. # 'once', 'each', None
  47. run_create_tables = 'once'
  48. # 'once', 'each', None
  49. run_inserts = 'each'
  50. # 'each', None
  51. run_deletes = 'each'
  52. # 'once', None
  53. run_dispose_bind = None
  54. bind = None
  55. metadata = None
  56. tables = None
  57. other = None
  58. @classmethod
  59. def setup_class(cls):
  60. cls._init_class()
  61. cls._setup_once_tables()
  62. cls._setup_once_inserts()
  63. @classmethod
  64. def _init_class(cls):
  65. if cls.run_define_tables == 'each':
  66. if cls.run_create_tables == 'once':
  67. cls.run_create_tables = 'each'
  68. assert cls.run_inserts in ('each', None)
  69. cls.other = adict()
  70. cls.tables = adict()
  71. cls.bind = cls.setup_bind()
  72. cls.metadata = sa.MetaData()
  73. cls.metadata.bind = cls.bind
  74. @classmethod
  75. def _setup_once_inserts(cls):
  76. if cls.run_inserts == 'once':
  77. cls._load_fixtures()
  78. cls.insert_data()
  79. @classmethod
  80. def _setup_once_tables(cls):
  81. if cls.run_define_tables == 'once':
  82. cls.define_tables(cls.metadata)
  83. if cls.run_create_tables == 'once':
  84. cls.metadata.create_all(cls.bind)
  85. cls.tables.update(cls.metadata.tables)
  86. def _setup_each_tables(self):
  87. if self.run_define_tables == 'each':
  88. self.tables.clear()
  89. if self.run_create_tables == 'each':
  90. drop_all_tables(self.metadata, self.bind)
  91. self.metadata.clear()
  92. self.define_tables(self.metadata)
  93. if self.run_create_tables == 'each':
  94. self.metadata.create_all(self.bind)
  95. self.tables.update(self.metadata.tables)
  96. elif self.run_create_tables == 'each':
  97. drop_all_tables(self.metadata, self.bind)
  98. self.metadata.create_all(self.bind)
  99. def _setup_each_inserts(self):
  100. if self.run_inserts == 'each':
  101. self._load_fixtures()
  102. self.insert_data()
  103. def _teardown_each_tables(self):
  104. # no need to run deletes if tables are recreated on setup
  105. if self.run_define_tables != 'each' and self.run_deletes == 'each':
  106. with self.bind.connect() as conn:
  107. for table in reversed(self.metadata.sorted_tables):
  108. try:
  109. conn.execute(table.delete())
  110. except sa.exc.DBAPIError as ex:
  111. util.print_(
  112. ("Error emptying table %s: %r" % (table, ex)),
  113. file=sys.stderr)
  114. def setup(self):
  115. self._setup_each_tables()
  116. self._setup_each_inserts()
  117. def teardown(self):
  118. self._teardown_each_tables()
  119. @classmethod
  120. def _teardown_once_metadata_bind(cls):
  121. if cls.run_create_tables:
  122. drop_all_tables(cls.metadata, cls.bind)
  123. if cls.run_dispose_bind == 'once':
  124. cls.dispose_bind(cls.bind)
  125. cls.metadata.bind = None
  126. if cls.run_setup_bind is not None:
  127. cls.bind = None
  128. @classmethod
  129. def teardown_class(cls):
  130. cls._teardown_once_metadata_bind()
  131. @classmethod
  132. def setup_bind(cls):
  133. return config.db
  134. @classmethod
  135. def dispose_bind(cls, bind):
  136. if hasattr(bind, 'dispose'):
  137. bind.dispose()
  138. elif hasattr(bind, 'close'):
  139. bind.close()
  140. @classmethod
  141. def define_tables(cls, metadata):
  142. pass
  143. @classmethod
  144. def fixtures(cls):
  145. return {}
  146. @classmethod
  147. def insert_data(cls):
  148. pass
  149. def sql_count_(self, count, fn):
  150. self.assert_sql_count(self.bind, fn, count)
  151. def sql_eq_(self, callable_, statements):
  152. self.assert_sql(self.bind, callable_, statements)
  153. @classmethod
  154. def _load_fixtures(cls):
  155. """Insert rows as represented by the fixtures() method."""
  156. headers, rows = {}, {}
  157. for table, data in cls.fixtures().items():
  158. if len(data) < 2:
  159. continue
  160. if isinstance(table, util.string_types):
  161. table = cls.tables[table]
  162. headers[table] = data[0]
  163. rows[table] = data[1:]
  164. for table in cls.metadata.sorted_tables:
  165. if table not in headers:
  166. continue
  167. cls.bind.execute(
  168. table.insert(),
  169. [dict(zip(headers[table], column_values))
  170. for column_values in rows[table]])
  171. from sqlalchemy import event
  172. class RemovesEvents(object):
  173. @util.memoized_property
  174. def _event_fns(self):
  175. return set()
  176. def event_listen(self, target, name, fn):
  177. self._event_fns.add((target, name, fn))
  178. event.listen(target, name, fn)
  179. def teardown(self):
  180. for key in self._event_fns:
  181. event.remove(*key)
  182. super_ = super(RemovesEvents, self)
  183. if hasattr(super_, "teardown"):
  184. super_.teardown()
  185. class _ORMTest(object):
  186. @classmethod
  187. def teardown_class(cls):
  188. sa.orm.session.Session.close_all()
  189. sa.orm.clear_mappers()
  190. class ORMTest(_ORMTest, TestBase):
  191. pass
  192. class MappedTest(_ORMTest, TablesTest, assertions.AssertsExecutionResults):
  193. # 'once', 'each', None
  194. run_setup_classes = 'once'
  195. # 'once', 'each', None
  196. run_setup_mappers = 'each'
  197. classes = None
  198. @classmethod
  199. def setup_class(cls):
  200. cls._init_class()
  201. if cls.classes is None:
  202. cls.classes = adict()
  203. cls._setup_once_tables()
  204. cls._setup_once_classes()
  205. cls._setup_once_mappers()
  206. cls._setup_once_inserts()
  207. @classmethod
  208. def teardown_class(cls):
  209. cls._teardown_once_class()
  210. cls._teardown_once_metadata_bind()
  211. def setup(self):
  212. self._setup_each_tables()
  213. self._setup_each_classes()
  214. self._setup_each_mappers()
  215. self._setup_each_inserts()
  216. def teardown(self):
  217. sa.orm.session.Session.close_all()
  218. self._teardown_each_mappers()
  219. self._teardown_each_classes()
  220. self._teardown_each_tables()
  221. @classmethod
  222. def _teardown_once_class(cls):
  223. cls.classes.clear()
  224. _ORMTest.teardown_class()
  225. @classmethod
  226. def _setup_once_classes(cls):
  227. if cls.run_setup_classes == 'once':
  228. cls._with_register_classes(cls.setup_classes)
  229. @classmethod
  230. def _setup_once_mappers(cls):
  231. if cls.run_setup_mappers == 'once':
  232. cls._with_register_classes(cls.setup_mappers)
  233. def _setup_each_mappers(self):
  234. if self.run_setup_mappers == 'each':
  235. self._with_register_classes(self.setup_mappers)
  236. def _setup_each_classes(self):
  237. if self.run_setup_classes == 'each':
  238. self._with_register_classes(self.setup_classes)
  239. @classmethod
  240. def _with_register_classes(cls, fn):
  241. """Run a setup method, framing the operation with a Base class
  242. that will catch new subclasses to be established within
  243. the "classes" registry.
  244. """
  245. cls_registry = cls.classes
  246. class FindFixture(type):
  247. def __init__(cls, classname, bases, dict_):
  248. cls_registry[classname] = cls
  249. return type.__init__(cls, classname, bases, dict_)
  250. class _Base(util.with_metaclass(FindFixture, object)):
  251. pass
  252. class Basic(BasicEntity, _Base):
  253. pass
  254. class Comparable(ComparableEntity, _Base):
  255. pass
  256. cls.Basic = Basic
  257. cls.Comparable = Comparable
  258. fn()
  259. def _teardown_each_mappers(self):
  260. # some tests create mappers in the test bodies
  261. # and will define setup_mappers as None -
  262. # clear mappers in any case
  263. if self.run_setup_mappers != 'once':
  264. sa.orm.clear_mappers()
  265. def _teardown_each_classes(self):
  266. if self.run_setup_classes != 'once':
  267. self.classes.clear()
  268. @classmethod
  269. def setup_classes(cls):
  270. pass
  271. @classmethod
  272. def setup_mappers(cls):
  273. pass
  274. class DeclarativeMappedTest(MappedTest):
  275. run_setup_classes = 'once'
  276. run_setup_mappers = 'once'
  277. @classmethod
  278. def _setup_once_tables(cls):
  279. pass
  280. @classmethod
  281. def _with_register_classes(cls, fn):
  282. cls_registry = cls.classes
  283. class FindFixtureDeclarative(DeclarativeMeta):
  284. def __init__(cls, classname, bases, dict_):
  285. cls_registry[classname] = cls
  286. return DeclarativeMeta.__init__(
  287. cls, classname, bases, dict_)
  288. class DeclarativeBasic(object):
  289. __table_cls__ = schema.Table
  290. _DeclBase = declarative_base(metadata=cls.metadata,
  291. metaclass=FindFixtureDeclarative,
  292. cls=DeclarativeBasic)
  293. cls.DeclarativeBasic = _DeclBase
  294. fn()
  295. if cls.metadata.tables and cls.run_create_tables:
  296. cls.metadata.create_all(config.db)