batch.py 14 KB


  1. from sqlalchemy import Table, MetaData, Index, select, Column, \
  2. ForeignKeyConstraint, PrimaryKeyConstraint, cast, CheckConstraint
  3. from sqlalchemy import types as sqltypes
  4. from sqlalchemy import schema as sql_schema
  5. from sqlalchemy.util import OrderedDict
  6. from .. import util
  7. if util.sqla_08:
  8. from sqlalchemy.events import SchemaEventTarget
  9. from ..util.sqla_compat import _columns_for_constraint, \
  10. _is_type_bound, _fk_is_self_referential
  11. class BatchOperationsImpl(object):
  12. def __init__(self, operations, table_name, schema, recreate,
  13. copy_from, table_args, table_kwargs,
  14. reflect_args, reflect_kwargs, naming_convention):
  15. if not util.sqla_08:
  16. raise NotImplementedError(
  17. "batch mode requires SQLAlchemy 0.8 or greater.")
  18. self.operations = operations
  19. self.table_name = table_name
  20. self.schema = schema
  21. if recreate not in ('auto', 'always', 'never'):
  22. raise ValueError(
  23. "recreate may be one of 'auto', 'always', or 'never'.")
  24. self.recreate = recreate
  25. self.copy_from = copy_from
  26. self.table_args = table_args
  27. self.table_kwargs = dict(table_kwargs)
  28. self.reflect_args = reflect_args
  29. self.reflect_kwargs = reflect_kwargs
  30. self.naming_convention = naming_convention
  31. self.batch = []
  32. @property
  33. def dialect(self):
  34. return self.operations.impl.dialect
  35. @property
  36. def impl(self):
  37. return self.operations.impl
  38. def _should_recreate(self):
  39. if self.recreate == 'auto':
  40. return self.operations.impl.requires_recreate_in_batch(self)
  41. elif self.recreate == 'always':
  42. return True
  43. else:
  44. return False
  45. def flush(self):
  46. should_recreate = self._should_recreate()
  47. if not should_recreate:
  48. for opname, arg, kw in self.batch:
  49. fn = getattr(self.operations.impl, opname)
  50. fn(*arg, **kw)
  51. else:
  52. if self.naming_convention:
  53. m1 = MetaData(naming_convention=self.naming_convention)
  54. else:
  55. m1 = MetaData()
  56. if self.copy_from is not None:
  57. existing_table = self.copy_from
  58. reflected = False
  59. else:
  60. existing_table = Table(
  61. self.table_name, m1,
  62. schema=self.schema,
  63. autoload=True,
  64. autoload_with=self.operations.get_bind(),
  65. *self.reflect_args, **self.reflect_kwargs)
  66. reflected = True
  67. batch_impl = ApplyBatchImpl(
  68. existing_table, self.table_args, self.table_kwargs, reflected)
  69. for opname, arg, kw in self.batch:
  70. fn = getattr(batch_impl, opname)
  71. fn(*arg, **kw)
  72. batch_impl._create(self.impl)
  73. def alter_column(self, *arg, **kw):
  74. self.batch.append(("alter_column", arg, kw))
  75. def add_column(self, *arg, **kw):
  76. self.batch.append(("add_column", arg, kw))
  77. def drop_column(self, *arg, **kw):
  78. self.batch.append(("drop_column", arg, kw))
  79. def add_constraint(self, const):
  80. self.batch.append(("add_constraint", (const,), {}))
  81. def drop_constraint(self, const):
  82. self.batch.append(("drop_constraint", (const, ), {}))
  83. def rename_table(self, *arg, **kw):
  84. self.batch.append(("rename_table", arg, kw))
  85. def create_index(self, idx):
  86. self.batch.append(("create_index", (idx,), {}))
  87. def drop_index(self, idx):
  88. self.batch.append(("drop_index", (idx,), {}))
  89. def create_table(self, table):
  90. raise NotImplementedError("Can't create table in batch mode")
  91. def drop_table(self, table):
  92. raise NotImplementedError("Can't drop table in batch mode")
  93. class ApplyBatchImpl(object):
  94. def __init__(self, table, table_args, table_kwargs, reflected):
  95. self.table = table # this is a Table object
  96. self.table_args = table_args
  97. self.table_kwargs = table_kwargs
  98. self.new_table = None
  99. self.column_transfers = OrderedDict(
  100. (c.name, {'expr': c}) for c in self.table.c
  101. )
  102. self.reflected = reflected
  103. self._grab_table_elements()
  104. def _grab_table_elements(self):
  105. schema = self.table.schema
  106. self.columns = OrderedDict()
  107. for c in self.table.c:
  108. c_copy = c.copy(schema=schema)
  109. c_copy.unique = c_copy.index = False
  110. # ensure that the type object was copied,
  111. # as we may need to modify it in-place
  112. if isinstance(c.type, SchemaEventTarget):
  113. assert c_copy.type is not c.type
  114. self.columns[c.name] = c_copy
  115. self.named_constraints = {}
  116. self.unnamed_constraints = []
  117. self.indexes = {}
  118. self.new_indexes = {}
  119. for const in self.table.constraints:
  120. if _is_type_bound(const):
  121. continue
  122. elif self.reflected and isinstance(const, CheckConstraint):
  123. # TODO: we are skipping reflected CheckConstraint because
  124. # we have no way to determine _is_type_bound() for these.
  125. pass
  126. elif const.name:
  127. self.named_constraints[const.name] = const
  128. else:
  129. self.unnamed_constraints.append(const)
  130. for idx in self.table.indexes:
  131. self.indexes[idx.name] = idx
  132. for k in self.table.kwargs:
  133. self.table_kwargs.setdefault(k, self.table.kwargs[k])
  134. def _transfer_elements_to_new_table(self):
  135. assert self.new_table is None, "Can only create new table once"
  136. m = MetaData()
  137. schema = self.table.schema
  138. self.new_table = new_table = Table(
  139. '_alembic_batch_temp', m,
  140. *(list(self.columns.values()) + list(self.table_args)),
  141. schema=schema,
  142. **self.table_kwargs)
  143. for const in list(self.named_constraints.values()) + \
  144. self.unnamed_constraints:
  145. const_columns = set([
  146. c.key for c in _columns_for_constraint(const)])
  147. if not const_columns.issubset(self.column_transfers):
  148. continue
  149. if isinstance(const, ForeignKeyConstraint):
  150. if _fk_is_self_referential(const):
  151. # for self-referential constraint, refer to the
  152. # *original* table name, and not _alembic_batch_temp.
  153. # This is consistent with how we're handling
  154. # FK constraints from other tables; we assume SQLite
  155. # no foreign keys just keeps the names unchanged, so
  156. # when we rename back, they match again.
  157. const_copy = const.copy(
  158. schema=schema, target_table=self.table)
  159. else:
  160. # "target_table" for ForeignKeyConstraint.copy() is
  161. # only used if the FK is detected as being
  162. # self-referential, which we are handling above.
  163. const_copy = const.copy(schema=schema)
  164. else:
  165. const_copy = const.copy(schema=schema, target_table=new_table)
  166. if isinstance(const, ForeignKeyConstraint):
  167. self._setup_referent(m, const)
  168. new_table.append_constraint(const_copy)
  169. def _gather_indexes_from_both_tables(self):
  170. idx = []
  171. idx.extend(self.indexes.values())
  172. for index in self.new_indexes.values():
  173. idx.append(
  174. Index(
  175. index.name,
  176. unique=index.unique,
  177. *[self.new_table.c[col] for col in index.columns.keys()],
  178. **index.kwargs)
  179. )
  180. return idx
  181. def _setup_referent(self, metadata, constraint):
  182. spec = constraint.elements[0]._get_colspec()
  183. parts = spec.split(".")
  184. tname = parts[-2]
  185. if len(parts) == 3:
  186. referent_schema = parts[0]
  187. else:
  188. referent_schema = None
  189. if tname != '_alembic_batch_temp':
  190. key = sql_schema._get_table_key(tname, referent_schema)
  191. if key in metadata.tables:
  192. t = metadata.tables[key]
  193. for elem in constraint.elements:
  194. colname = elem._get_colspec().split(".")[-1]
  195. if not t.c.contains_column(colname):
  196. t.append_column(
  197. Column(colname, sqltypes.NULLTYPE)
  198. )
  199. else:
  200. Table(
  201. tname, metadata,
  202. *[Column(n, sqltypes.NULLTYPE) for n in
  203. [elem._get_colspec().split(".")[-1]
  204. for elem in constraint.elements]],
  205. schema=referent_schema)
  206. def _create(self, op_impl):
  207. self._transfer_elements_to_new_table()
  208. op_impl.prep_table_for_batch(self.table)
  209. op_impl.create_table(self.new_table)
  210. try:
  211. op_impl._exec(
  212. self.new_table.insert(inline=True).from_select(
  213. list(k for k, transfer in
  214. self.column_transfers.items() if 'expr' in transfer),
  215. select([
  216. transfer['expr']
  217. for transfer in self.column_transfers.values()
  218. if 'expr' in transfer
  219. ])
  220. )
  221. )
  222. op_impl.drop_table(self.table)
  223. except:
  224. op_impl.drop_table(self.new_table)
  225. raise
  226. else:
  227. op_impl.rename_table(
  228. "_alembic_batch_temp",
  229. self.table.name,
  230. schema=self.table.schema
  231. )
  232. self.new_table.name = self.table.name
  233. try:
  234. for idx in self._gather_indexes_from_both_tables():
  235. op_impl.create_index(idx)
  236. finally:
  237. self.new_table.name = "_alembic_batch_temp"
  238. def alter_column(self, table_name, column_name,
  239. nullable=None,
  240. server_default=False,
  241. name=None,
  242. type_=None,
  243. autoincrement=None,
  244. **kw
  245. ):
  246. existing = self.columns[column_name]
  247. existing_transfer = self.column_transfers[column_name]
  248. if name is not None and name != column_name:
  249. # note that we don't change '.key' - we keep referring
  250. # to the renamed column by its old key in _create(). neat!
  251. existing.name = name
  252. existing_transfer["name"] = name
  253. if type_ is not None:
  254. type_ = sqltypes.to_instance(type_)
  255. # old type is being discarded so turn off eventing
  256. # rules. Alternatively we can
  257. # erase the events set up by this type, but this is simpler.
  258. # we also ignore the drop_constraint that will come here from
  259. # Operations.implementation_for(alter_column)
  260. if isinstance(existing.type, SchemaEventTarget):
  261. existing.type._create_events = \
  262. existing.type.create_constraint = False
  263. if existing.type._type_affinity is not type_._type_affinity:
  264. existing_transfer["expr"] = cast(
  265. existing_transfer["expr"], type_)
  266. existing.type = type_
  267. # we *dont* however set events for the new type, because
  268. # alter_column is invoked from
  269. # Operations.implementation_for(alter_column) which already
  270. # will emit an add_constraint()
  271. if nullable is not None:
  272. existing.nullable = nullable
  273. if server_default is not False:
  274. if server_default is None:
  275. existing.server_default = None
  276. else:
  277. sql_schema.DefaultClause(server_default)._set_parent(existing)
  278. if autoincrement is not None:
  279. existing.autoincrement = bool(autoincrement)
  280. def add_column(self, table_name, column, **kw):
  281. # we copy the column because operations.add_column()
  282. # gives us a Column that is part of a Table already.
  283. self.columns[column.name] = column.copy(schema=self.table.schema)
  284. self.column_transfers[column.name] = {}
  285. def drop_column(self, table_name, column, **kw):
  286. del self.columns[column.name]
  287. del self.column_transfers[column.name]
  288. def add_constraint(self, const):
  289. if not const.name:
  290. raise ValueError("Constraint must have a name")
  291. if isinstance(const, sql_schema.PrimaryKeyConstraint):
  292. if self.table.primary_key in self.unnamed_constraints:
  293. self.unnamed_constraints.remove(self.table.primary_key)
  294. self.named_constraints[const.name] = const
  295. def drop_constraint(self, const):
  296. if not const.name:
  297. raise ValueError("Constraint must have a name")
  298. try:
  299. const = self.named_constraints.pop(const.name)
  300. except KeyError:
  301. if _is_type_bound(const):
  302. # type-bound constraints are only included in the new
  303. # table via their type object in any case, so ignore the
  304. # drop_constraint() that comes here via the
  305. # Operations.implementation_for(alter_column)
  306. return
  307. raise ValueError("No such constraint: '%s'" % const.name)
  308. else:
  309. if isinstance(const, PrimaryKeyConstraint):
  310. for col in const.columns:
  311. self.columns[col.name].primary_key = False
  312. def create_index(self, idx):
  313. self.new_indexes[idx.name] = idx
  314. def drop_index(self, idx):
  315. try:
  316. del self.indexes[idx.name]
  317. except KeyError:
  318. raise ValueError("No such index: '%s'" % idx.name)
  319. def rename_table(self, *arg, **kw):
  320. raise NotImplementedError("TODO")