env.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. #!coding: utf-8
  2. import os
  3. import shutil
  4. import textwrap
  5. from ..util.compat import u
  6. from ..script import Script, ScriptDirectory
  7. from .. import util
  8. from . import engines
  9. from . import provision
  10. def _get_staging_directory():
  11. if provision.FOLLOWER_IDENT:
  12. return "scratch_%s" % provision.FOLLOWER_IDENT
  13. else:
  14. return 'scratch'
  15. def staging_env(create=True, template="generic", sourceless=False):
  16. from alembic import command, script
  17. cfg = _testing_config()
  18. if create:
  19. path = os.path.join(_get_staging_directory(), 'scripts')
  20. if os.path.exists(path):
  21. shutil.rmtree(path)
  22. command.init(cfg, path, template=template)
  23. if sourceless:
  24. try:
  25. # do an import so that a .pyc/.pyo is generated.
  26. util.load_python_file(path, 'env.py')
  27. except AttributeError:
  28. # we don't have the migration context set up yet
  29. # so running the .env py throws this exception.
  30. # theoretically we could be using py_compiler here to
  31. # generate .pyc/.pyo without importing but not really
  32. # worth it.
  33. pass
  34. make_sourceless(os.path.join(path, "env.py"))
  35. sc = script.ScriptDirectory.from_config(cfg)
  36. return sc
  37. def clear_staging_env():
  38. shutil.rmtree(_get_staging_directory(), True)
  39. def script_file_fixture(txt):
  40. dir_ = os.path.join(_get_staging_directory(), 'scripts')
  41. path = os.path.join(dir_, "script.py.mako")
  42. with open(path, 'w') as f:
  43. f.write(txt)
  44. def env_file_fixture(txt):
  45. dir_ = os.path.join(_get_staging_directory(), 'scripts')
  46. txt = """
  47. from alembic import context
  48. config = context.config
  49. """ + txt
  50. path = os.path.join(dir_, "env.py")
  51. pyc_path = util.pyc_file_from_path(path)
  52. if os.access(pyc_path, os.F_OK):
  53. os.unlink(pyc_path)
  54. with open(path, 'w') as f:
  55. f.write(txt)
  56. def _sqlite_file_db(tempname="foo.db"):
  57. dir_ = os.path.join(_get_staging_directory(), 'scripts')
  58. url = "sqlite:///%s/%s" % (dir_, tempname)
  59. return engines.testing_engine(url=url)
  60. def _sqlite_testing_config(sourceless=False):
  61. dir_ = os.path.join(_get_staging_directory(), 'scripts')
  62. url = "sqlite:///%s/foo.db" % dir_
  63. return _write_config_file("""
  64. [alembic]
  65. script_location = %s
  66. sqlalchemy.url = %s
  67. sourceless = %s
  68. [loggers]
  69. keys = root
  70. [handlers]
  71. keys = console
  72. [logger_root]
  73. level = WARN
  74. handlers = console
  75. qualname =
  76. [handler_console]
  77. class = StreamHandler
  78. args = (sys.stderr,)
  79. level = NOTSET
  80. formatter = generic
  81. [formatters]
  82. keys = generic
  83. [formatter_generic]
  84. format = %%(levelname)-5.5s [%%(name)s] %%(message)s
  85. datefmt = %%H:%%M:%%S
  86. """ % (dir_, url, "true" if sourceless else "false"))
  87. def _multi_dir_testing_config(sourceless=False, extra_version_location=''):
  88. dir_ = os.path.join(_get_staging_directory(), 'scripts')
  89. url = "sqlite:///%s/foo.db" % dir_
  90. return _write_config_file("""
  91. [alembic]
  92. script_location = %s
  93. sqlalchemy.url = %s
  94. sourceless = %s
  95. version_locations = %%(here)s/model1/ %%(here)s/model2/ %%(here)s/model3/ %s
  96. [loggers]
  97. keys = root
  98. [handlers]
  99. keys = console
  100. [logger_root]
  101. level = WARN
  102. handlers = console
  103. qualname =
  104. [handler_console]
  105. class = StreamHandler
  106. args = (sys.stderr,)
  107. level = NOTSET
  108. formatter = generic
  109. [formatters]
  110. keys = generic
  111. [formatter_generic]
  112. format = %%(levelname)-5.5s [%%(name)s] %%(message)s
  113. datefmt = %%H:%%M:%%S
  114. """ % (dir_, url, "true" if sourceless else "false",
  115. extra_version_location))
  116. def _no_sql_testing_config(dialect="postgresql", directives=""):
  117. """use a postgresql url with no host so that
  118. connections guaranteed to fail"""
  119. dir_ = os.path.join(_get_staging_directory(), 'scripts')
  120. return _write_config_file("""
  121. [alembic]
  122. script_location = %s
  123. sqlalchemy.url = %s://
  124. %s
  125. [loggers]
  126. keys = root
  127. [handlers]
  128. keys = console
  129. [logger_root]
  130. level = WARN
  131. handlers = console
  132. qualname =
  133. [handler_console]
  134. class = StreamHandler
  135. args = (sys.stderr,)
  136. level = NOTSET
  137. formatter = generic
  138. [formatters]
  139. keys = generic
  140. [formatter_generic]
  141. format = %%(levelname)-5.5s [%%(name)s] %%(message)s
  142. datefmt = %%H:%%M:%%S
  143. """ % (dir_, dialect, directives))
  144. def _write_config_file(text):
  145. cfg = _testing_config()
  146. with open(cfg.config_file_name, 'w') as f:
  147. f.write(text)
  148. return cfg
  149. def _testing_config():
  150. from alembic.config import Config
  151. if not os.access(_get_staging_directory(), os.F_OK):
  152. os.mkdir(_get_staging_directory())
  153. return Config(os.path.join(_get_staging_directory(), 'test_alembic.ini'))
  154. def write_script(
  155. scriptdir, rev_id, content, encoding='ascii', sourceless=False):
  156. old = scriptdir.revision_map.get_revision(rev_id)
  157. path = old.path
  158. content = textwrap.dedent(content)
  159. if encoding:
  160. content = content.encode(encoding)
  161. with open(path, 'wb') as fp:
  162. fp.write(content)
  163. pyc_path = util.pyc_file_from_path(path)
  164. if os.access(pyc_path, os.F_OK):
  165. os.unlink(pyc_path)
  166. script = Script._from_path(scriptdir, path)
  167. old = scriptdir.revision_map.get_revision(script.revision)
  168. if old.down_revision != script.down_revision:
  169. raise Exception("Can't change down_revision "
  170. "on a refresh operation.")
  171. scriptdir.revision_map.add_revision(script, _replace=True)
  172. if sourceless:
  173. make_sourceless(path)
  174. def make_sourceless(path):
  175. # note that if -O is set, you'd see pyo files here,
  176. # the pyc util function looks at sys.flags.optimize to handle this
  177. pyc_path = util.pyc_file_from_path(path)
  178. assert os.access(pyc_path, os.F_OK)
  179. # look for a non-pep3147 path here.
  180. # if not present, need to copy from __pycache__
  181. simple_pyc_path = util.simple_pyc_file_from_path(path)
  182. if not os.access(simple_pyc_path, os.F_OK):
  183. shutil.copyfile(pyc_path, simple_pyc_path)
  184. os.unlink(path)
  185. def three_rev_fixture(cfg):
  186. a = util.rev_id()
  187. b = util.rev_id()
  188. c = util.rev_id()
  189. script = ScriptDirectory.from_config(cfg)
  190. script.generate_revision(a, "revision a", refresh=True)
  191. write_script(script, a, """\
  192. "Rev A"
  193. revision = '%s'
  194. down_revision = None
  195. from alembic import op
  196. def upgrade():
  197. op.execute("CREATE STEP 1")
  198. def downgrade():
  199. op.execute("DROP STEP 1")
  200. """ % a)
  201. script.generate_revision(b, "revision b", refresh=True)
  202. write_script(script, b, u("""# coding: utf-8
  203. "Rev B, méil"
  204. revision = '%s'
  205. down_revision = '%s'
  206. from alembic import op
  207. def upgrade():
  208. op.execute("CREATE STEP 2")
  209. def downgrade():
  210. op.execute("DROP STEP 2")
  211. """) % (b, a), encoding="utf-8")
  212. script.generate_revision(c, "revision c", refresh=True)
  213. write_script(script, c, """\
  214. "Rev C"
  215. revision = '%s'
  216. down_revision = '%s'
  217. from alembic import op
  218. def upgrade():
  219. op.execute("CREATE STEP 3")
  220. def downgrade():
  221. op.execute("DROP STEP 3")
  222. """ % (c, b))
  223. return a, b, c
  224. def _multidb_testing_config(engines):
  225. """alembic.ini fixture to work exactly with the 'multidb' template"""
  226. dir_ = os.path.join(_get_staging_directory(), 'scripts')
  227. databases = ", ".join(
  228. engines.keys()
  229. )
  230. engines = "\n\n".join(
  231. "[%s]\n"
  232. "sqlalchemy.url = %s" % (key, value.url)
  233. for key, value in engines.items()
  234. )
  235. return _write_config_file("""
  236. [alembic]
  237. script_location = %s
  238. sourceless = false
  239. databases = %s
  240. %s
  241. [loggers]
  242. keys = root
  243. [handlers]
  244. keys = console
  245. [logger_root]
  246. level = WARN
  247. handlers = console
  248. qualname =
  249. [handler_console]
  250. class = StreamHandler
  251. args = (sys.stderr,)
  252. level = NOTSET
  253. formatter = generic
  254. [formatters]
  255. keys = generic
  256. [formatter_generic]
  257. format = %%(levelname)-5.5s [%%(name)s] %%(message)s
  258. datefmt = %%H:%%M:%%S
  259. """ % (dir_, databases, engines)
  260. )