# rewriter.py 5.5 KB
  1. from alembic import util
  2. from alembic.operations import ops
  3. class Rewriter(object):
  4. """A helper object that allows easy 'rewriting' of ops streams.
  5. The :class:`.Rewriter` object is intended to be passed along
  6. to the
  7. :paramref:`.EnvironmentContext.configure.process_revision_directives`
  8. parameter in an ``env.py`` script. Once constructed, any number
  9. of "rewrites" functions can be associated with it, which will be given
  10. the opportunity to modify the structure without having to have explicit
  11. knowledge of the overall structure.
  12. The function is passed the :class:`.MigrationContext` object and
  13. ``revision`` tuple that are passed to the :paramref:`.Environment
  14. Context.configure.process_revision_directives` function normally,
  15. and the third argument is an individual directive of the type
  16. noted in the decorator. The function has the choice of returning
  17. a single op directive, which normally can be the directive that
  18. was actually passed, or a new directive to replace it, or a list
  19. of zero or more directives to replace it.
  20. .. seealso::
  21. :ref:`autogen_rewriter` - usage example
  22. .. versionadded:: 0.8
  23. """
  24. _traverse = util.Dispatcher()
  25. _chained = None
  26. def __init__(self):
  27. self.dispatch = util.Dispatcher()
  28. def chain(self, other):
  29. """Produce a "chain" of this :class:`.Rewriter` to another.
  30. This allows two rewriters to operate serially on a stream,
  31. e.g.::
  32. writer1 = autogenerate.Rewriter()
  33. writer2 = autogenerate.Rewriter()
  34. @writer1.rewrites(ops.AddColumnOp)
  35. def add_column_nullable(context, revision, op):
  36. op.column.nullable = True
  37. return op
  38. @writer2.rewrites(ops.AddColumnOp)
  39. def add_column_idx(context, revision, op):
  40. idx_op = ops.CreateIndexOp(
  41. 'ixc', op.table_name, [op.column.name])
  42. return [
  43. op,
  44. idx_op
  45. ]
  46. writer = writer1.chain(writer2)
  47. :param other: a :class:`.Rewriter` instance
  48. :return: a new :class:`.Rewriter` that will run the operations
  49. of this writer, then the "other" writer, in succession.
  50. """
  51. wr = self.__class__.__new__(self.__class__)
  52. wr.__dict__.update(self.__dict__)
  53. wr._chained = other
  54. return wr
  55. def rewrites(self, operator):
  56. """Register a function as rewriter for a given type.
  57. The function should receive three arguments, which are
  58. the :class:`.MigrationContext`, a ``revision`` tuple, and
  59. an op directive of the type indicated. E.g.::
  60. @writer1.rewrites(ops.AddColumnOp)
  61. def add_column_nullable(context, revision, op):
  62. op.column.nullable = True
  63. return op
  64. """
  65. return self.dispatch.dispatch_for(operator)
  66. def _rewrite(self, context, revision, directive):
  67. try:
  68. _rewriter = self.dispatch.dispatch(directive)
  69. except ValueError:
  70. _rewriter = None
  71. yield directive
  72. else:
  73. for r_directive in util.to_list(
  74. _rewriter(context, revision, directive)):
  75. yield r_directive
  76. def __call__(self, context, revision, directives):
  77. self.process_revision_directives(context, revision, directives)
  78. if self._chained:
  79. self._chained(context, revision, directives)
  80. @_traverse.dispatch_for(ops.MigrationScript)
  81. def _traverse_script(self, context, revision, directive):
  82. upgrade_ops_list = []
  83. for upgrade_ops in directive.upgrade_ops_list:
  84. ret = self._traverse_for(context, revision, directive.upgrade_ops)
  85. if len(ret) != 1:
  86. raise ValueError(
  87. "Can only return single object for UpgradeOps traverse")
  88. upgrade_ops_list.append(ret[0])
  89. directive.upgrade_ops = upgrade_ops_list
  90. downgrade_ops_list = []
  91. for downgrade_ops in directive.downgrade_ops_list:
  92. ret = self._traverse_for(
  93. context, revision, directive.downgrade_ops)
  94. if len(ret) != 1:
  95. raise ValueError(
  96. "Can only return single object for DowngradeOps traverse")
  97. downgrade_ops_list.append(ret[0])
  98. directive.downgrade_ops = downgrade_ops_list
  99. @_traverse.dispatch_for(ops.OpContainer)
  100. def _traverse_op_container(self, context, revision, directive):
  101. self._traverse_list(context, revision, directive.ops)
  102. @_traverse.dispatch_for(ops.MigrateOperation)
  103. def _traverse_any_directive(self, context, revision, directive):
  104. pass
  105. def _traverse_for(self, context, revision, directive):
  106. directives = list(self._rewrite(context, revision, directive))
  107. for directive in directives:
  108. traverser = self._traverse.dispatch(directive)
  109. traverser(self, context, revision, directive)
  110. return directives
  111. def _traverse_list(self, context, revision, directives):
  112. dest = []
  113. for directive in directives:
  114. dest.extend(self._traverse_for(context, revision, directive))
  115. directives[:] = dest
  116. def process_revision_directives(self, context, revision, directives):
  117. self._traverse_list(context, revision, directives)