123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354 |
- #!coding: utf-8
- import os
- import shutil
- import textwrap
- from ..util.compat import u
- from ..script import Script, ScriptDirectory
- from .. import util
- from . import engines
- from . import provision
- def _get_staging_directory():
- if provision.FOLLOWER_IDENT:
- return "scratch_%s" % provision.FOLLOWER_IDENT
- else:
- return 'scratch'
- def staging_env(create=True, template="generic", sourceless=False):
- from alembic import command, script
- cfg = _testing_config()
- if create:
- path = os.path.join(_get_staging_directory(), 'scripts')
- if os.path.exists(path):
- shutil.rmtree(path)
- command.init(cfg, path, template=template)
- if sourceless:
- try:
- # do an import so that a .pyc/.pyo is generated.
- util.load_python_file(path, 'env.py')
- except AttributeError:
- # we don't have the migration context set up yet
- # so running the .env py throws this exception.
- # theoretically we could be using py_compiler here to
- # generate .pyc/.pyo without importing but not really
- # worth it.
- pass
- make_sourceless(os.path.join(path, "env.py"))
- sc = script.ScriptDirectory.from_config(cfg)
- return sc
- def clear_staging_env():
- shutil.rmtree(_get_staging_directory(), True)
- def script_file_fixture(txt):
- dir_ = os.path.join(_get_staging_directory(), 'scripts')
- path = os.path.join(dir_, "script.py.mako")
- with open(path, 'w') as f:
- f.write(txt)
- def env_file_fixture(txt):
- dir_ = os.path.join(_get_staging_directory(), 'scripts')
- txt = """
- from alembic import context
- config = context.config
- """ + txt
- path = os.path.join(dir_, "env.py")
- pyc_path = util.pyc_file_from_path(path)
- if os.access(pyc_path, os.F_OK):
- os.unlink(pyc_path)
- with open(path, 'w') as f:
- f.write(txt)
- def _sqlite_file_db(tempname="foo.db"):
- dir_ = os.path.join(_get_staging_directory(), 'scripts')
- url = "sqlite:///%s/%s" % (dir_, tempname)
- return engines.testing_engine(url=url)
- def _sqlite_testing_config(sourceless=False):
- dir_ = os.path.join(_get_staging_directory(), 'scripts')
- url = "sqlite:///%s/foo.db" % dir_
- return _write_config_file("""
- [alembic]
- script_location = %s
- sqlalchemy.url = %s
- sourceless = %s
- [loggers]
- keys = root
- [handlers]
- keys = console
- [logger_root]
- level = WARN
- handlers = console
- qualname =
- [handler_console]
- class = StreamHandler
- args = (sys.stderr,)
- level = NOTSET
- formatter = generic
- [formatters]
- keys = generic
- [formatter_generic]
- format = %%(levelname)-5.5s [%%(name)s] %%(message)s
- datefmt = %%H:%%M:%%S
- """ % (dir_, url, "true" if sourceless else "false"))
- def _multi_dir_testing_config(sourceless=False, extra_version_location=''):
- dir_ = os.path.join(_get_staging_directory(), 'scripts')
- url = "sqlite:///%s/foo.db" % dir_
- return _write_config_file("""
- [alembic]
- script_location = %s
- sqlalchemy.url = %s
- sourceless = %s
- version_locations = %%(here)s/model1/ %%(here)s/model2/ %%(here)s/model3/ %s
- [loggers]
- keys = root
- [handlers]
- keys = console
- [logger_root]
- level = WARN
- handlers = console
- qualname =
- [handler_console]
- class = StreamHandler
- args = (sys.stderr,)
- level = NOTSET
- formatter = generic
- [formatters]
- keys = generic
- [formatter_generic]
- format = %%(levelname)-5.5s [%%(name)s] %%(message)s
- datefmt = %%H:%%M:%%S
- """ % (dir_, url, "true" if sourceless else "false",
- extra_version_location))
- def _no_sql_testing_config(dialect="postgresql", directives=""):
- """use a postgresql url with no host so that
- connections guaranteed to fail"""
- dir_ = os.path.join(_get_staging_directory(), 'scripts')
- return _write_config_file("""
- [alembic]
- script_location = %s
- sqlalchemy.url = %s://
- %s
- [loggers]
- keys = root
- [handlers]
- keys = console
- [logger_root]
- level = WARN
- handlers = console
- qualname =
- [handler_console]
- class = StreamHandler
- args = (sys.stderr,)
- level = NOTSET
- formatter = generic
- [formatters]
- keys = generic
- [formatter_generic]
- format = %%(levelname)-5.5s [%%(name)s] %%(message)s
- datefmt = %%H:%%M:%%S
- """ % (dir_, dialect, directives))
- def _write_config_file(text):
- cfg = _testing_config()
- with open(cfg.config_file_name, 'w') as f:
- f.write(text)
- return cfg
- def _testing_config():
- from alembic.config import Config
- if not os.access(_get_staging_directory(), os.F_OK):
- os.mkdir(_get_staging_directory())
- return Config(os.path.join(_get_staging_directory(), 'test_alembic.ini'))
- def write_script(
- scriptdir, rev_id, content, encoding='ascii', sourceless=False):
- old = scriptdir.revision_map.get_revision(rev_id)
- path = old.path
- content = textwrap.dedent(content)
- if encoding:
- content = content.encode(encoding)
- with open(path, 'wb') as fp:
- fp.write(content)
- pyc_path = util.pyc_file_from_path(path)
- if os.access(pyc_path, os.F_OK):
- os.unlink(pyc_path)
- script = Script._from_path(scriptdir, path)
- old = scriptdir.revision_map.get_revision(script.revision)
- if old.down_revision != script.down_revision:
- raise Exception("Can't change down_revision "
- "on a refresh operation.")
- scriptdir.revision_map.add_revision(script, _replace=True)
- if sourceless:
- make_sourceless(path)
- def make_sourceless(path):
- # note that if -O is set, you'd see pyo files here,
- # the pyc util function looks at sys.flags.optimize to handle this
- pyc_path = util.pyc_file_from_path(path)
- assert os.access(pyc_path, os.F_OK)
- # look for a non-pep3147 path here.
- # if not present, need to copy from __pycache__
- simple_pyc_path = util.simple_pyc_file_from_path(path)
- if not os.access(simple_pyc_path, os.F_OK):
- shutil.copyfile(pyc_path, simple_pyc_path)
- os.unlink(path)
- def three_rev_fixture(cfg):
- a = util.rev_id()
- b = util.rev_id()
- c = util.rev_id()
- script = ScriptDirectory.from_config(cfg)
- script.generate_revision(a, "revision a", refresh=True)
- write_script(script, a, """\
- "Rev A"
- revision = '%s'
- down_revision = None
- from alembic import op
- def upgrade():
- op.execute("CREATE STEP 1")
- def downgrade():
- op.execute("DROP STEP 1")
- """ % a)
- script.generate_revision(b, "revision b", refresh=True)
- write_script(script, b, u("""# coding: utf-8
- "Rev B, méil"
- revision = '%s'
- down_revision = '%s'
- from alembic import op
- def upgrade():
- op.execute("CREATE STEP 2")
- def downgrade():
- op.execute("DROP STEP 2")
- """) % (b, a), encoding="utf-8")
- script.generate_revision(c, "revision c", refresh=True)
- write_script(script, c, """\
- "Rev C"
- revision = '%s'
- down_revision = '%s'
- from alembic import op
- def upgrade():
- op.execute("CREATE STEP 3")
- def downgrade():
- op.execute("DROP STEP 3")
- """ % (c, b))
- return a, b, c
- def _multidb_testing_config(engines):
- """alembic.ini fixture to work exactly with the 'multidb' template"""
- dir_ = os.path.join(_get_staging_directory(), 'scripts')
- databases = ", ".join(
- engines.keys()
- )
- engines = "\n\n".join(
- "[%s]\n"
- "sqlalchemy.url = %s" % (key, value.url)
- for key, value in engines.items()
- )
- return _write_config_file("""
- [alembic]
- script_location = %s
- sourceless = false
- databases = %s
- %s
- [loggers]
- keys = root
- [handlers]
- keys = console
- [logger_root]
- level = WARN
- handlers = console
- qualname =
- [handler_console]
- class = StreamHandler
- args = (sys.stderr,)
- level = NOTSET
- formatter = generic
- [formatters]
- keys = generic
- [formatter_generic]
- format = %%(levelname)-5.5s [%%(name)s] %%(message)s
- datefmt = %%H:%%M:%%S
- """ % (dir_, databases, engines)
- )
|