replay_fixture.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. from . import fixtures
  2. from . import profiling
  3. from .. import util
  4. import types
  5. from collections import deque
  6. import contextlib
  7. from . import config
  8. from sqlalchemy import MetaData
  9. from sqlalchemy import create_engine
  10. from sqlalchemy.orm import Session
  11. class ReplayFixtureTest(fixtures.TestBase):
  12. @contextlib.contextmanager
  13. def _dummy_ctx(self, *arg, **kw):
  14. yield
  15. def test_invocation(self):
  16. dbapi_session = ReplayableSession()
  17. creator = config.db.pool._creator
  18. recorder = lambda: dbapi_session.recorder(creator())
  19. engine = create_engine(
  20. config.db.url, creator=recorder,
  21. use_native_hstore=False)
  22. self.metadata = MetaData(engine)
  23. self.engine = engine
  24. self.session = Session(engine)
  25. self.setup_engine()
  26. try:
  27. self._run_steps(ctx=self._dummy_ctx)
  28. finally:
  29. self.teardown_engine()
  30. engine.dispose()
  31. player = lambda: dbapi_session.player()
  32. engine = create_engine(
  33. config.db.url, creator=player,
  34. use_native_hstore=False)
  35. self.metadata = MetaData(engine)
  36. self.engine = engine
  37. self.session = Session(engine)
  38. self.setup_engine()
  39. try:
  40. self._run_steps(ctx=profiling.count_functions)
  41. finally:
  42. self.session.close()
  43. engine.dispose()
  44. def setup_engine(self):
  45. pass
  46. def teardown_engine(self):
  47. pass
  48. def _run_steps(self, ctx):
  49. raise NotImplementedError()
  50. class ReplayableSession(object):
  51. """A simple record/playback tool.
  52. This is *not* a mock testing class. It only records a session for later
  53. playback and makes no assertions on call consistency whatsoever. It's
  54. unlikely to be suitable for anything other than DB-API recording.
  55. """
  56. Callable = object()
  57. NoAttribute = object()
  58. if util.py2k:
  59. Natives = set([getattr(types, t)
  60. for t in dir(types) if not t.startswith('_')]).\
  61. difference([getattr(types, t)
  62. for t in ('FunctionType', 'BuiltinFunctionType',
  63. 'MethodType', 'BuiltinMethodType',
  64. 'LambdaType', 'UnboundMethodType',)])
  65. else:
  66. Natives = set([getattr(types, t)
  67. for t in dir(types) if not t.startswith('_')]).\
  68. union([type(t) if not isinstance(t, type)
  69. else t for t in __builtins__.values()]).\
  70. difference([getattr(types, t)
  71. for t in ('FunctionType', 'BuiltinFunctionType',
  72. 'MethodType', 'BuiltinMethodType',
  73. 'LambdaType', )])
  74. def __init__(self):
  75. self.buffer = deque()
  76. def recorder(self, base):
  77. return self.Recorder(self.buffer, base)
  78. def player(self):
  79. return self.Player(self.buffer)
  80. class Recorder(object):
  81. def __init__(self, buffer, subject):
  82. self._buffer = buffer
  83. self._subject = subject
  84. def __call__(self, *args, **kw):
  85. subject, buffer = [object.__getattribute__(self, x)
  86. for x in ('_subject', '_buffer')]
  87. result = subject(*args, **kw)
  88. if type(result) not in ReplayableSession.Natives:
  89. buffer.append(ReplayableSession.Callable)
  90. return type(self)(buffer, result)
  91. else:
  92. buffer.append(result)
  93. return result
  94. @property
  95. def _sqla_unwrap(self):
  96. return self._subject
  97. def __getattribute__(self, key):
  98. try:
  99. return object.__getattribute__(self, key)
  100. except AttributeError:
  101. pass
  102. subject, buffer = [object.__getattribute__(self, x)
  103. for x in ('_subject', '_buffer')]
  104. try:
  105. result = type(subject).__getattribute__(subject, key)
  106. except AttributeError:
  107. buffer.append(ReplayableSession.NoAttribute)
  108. raise
  109. else:
  110. if type(result) not in ReplayableSession.Natives:
  111. buffer.append(ReplayableSession.Callable)
  112. return type(self)(buffer, result)
  113. else:
  114. buffer.append(result)
  115. return result
  116. class Player(object):
  117. def __init__(self, buffer):
  118. self._buffer = buffer
  119. def __call__(self, *args, **kw):
  120. buffer = object.__getattribute__(self, '_buffer')
  121. result = buffer.popleft()
  122. if result is ReplayableSession.Callable:
  123. return self
  124. else:
  125. return result
  126. @property
  127. def _sqla_unwrap(self):
  128. return None
  129. def __getattribute__(self, key):
  130. try:
  131. return object.__getattribute__(self, key)
  132. except AttributeError:
  133. pass
  134. buffer = object.__getattribute__(self, '_buffer')
  135. result = buffer.popleft()
  136. if result is ReplayableSession.Callable:
  137. return self
  138. elif result is ReplayableSession.NoAttribute:
  139. raise AttributeError(key)
  140. else:
  141. return result