assertsql.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. # testing/assertsql.py
  2. # Copyright (C) 2005-2017 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: http://www.opensource.org/licenses/mit-license.php
  7. from ..engine.default import DefaultDialect
  8. from .. import util
  9. import re
  10. import collections
  11. import contextlib
  12. from .. import event
  13. from sqlalchemy.schema import _DDLCompiles
  14. from sqlalchemy.engine.util import _distill_params
  15. from sqlalchemy.engine import url
  16. class AssertRule(object):
  17. is_consumed = False
  18. errormessage = None
  19. consume_statement = True
  20. def process_statement(self, execute_observed):
  21. pass
  22. def no_more_statements(self):
  23. assert False, 'All statements are complete, but pending '\
  24. 'assertion rules remain'
  25. class SQLMatchRule(AssertRule):
  26. pass
  27. class CursorSQL(SQLMatchRule):
  28. consume_statement = False
  29. def __init__(self, statement, params=None):
  30. self.statement = statement
  31. self.params = params
  32. def process_statement(self, execute_observed):
  33. stmt = execute_observed.statements[0]
  34. if self.statement != stmt.statement or (
  35. self.params is not None and self.params != stmt.parameters):
  36. self.errormessage = \
  37. "Testing for exact SQL %s parameters %s received %s %s" % (
  38. self.statement, self.params,
  39. stmt.statement, stmt.parameters
  40. )
  41. else:
  42. execute_observed.statements.pop(0)
  43. self.is_consumed = True
  44. if not execute_observed.statements:
  45. self.consume_statement = True
  46. class CompiledSQL(SQLMatchRule):
  47. def __init__(self, statement, params=None, dialect='default'):
  48. self.statement = statement
  49. self.params = params
  50. self.dialect = dialect
  51. def _compare_sql(self, execute_observed, received_statement):
  52. stmt = re.sub(r'[\n\t]', '', self.statement)
  53. return received_statement == stmt
  54. def _compile_dialect(self, execute_observed):
  55. if self.dialect == 'default':
  56. return DefaultDialect()
  57. else:
  58. # ugh
  59. if self.dialect == 'postgresql':
  60. params = {'implicit_returning': True}
  61. else:
  62. params = {}
  63. return url.URL(self.dialect).get_dialect()(**params)
  64. def _received_statement(self, execute_observed):
  65. """reconstruct the statement and params in terms
  66. of a target dialect, which for CompiledSQL is just DefaultDialect."""
  67. context = execute_observed.context
  68. compare_dialect = self._compile_dialect(execute_observed)
  69. if isinstance(context.compiled.statement, _DDLCompiles):
  70. compiled = \
  71. context.compiled.statement.compile(
  72. dialect=compare_dialect,
  73. schema_translate_map=context.
  74. execution_options.get('schema_translate_map'))
  75. else:
  76. compiled = (
  77. context.compiled.statement.compile(
  78. dialect=compare_dialect,
  79. column_keys=context.compiled.column_keys,
  80. inline=context.compiled.inline,
  81. schema_translate_map=context.
  82. execution_options.get('schema_translate_map'))
  83. )
  84. _received_statement = re.sub(r'[\n\t]', '', util.text_type(compiled))
  85. parameters = execute_observed.parameters
  86. if not parameters:
  87. _received_parameters = [compiled.construct_params()]
  88. else:
  89. _received_parameters = [
  90. compiled.construct_params(m) for m in parameters]
  91. return _received_statement, _received_parameters
  92. def process_statement(self, execute_observed):
  93. context = execute_observed.context
  94. _received_statement, _received_parameters = \
  95. self._received_statement(execute_observed)
  96. params = self._all_params(context)
  97. equivalent = self._compare_sql(execute_observed, _received_statement)
  98. if equivalent:
  99. if params is not None:
  100. all_params = list(params)
  101. all_received = list(_received_parameters)
  102. while all_params and all_received:
  103. param = dict(all_params.pop(0))
  104. for idx, received in enumerate(list(all_received)):
  105. # do a positive compare only
  106. for param_key in param:
  107. # a key in param did not match current
  108. # 'received'
  109. if param_key not in received or \
  110. received[param_key] != param[param_key]:
  111. break
  112. else:
  113. # all keys in param matched 'received';
  114. # onto next param
  115. del all_received[idx]
  116. break
  117. else:
  118. # param did not match any entry
  119. # in all_received
  120. equivalent = False
  121. break
  122. if all_params or all_received:
  123. equivalent = False
  124. if equivalent:
  125. self.is_consumed = True
  126. self.errormessage = None
  127. else:
  128. self.errormessage = self._failure_message(params) % {
  129. 'received_statement': _received_statement,
  130. 'received_parameters': _received_parameters
  131. }
  132. def _all_params(self, context):
  133. if self.params:
  134. if util.callable(self.params):
  135. params = self.params(context)
  136. else:
  137. params = self.params
  138. if not isinstance(params, list):
  139. params = [params]
  140. return params
  141. else:
  142. return None
  143. def _failure_message(self, expected_params):
  144. return (
  145. 'Testing for compiled statement %r partial params %r, '
  146. 'received %%(received_statement)r with params '
  147. '%%(received_parameters)r' % (
  148. self.statement.replace('%', '%%'), expected_params
  149. )
  150. )
  151. class RegexSQL(CompiledSQL):
  152. def __init__(self, regex, params=None):
  153. SQLMatchRule.__init__(self)
  154. self.regex = re.compile(regex)
  155. self.orig_regex = regex
  156. self.params = params
  157. self.dialect = 'default'
  158. def _failure_message(self, expected_params):
  159. return (
  160. 'Testing for compiled statement ~%r partial params %r, '
  161. 'received %%(received_statement)r with params '
  162. '%%(received_parameters)r' % (
  163. self.orig_regex, expected_params
  164. )
  165. )
  166. def _compare_sql(self, execute_observed, received_statement):
  167. return bool(self.regex.match(received_statement))
  168. class DialectSQL(CompiledSQL):
  169. def _compile_dialect(self, execute_observed):
  170. return execute_observed.context.dialect
  171. def _compare_no_space(self, real_stmt, received_stmt):
  172. stmt = re.sub(r'[\n\t]', '', real_stmt)
  173. return received_stmt == stmt
  174. def _received_statement(self, execute_observed):
  175. received_stmt, received_params = super(DialectSQL, self).\
  176. _received_statement(execute_observed)
  177. # TODO: why do we need this part?
  178. for real_stmt in execute_observed.statements:
  179. if self._compare_no_space(real_stmt.statement, received_stmt):
  180. break
  181. else:
  182. raise AssertionError(
  183. "Can't locate compiled statement %r in list of "
  184. "statements actually invoked" % received_stmt)
  185. return received_stmt, execute_observed.context.compiled_parameters
  186. def _compare_sql(self, execute_observed, received_statement):
  187. stmt = re.sub(r'[\n\t]', '', self.statement)
  188. # convert our comparison statement to have the
  189. # paramstyle of the received
  190. paramstyle = execute_observed.context.dialect.paramstyle
  191. if paramstyle == 'pyformat':
  192. stmt = re.sub(
  193. r':([\w_]+)', r"%(\1)s", stmt)
  194. else:
  195. # positional params
  196. repl = None
  197. if paramstyle == 'qmark':
  198. repl = "?"
  199. elif paramstyle == 'format':
  200. repl = r"%s"
  201. elif paramstyle == 'numeric':
  202. repl = None
  203. stmt = re.sub(r':([\w_]+)', repl, stmt)
  204. return received_statement == stmt
  205. class CountStatements(AssertRule):
  206. def __init__(self, count):
  207. self.count = count
  208. self._statement_count = 0
  209. def process_statement(self, execute_observed):
  210. self._statement_count += 1
  211. def no_more_statements(self):
  212. if self.count != self._statement_count:
  213. assert False, 'desired statement count %d does not match %d' \
  214. % (self.count, self._statement_count)
  215. class AllOf(AssertRule):
  216. def __init__(self, *rules):
  217. self.rules = set(rules)
  218. def process_statement(self, execute_observed):
  219. for rule in list(self.rules):
  220. rule.errormessage = None
  221. rule.process_statement(execute_observed)
  222. if rule.is_consumed:
  223. self.rules.discard(rule)
  224. if not self.rules:
  225. self.is_consumed = True
  226. break
  227. elif not rule.errormessage:
  228. # rule is not done yet
  229. self.errormessage = None
  230. break
  231. else:
  232. self.errormessage = list(self.rules)[0].errormessage
  233. class Or(AllOf):
  234. def process_statement(self, execute_observed):
  235. for rule in self.rules:
  236. rule.process_statement(execute_observed)
  237. if rule.is_consumed:
  238. self.is_consumed = True
  239. break
  240. else:
  241. self.errormessage = list(self.rules)[0].errormessage
  242. class SQLExecuteObserved(object):
  243. def __init__(self, context, clauseelement, multiparams, params):
  244. self.context = context
  245. self.clauseelement = clauseelement
  246. self.parameters = _distill_params(multiparams, params)
  247. self.statements = []
  248. class SQLCursorExecuteObserved(
  249. collections.namedtuple(
  250. "SQLCursorExecuteObserved",
  251. ["statement", "parameters", "context", "executemany"])
  252. ):
  253. pass
  254. class SQLAsserter(object):
  255. def __init__(self):
  256. self.accumulated = []
  257. def _close(self):
  258. self._final = self.accumulated
  259. del self.accumulated
  260. def assert_(self, *rules):
  261. rules = list(rules)
  262. observed = list(self._final)
  263. while observed and rules:
  264. rule = rules[0]
  265. rule.process_statement(observed[0])
  266. if rule.is_consumed:
  267. rules.pop(0)
  268. elif rule.errormessage:
  269. assert False, rule.errormessage
  270. if rule.consume_statement:
  271. observed.pop(0)
  272. if not observed and rules:
  273. rules[0].no_more_statements()
  274. elif not rules and observed:
  275. assert False, "Additional SQL statements remain"
  276. @contextlib.contextmanager
  277. def assert_engine(engine):
  278. asserter = SQLAsserter()
  279. orig = []
  280. @event.listens_for(engine, "before_execute")
  281. def connection_execute(conn, clauseelement, multiparams, params):
  282. # grab the original statement + params before any cursor
  283. # execution
  284. orig[:] = clauseelement, multiparams, params
  285. @event.listens_for(engine, "after_cursor_execute")
  286. def cursor_execute(conn, cursor, statement, parameters,
  287. context, executemany):
  288. if not context:
  289. return
  290. # then grab real cursor statements and associate them all
  291. # around a single context
  292. if asserter.accumulated and \
  293. asserter.accumulated[-1].context is context:
  294. obs = asserter.accumulated[-1]
  295. else:
  296. obs = SQLExecuteObserved(context, orig[0], orig[1], orig[2])
  297. asserter.accumulated.append(obs)
  298. obs.statements.append(
  299. SQLCursorExecuteObserved(
  300. statement, parameters, context, executemany)
  301. )
  302. try:
  303. yield asserter
  304. finally:
  305. event.remove(engine, "after_cursor_execute", cursor_execute)
  306. event.remove(engine, "before_execute", connection_execute)
  307. asserter._close()