provision.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. """NOTE: copied/adapted from SQLAlchemy master for backwards compatibility;
  2. this should be removable when Alembic targets SQLAlchemy 1.0.0
  3. """
  4. from sqlalchemy.engine import url as sa_url
  5. from sqlalchemy import text
  6. from sqlalchemy import exc
  7. from ..util import compat
  8. from . import config, engines
  9. from .compat import get_url_backend_name
  10. import os
  11. import time
  12. import logging
  13. log = logging.getLogger(__name__)
  14. FOLLOWER_IDENT = None
  15. class register(object):
  16. def __init__(self):
  17. self.fns = {}
  18. @classmethod
  19. def init(cls, fn):
  20. return register().for_db("*")(fn)
  21. def for_db(self, dbname):
  22. def decorate(fn):
  23. self.fns[dbname] = fn
  24. return self
  25. return decorate
  26. def __call__(self, cfg, *arg):
  27. if isinstance(cfg, compat.string_types):
  28. url = sa_url.make_url(cfg)
  29. elif isinstance(cfg, sa_url.URL):
  30. url = cfg
  31. else:
  32. url = cfg.db.url
  33. backend = get_url_backend_name(url)
  34. if backend in self.fns:
  35. return self.fns[backend](cfg, *arg)
  36. else:
  37. return self.fns['*'](cfg, *arg)
  38. def create_follower_db(follower_ident):
  39. for cfg in _configs_for_db_operation():
  40. _create_db(cfg, cfg.db, follower_ident)
  41. def configure_follower(follower_ident):
  42. for cfg in config.Config.all_configs():
  43. _configure_follower(cfg, follower_ident)
  44. def setup_config(db_url, options, file_config, follower_ident):
  45. if follower_ident:
  46. db_url = _follower_url_from_main(db_url, follower_ident)
  47. db_opts = {}
  48. _update_db_opts(db_url, db_opts)
  49. eng = engines.testing_engine(db_url, db_opts)
  50. _post_configure_engine(db_url, eng, follower_ident)
  51. eng.connect().close()
  52. cfg = config.Config.register(eng, db_opts, options, file_config)
  53. if follower_ident:
  54. _configure_follower(cfg, follower_ident)
  55. return cfg
  56. def drop_follower_db(follower_ident):
  57. for cfg in _configs_for_db_operation():
  58. _drop_db(cfg, cfg.db, follower_ident)
  59. def _configs_for_db_operation():
  60. hosts = set()
  61. for cfg in config.Config.all_configs():
  62. cfg.db.dispose()
  63. for cfg in config.Config.all_configs():
  64. url = cfg.db.url
  65. backend = get_url_backend_name(url)
  66. host_conf = (
  67. backend,
  68. url.username, url.host, url.database)
  69. if host_conf not in hosts:
  70. yield cfg
  71. hosts.add(host_conf)
  72. for cfg in config.Config.all_configs():
  73. cfg.db.dispose()
  74. @register.init
  75. def _create_db(cfg, eng, ident):
  76. raise NotImplementedError("no DB creation routine for cfg: %s" % eng.url)
  77. @register.init
  78. def _drop_db(cfg, eng, ident):
  79. raise NotImplementedError("no DB drop routine for cfg: %s" % eng.url)
  80. @register.init
  81. def _update_db_opts(db_url, db_opts):
  82. pass
  83. @register.init
  84. def _configure_follower(cfg, ident):
  85. pass
  86. @register.init
  87. def _post_configure_engine(url, engine, follower_ident):
  88. pass
  89. @register.init
  90. def _follower_url_from_main(url, ident):
  91. url = sa_url.make_url(url)
  92. url.database = ident
  93. return url
  94. @_update_db_opts.for_db("mssql")
  95. def _mssql_update_db_opts(db_url, db_opts):
  96. db_opts['legacy_schema_aliasing'] = False
  97. @_follower_url_from_main.for_db("sqlite")
  98. def _sqlite_follower_url_from_main(url, ident):
  99. url = sa_url.make_url(url)
  100. if not url.database or url.database == ':memory:':
  101. return url
  102. else:
  103. return sa_url.make_url("sqlite:///%s.db" % ident)
  104. @_post_configure_engine.for_db("sqlite")
  105. def _sqlite_post_configure_engine(url, engine, follower_ident):
  106. from sqlalchemy import event
  107. @event.listens_for(engine, "connect")
  108. def connect(dbapi_connection, connection_record):
  109. # use file DBs in all cases, memory acts kind of strangely
  110. # as an attached
  111. if not follower_ident:
  112. dbapi_connection.execute(
  113. 'ATTACH DATABASE "test_schema.db" AS test_schema')
  114. else:
  115. dbapi_connection.execute(
  116. 'ATTACH DATABASE "%s_test_schema.db" AS test_schema'
  117. % follower_ident)
  118. @_create_db.for_db("postgresql")
  119. def _pg_create_db(cfg, eng, ident):
  120. with eng.connect().execution_options(
  121. isolation_level="AUTOCOMMIT") as conn:
  122. try:
  123. _pg_drop_db(cfg, conn, ident)
  124. except Exception:
  125. pass
  126. currentdb = conn.scalar("select current_database()")
  127. for attempt in range(3):
  128. try:
  129. conn.execute(
  130. "CREATE DATABASE %s TEMPLATE %s" % (ident, currentdb))
  131. except exc.OperationalError as err:
  132. if attempt != 2 and "accessed by other users" in str(err):
  133. time.sleep(.2)
  134. continue
  135. else:
  136. raise
  137. else:
  138. break
  139. @_create_db.for_db("mysql")
  140. def _mysql_create_db(cfg, eng, ident):
  141. with eng.connect() as conn:
  142. try:
  143. _mysql_drop_db(cfg, conn, ident)
  144. except Exception:
  145. pass
  146. conn.execute("CREATE DATABASE %s" % ident)
  147. conn.execute("CREATE DATABASE %s_test_schema" % ident)
  148. conn.execute("CREATE DATABASE %s_test_schema_2" % ident)
  149. @_configure_follower.for_db("mysql")
  150. def _mysql_configure_follower(config, ident):
  151. config.test_schema = "%s_test_schema" % ident
  152. config.test_schema_2 = "%s_test_schema_2" % ident
  153. @_create_db.for_db("sqlite")
  154. def _sqlite_create_db(cfg, eng, ident):
  155. pass
  156. @_drop_db.for_db("postgresql")
  157. def _pg_drop_db(cfg, eng, ident):
  158. with eng.connect().execution_options(
  159. isolation_level="AUTOCOMMIT") as conn:
  160. conn.execute(
  161. text(
  162. "select pg_terminate_backend(pid) from pg_stat_activity "
  163. "where usename=current_user and pid != pg_backend_pid() "
  164. "and datname=:dname"
  165. ), dname=ident)
  166. conn.execute("DROP DATABASE %s" % ident)
  167. @_drop_db.for_db("sqlite")
  168. def _sqlite_drop_db(cfg, eng, ident):
  169. if ident:
  170. os.remove("%s_test_schema.db" % ident)
  171. else:
  172. os.remove("%s.db" % ident)
  173. @_drop_db.for_db("mysql")
  174. def _mysql_drop_db(cfg, eng, ident):
  175. with eng.connect() as conn:
  176. conn.execute("DROP DATABASE %s_test_schema" % ident)
  177. conn.execute("DROP DATABASE %s_test_schema_2" % ident)
  178. conn.execute("DROP DATABASE %s" % ident)
  179. @_create_db.for_db("oracle")
  180. def _oracle_create_db(cfg, eng, ident):
  181. # NOTE: make sure you've run "ALTER DATABASE default tablespace users" or
  182. # similar, so that the default tablespace is not "system"; reflection will
  183. # fail otherwise
  184. with eng.connect() as conn:
  185. conn.execute("create user %s identified by xe" % ident)
  186. conn.execute("create user %s_ts1 identified by xe" % ident)
  187. conn.execute("create user %s_ts2 identified by xe" % ident)
  188. conn.execute("grant dba to %s" % (ident, ))
  189. conn.execute("grant unlimited tablespace to %s" % ident)
  190. conn.execute("grant unlimited tablespace to %s_ts1" % ident)
  191. conn.execute("grant unlimited tablespace to %s_ts2" % ident)
  192. @_configure_follower.for_db("oracle")
  193. def _oracle_configure_follower(config, ident):
  194. config.test_schema = "%s_ts1" % ident
  195. config.test_schema_2 = "%s_ts2" % ident
  196. def _ora_drop_ignore(conn, dbname):
  197. try:
  198. conn.execute("drop user %s cascade" % dbname)
  199. log.info("Reaped db: %s" % dbname)
  200. return True
  201. except exc.DatabaseError as err:
  202. log.warn("couldn't drop db: %s" % err)
  203. return False
  204. @_drop_db.for_db("oracle")
  205. def _oracle_drop_db(cfg, eng, ident):
  206. with eng.connect() as conn:
  207. # cx_Oracle seems to occasionally leak open connections when a large
  208. # suite it run, even if we confirm we have zero references to
  209. # connection objects.
  210. # while there is a "kill session" command in Oracle,
  211. # it unfortunately does not release the connection sufficiently.
  212. _ora_drop_ignore(conn, ident)
  213. _ora_drop_ignore(conn, "%s_ts1" % ident)
  214. _ora_drop_ignore(conn, "%s_ts2" % ident)
  215. def reap_oracle_dbs(eng, idents_file):
  216. log.info("Reaping Oracle dbs...")
  217. with eng.connect() as conn:
  218. with open(idents_file) as file_:
  219. idents = set(line.strip() for line in file_)
  220. log.info("identifiers in file: %s", ", ".join(idents))
  221. to_reap = conn.execute(
  222. "select u.username from all_users u where username "
  223. "like 'TEST_%' and not exists (select username "
  224. "from v$session where username=u.username)")
  225. all_names = set([username.lower() for (username, ) in to_reap])
  226. to_drop = set()
  227. for name in all_names:
  228. if name.endswith("_ts1") or name.endswith("_ts2"):
  229. continue
  230. elif name in idents:
  231. to_drop.add(name)
  232. if "%s_ts1" % name in all_names:
  233. to_drop.add("%s_ts1" % name)
  234. if "%s_ts2" % name in all_names:
  235. to_drop.add("%s_ts2" % name)
  236. dropped = total = 0
  237. for total, username in enumerate(to_drop, 1):
  238. if _ora_drop_ignore(conn, username):
  239. dropped += 1
  240. log.info(
  241. "Dropped %d out of %d stale databases detected", dropped, total)
  242. @_follower_url_from_main.for_db("oracle")
  243. def _oracle_follower_url_from_main(url, ident):
  244. url = sa_url.make_url(url)
  245. url.username = ident
  246. url.password = 'xe'
  247. return url