12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750 |
- from sqlalchemy import schema as sa_schema, types as sqltypes, sql
- from ..operations import ops
- from ..util import compat
- import re
- from ..util.compat import string_types
- from .. import util
- from mako.pygen import PythonPrinter
- from ..util.compat import StringIO
# Argument-count threshold used by _add_table(): above it, the rendered
# arguments are wrapped in a single ``*[...]`` list.
MAX_PYTHON_ARGS = 255

try:
    from sqlalchemy.sql.naming import conv
except ImportError:
    def _render_gen_name(autogen_context, name):
        """Return ``name`` unchanged (``conv`` could not be imported)."""
        return name
else:
    def _render_gen_name(autogen_context, name):
        """Return a renderable form of a constraint/index name.

        Names that are ``conv`` instances are wrapped in ``_f_name`` together
        with the current Alembic prefix; every other value is returned as-is.
        """
        if not isinstance(name, conv):
            return name
        return _f_name(_alembic_autogenerate_prefix(autogen_context), name)
def _indent(text):
    """Prefix every line of ``text`` with a space and tidy the whitespace.

    After prefixing, the whole string is stripped (so the first line carries
    no prefix and trailing whitespace is dropped), and then trailing runs of
    spaces are removed from each individual line.
    """
    prefixed = re.sub(r'^', " ", text, flags=re.M).strip()
    return re.sub(r' +$', "", prefixed, flags=re.M)
def _render_python_into_templatevars(
        autogen_context, migration_script, template_args):
    """Fill ``template_args`` with rendered upgrade/downgrade code.

    For each (upgrade, downgrade) pair taken from the migration script's
    ``upgrade_ops_list`` / ``downgrade_ops_list``, the rendered and indented
    command body is stored under the container's ``upgrade_token`` /
    ``downgrade_token`` key.  The ``'imports'`` key receives the sorted
    contents of ``autogen_context.imports`` joined by newlines.
    """
    collected_imports = autogen_context.imports
    op_pairs = zip(
        migration_script.upgrade_ops_list,
        migration_script.downgrade_ops_list)
    for up_ops, down_ops in op_pairs:
        up_body = _render_cmd_body(up_ops, autogen_context)
        template_args[up_ops.upgrade_token] = _indent(up_body)
        down_body = _render_cmd_body(down_ops, autogen_context)
        template_args[down_ops.downgrade_token] = _indent(down_body)
    template_args['imports'] = "\n".join(sorted(collected_imports))
# One Dispatcher instance, bound to both names.  ``renderers`` is rebound
# to a branch of ``default_renderers`` at the bottom of this module.
renderers = util.Dispatcher()
default_renderers = renderers


def _render_cmd_body(op_container, autogen_context):
    """Render all operations of ``op_container`` into one source string.

    The output starts and ends with the Alembic marker comments.  Between
    them comes either a single ``pass`` (no operations) or every line
    produced by ``render_op()`` for each operation, in order.
    """
    out = StringIO()
    printer = PythonPrinter(out)
    printer.writeline(
        "# ### commands auto generated by Alembic - please adjust! ###"
    )
    if op_container.ops:
        for op in op_container.ops:
            for rendered_line in render_op(autogen_context, op):
                printer.writeline(rendered_line)
    else:
        printer.writeline("pass")
    printer.writeline("# ### end Alembic commands ###")
    return out.getvalue()
def render_op(autogen_context, op):
    """Render ``op`` via its registered renderer and return a list of lines.

    The renderer is looked up with ``renderers.dispatch(op)`` and its result
    is normalised with ``util.to_list``.
    """
    render_fn = renderers.dispatch(op)
    return util.to_list(render_fn(autogen_context, op))
def render_op_text(autogen_context, op):
    """Render ``op`` and return the lines joined into one string."""
    rendered_lines = render_op(autogen_context, op)
    return "\n".join(rendered_lines)
@renderers.dispatch_for(ops.ModifyTableOps)
def _render_modify_table(autogen_context, op):
    """Render the child operations of a ``ModifyTableOps`` container.

    Returns ``["pass"]`` when the container is empty.  When the
    ``render_as_batch`` option is truthy, the child operations are rendered
    inside ``autogen_context._within_batch()`` and are preceded by a
    ``with op.batch_alter_table(...)`` line and followed by an empty line.
    Otherwise the child operations' lines are simply concatenated.
    """
    use_batch = autogen_context.opts.get('render_as_batch', False)
    if not op.ops:
        return [
            "pass"
        ]

    rendered = []
    if use_batch:
        with autogen_context._within_batch():
            rendered.append(
                "with op.batch_alter_table(%r, schema=%r) as batch_op:"
                % (op.table_name, op.schema)
            )
            for child_op in op.ops:
                rendered.extend(render_op(autogen_context, child_op))
            rendered.append("")
    else:
        for child_op in op.ops:
            rendered.extend(render_op(autogen_context, child_op))
    return rendered
@renderers.dispatch_for(ops.CreateTableOp)
def _add_table(autogen_context, op):
    """Render a ``create_table(...)`` call for a ``CreateTableOp``.

    Arguments are the rendered columns (falsy renderings skipped) followed
    by the sorted rendered constraints (``None`` renderings skipped).  If
    there are more than ``MAX_PYTHON_ARGS`` of them they are wrapped in a
    single ``*[...]`` list.  ``schema`` and the sorted entries of ``op.kw``
    are appended as keyword arguments, with spaces in keys replaced by
    underscores.
    """
    table = op.to_table()

    column_args = []
    for column in table.columns:
        rendered_col = _render_column(column, autogen_context)
        if rendered_col:
            column_args.append(rendered_col)

    constraint_args = []
    for constraint in table.constraints:
        rendered_cons = _render_constraint(constraint, autogen_context)
        if rendered_cons is not None:
            constraint_args.append(rendered_cons)
    constraint_args.sort()

    args = column_args + constraint_args
    if len(args) > MAX_PYTHON_ARGS:
        args = '*[' + ',\n'.join(args) + ']'
    else:
        args = ',\n'.join(args)

    pieces = [
        "%(prefix)screate_table(%(tablename)r,\n%(args)s" % {
            'tablename': _ident(op.table_name),
            'prefix': _alembic_autogenerate_prefix(autogen_context),
            'args': args,
        }
    ]
    if op.schema:
        pieces.append(",\nschema=%r" % _ident(op.schema))
    for key in sorted(op.kw):
        pieces.append(",\n%s=%r" % (key.replace(" ", "_"), op.kw[key]))
    pieces.append("\n)")
    return "".join(pieces)
@renderers.dispatch_for(ops.DropTableOp)
def _drop_table(autogen_context, op):
    """Render a ``drop_table(...)`` call, adding ``schema=`` when set."""
    call_start = "%(prefix)sdrop_table(%(tname)r" % {
        "prefix": _alembic_autogenerate_prefix(autogen_context),
        "tname": _ident(op.table_name)
    }
    if op.schema:
        schema_arg = ", schema=%r" % _ident(op.schema)
    else:
        schema_arg = ""
    return call_start + schema_arg + ")"
@renderers.dispatch_for(ops.CreateIndexOp)
def _add_index(autogen_context, op):
    """Render a ``create_index(...)`` call for a ``CreateIndexOp``.

    In batch mode the table name and schema are left out of the call.  Any
    entries in ``index.kwargs`` are appended as ``key=value`` pairs whose
    values go through ``_render_potential_expr``.
    """
    index = op.to_index()

    if autogen_context._has_batch:
        tmpl = (
            "%(prefix)screate_index(%(name)r, [%(columns)s], "
            "unique=%(unique)r%(kwargs)s)"
        )
    else:
        tmpl = (
            "%(prefix)screate_index(%(name)r, %(table)r, [%(columns)s], "
            "unique=%(unique)r%(schema)s%(kwargs)s)"
        )

    # Values are computed in the same order as the template keys below.
    prefix = _alembic_autogenerate_prefix(autogen_context)
    name = _render_gen_name(autogen_context, index.name)
    table = _ident(index.table.name)
    columns = ", ".join(
        _get_index_rendered_expressions(index, autogen_context))
    unique = index.unique or False

    if index.table.schema:
        schema_arg = ", schema=%r" % _ident(index.table.schema)
    else:
        schema_arg = ''

    if len(index.kwargs):
        rendered_kwargs = [
            "%s=%s" % (key, _render_potential_expr(val, autogen_context))
            for key, val in index.kwargs.items()
        ]
        kwargs_arg = ', ' + ', '.join(rendered_kwargs)
    else:
        kwargs_arg = ''

    return tmpl % {
        'prefix': prefix,
        'name': name,
        'table': table,
        'columns': columns,
        'unique': unique,
        'schema': schema_arg,
        'kwargs': kwargs_arg,
    }
@renderers.dispatch_for(ops.DropIndexOp)
def _drop_index(autogen_context, op):
    """Render a ``drop_index(...)`` call for a ``DropIndexOp``.

    In batch mode only the index name is rendered; otherwise ``table_name=``
    and, when set, ``schema=`` are included.
    """
    if autogen_context._has_batch:
        tmpl = "%(prefix)sdrop_index(%(name)r)"
    else:
        tmpl = (
            "%(prefix)sdrop_index(%(name)r, "
            "table_name=%(table_name)r%(schema)s)"
        )

    prefix = _alembic_autogenerate_prefix(autogen_context)
    name = _render_gen_name(autogen_context, op.index_name)
    table_name = _ident(op.table_name)
    if op.schema:
        schema_arg = ", schema=%r" % _ident(op.schema)
    else:
        schema_arg = ''

    return tmpl % {
        'prefix': prefix,
        'name': name,
        'table_name': table_name,
        'schema': schema_arg,
    }
@renderers.dispatch_for(ops.CreateUniqueConstraintOp)
def _add_unique_constraint(autogen_context, op):
    """Render a ``CreateUniqueConstraintOp`` as a one-element list.

    The constraint is rendered by ``_uq_constraint`` in "alter" mode.
    """
    constraint = op.to_constraint()
    return [_uq_constraint(constraint, autogen_context, True)]
@renderers.dispatch_for(ops.CreateForeignKeyOp)
def _add_fk_constraint(autogen_context, op):
    """Render a ``create_foreign_key(...)`` call for a ``CreateForeignKeyOp``.

    Positional arguments are the constraint name, the source table (omitted
    in batch mode), the referent table and the local/remote column lists.
    Keyword arguments are taken from ``op.kw`` for a fixed set of keys and
    only rendered when the value is not ``None``; ``source_schema`` is
    considered only outside batch mode.
    """
    args = [repr(_render_gen_name(autogen_context, op.constraint_name))]
    if not autogen_context._has_batch:
        args.append(repr(_ident(op.source_table)))
    args.append(repr(_ident(op.referent_table)))
    args.append(repr([_ident(col) for col in op.local_cols]))
    args.append(repr([_ident(col) for col in op.remote_cols]))

    kwarg_names = [
        'referent_schema',
        'onupdate', 'ondelete', 'initially',
        'deferrable', 'use_alter'
    ]
    if not autogen_context._has_batch:
        kwarg_names.insert(0, 'source_schema')

    for key in kwarg_names:
        if key not in op.kw:
            continue
        value = op.kw[key]
        if value is None:
            continue
        args.append("%s=%r" % (key, value))

    return "%(prefix)screate_foreign_key(%(args)s)" % {
        'prefix': _alembic_autogenerate_prefix(autogen_context),
        'args': ", ".join(args)
    }
@renderers.dispatch_for(ops.CreatePrimaryKeyOp)
def _add_pk_constraint(constraint, autogen_context):
    """Placeholder renderer for ``ops.CreatePrimaryKeyOp``; always raises.

    NOTE(review): render_op() invokes renderers as
    ``renderer(autogen_context, op)``, so the parameter names here look
    swapped relative to what is actually passed -- confirm before
    implementing.

    Raises:
        NotImplementedError: unconditionally.
    """
    raise NotImplementedError()
@renderers.dispatch_for(ops.CreateCheckConstraintOp)
def _add_check_constraint(constraint, autogen_context):
    """Placeholder renderer for ``ops.CreateCheckConstraintOp``; always raises.

    NOTE(review): render_op() invokes renderers as
    ``renderer(autogen_context, op)``, so the parameter names here look
    swapped relative to what is actually passed -- confirm before
    implementing.

    Raises:
        NotImplementedError: unconditionally.
    """
    raise NotImplementedError()
@renderers.dispatch_for(ops.DropConstraintOp)
def _drop_constraint(autogen_context, op):
    """Render a ``drop_constraint(...)`` call for a ``DropConstraintOp``.

    In batch mode only the constraint name and ``type_`` are rendered;
    otherwise the table name and, when set, the schema are included too.
    """
    if autogen_context._has_batch:
        template = (
            "%(prefix)sdrop_constraint"
            "(%(name)r, type_=%(type)r)"
        )
    else:
        template = (
            "%(prefix)sdrop_constraint"
            "(%(name)r, '%(table_name)s'%(schema)s, type_=%(type)r)"
        )

    prefix = _alembic_autogenerate_prefix(autogen_context)
    name = _render_gen_name(autogen_context, op.constraint_name)
    table_name = _ident(op.table_name)
    if op.schema:
        schema_arg = ", schema='%s'" % _ident(op.schema)
    else:
        schema_arg = ''

    return template % {
        'prefix': prefix,
        'name': name,
        'table_name': table_name,
        'type': op.constraint_type,
        'schema': schema_arg,
    }
@renderers.dispatch_for(ops.AddColumnOp)
def _add_column(autogen_context, op):
    """Render an ``add_column(...)`` call for an ``AddColumnOp``.

    In batch mode only the rendered column is passed; otherwise the table
    name comes first and ``schema=`` is appended when the op has a schema.
    """
    if autogen_context._has_batch:
        template = "%(prefix)sadd_column(%(column)s)"
    else:
        schema_part = ", schema=%(schema)r" if op.schema else ""
        template = (
            "%(prefix)sadd_column(%(tname)r, %(column)s" + schema_part + ")"
        )

    values = {
        "prefix": _alembic_autogenerate_prefix(autogen_context),
        "tname": op.table_name,
        "column": _render_column(op.column, autogen_context),
        "schema": op.schema
    }
    return template % values
@renderers.dispatch_for(ops.DropColumnOp)
def _drop_column(autogen_context, op):
    """Render a ``drop_column(...)`` call for a ``DropColumnOp``.

    In batch mode only the column name is passed; otherwise the table name
    comes first and ``schema=`` is appended when the op has a schema.
    """
    if autogen_context._has_batch:
        template = "%(prefix)sdrop_column(%(cname)r)"
    else:
        schema_part = ", schema=%(schema)r" if op.schema else ""
        template = (
            "%(prefix)sdrop_column(%(tname)r, %(cname)r" + schema_part + ")"
        )

    values = {
        "prefix": _alembic_autogenerate_prefix(autogen_context),
        "tname": _ident(op.table_name),
        "cname": _ident(op.column_name),
        "schema": _ident(op.schema)
    }
    return template % values
@renderers.dispatch_for(ops.AlterColumnOp)
def _alter_column(autogen_context, op):
    """Render an ``alter_column(...)`` call for an ``AlterColumnOp``.

    The call starts with the column name (preceded by the table name
    outside batch mode).  Each applicable keyword argument is then appended
    on its own line: ``existing_type``, ``server_default``, ``type_``,
    ``nullable`` or ``existing_nullable``, ``autoincrement``,
    ``existing_server_default`` and ``schema`` (non-batch only).
    ``modify_server_default`` uses ``False`` as its "not being changed"
    marker.
    """
    server_default = op.modify_server_default
    nullable = op.modify_nullable
    autoincrement = op.kw.get('autoincrement', None)
    indent = " " * 11

    if autogen_context._has_batch:
        template = "%(prefix)salter_column(%(cname)r"
    else:
        template = "%(prefix)salter_column(%(tname)r, %(cname)r"

    chunks = [template % {
        'prefix': _alembic_autogenerate_prefix(
            autogen_context),
        'tname': op.table_name,
        'cname': op.column_name}]

    if op.existing_type is not None:
        chunks.append(",\n%sexisting_type=%s" % (
            indent,
            _repr_type(op.existing_type, autogen_context)))

    if server_default is not False:
        rendered = _render_server_default(
            server_default, autogen_context)
        chunks.append(",\n%sserver_default=%s" % (indent, rendered))

    if op.modify_type is not None:
        chunks.append(",\n%stype_=%s" % (
            indent, _repr_type(op.modify_type, autogen_context)))

    if nullable is not None:
        chunks.append(",\n%snullable=%r" % (
            indent, nullable,))
    elif op.existing_nullable is not None:
        # Only reached when nullable itself is not being modified.
        chunks.append(",\n%sexisting_nullable=%r" % (
            indent, op.existing_nullable))

    if autoincrement is not None:
        chunks.append(",\n%sautoincrement=%r" % (
            indent, autoincrement))

    if server_default is False and op.existing_server_default:
        rendered = _render_server_default(
            op.existing_server_default,
            autogen_context)
        chunks.append(",\n%sexisting_server_default=%s" % (
            indent, rendered))

    if op.schema and not autogen_context._has_batch:
        chunks.append(",\n%sschema=%r" % (indent, op.schema))

    chunks.append(")")
    return "".join(chunks)
class _f_name(object):
    """Marker whose ``repr()`` is an ``f(<name>)`` call with a prefix.

    ``prefix`` is placed directly before ``f(``; ``name`` is passed through
    ``_ident`` before being repr'd inside the call.
    """

    def __init__(self, prefix, name):
        self.prefix = prefix
        self.name = name

    def __repr__(self):
        rendered_name = _ident(self.name)
        return "%sf(%r)" % (self.prefix, rendered_name)
def _ident(name):
    """produce a __repr__() object for a string identifier that may
    use quoted_name() in SQLAlchemy 0.9 and greater.

    The issue worked around here is that quoted_name() doesn't have
    very good repr() behavior by itself when unicode is involved.

    ``None`` is returned unchanged, plain strings are returned as-is, and
    any other kind of value yields ``None``.
    """
    if name is None:
        return name

    if util.sqla_09 and isinstance(name, sql.elements.quoted_name):
        if not compat.py2k:
            return compat.text_type(name)
        # the attempt to encode to ascii here isn't super ideal,
        # however we are trying to cut down on an explosion of
        # u'' literals only when py2k + SQLA 0.9, in particular
        # makes unit tests testing code generation very difficult
        try:
            return name.encode('ascii')
        except UnicodeError:
            return compat.text_type(name)

    if isinstance(name, compat.string_types):
        return name
    return None
def _render_potential_expr(value, autogen_context, wrap_in_text=True):
    """Render ``value`` as source text, compiling SQL expressions.

    Non-``ClauseElement`` values are simply ``repr()``'d.  A
    ``ClauseElement`` is compiled against ``autogen_context.dialect`` and the
    resulting SQL string is rendered either inside a ``text(...)`` call
    (with the SQLAlchemy prefix) or as a bare string literal when
    ``wrap_in_text`` is false.
    """
    if not isinstance(value, sql.ClauseElement):
        return repr(value)

    compile_kw = {}
    if util.sqla_08:
        compile_kw = dict(compile_kwargs={
            'literal_binds': True, "include_table": False})

    template = "%(prefix)stext(%(sql)r)" if wrap_in_text else "%(sql)r"

    prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
    compiled = value.compile(dialect=autogen_context.dialect, **compile_kw)
    return template % {
        "prefix": prefix,
        "sql": compat.text_type(compiled)
    }
def _get_index_rendered_expressions(idx, autogen_context):
    """Return the rendered column names / expressions of an index.

    With ``util.sqla_08`` set, ``idx.expressions`` is walked: ``Column``
    members are rendered as their repr'd name and anything else goes through
    ``_render_potential_expr``.  Otherwise the repr'd names of
    ``idx.columns`` are returned.
    """
    if not util.sqla_08:
        return [
            repr(_ident(getattr(col, "name", None))) for col in idx.columns]

    rendered = []
    for exp in idx.expressions:
        if isinstance(exp, sa_schema.Column):
            rendered.append(repr(_ident(getattr(exp, "name", None))))
        else:
            rendered.append(_render_potential_expr(exp, autogen_context))
    return rendered
def _uq_constraint(constraint, autogen_context, alter):
    """Render a unique constraint, inline or as an alter operation.

    With ``alter`` true a ``create_unique_constraint(...)`` call is produced:
    constraint name, table name (omitted in batch mode), column list and
    keyword options, with ``schema`` included only outside batch mode.  With
    ``alter`` false a ``UniqueConstraint(...)`` construct is produced:
    column names followed by keyword options, including ``name`` if set.
    """
    has_batch = autogen_context._has_batch

    opts = []
    if constraint.deferrable:
        opts.append(("deferrable", str(constraint.deferrable)))
    if constraint.initially:
        opts.append(("initially", str(constraint.initially)))
    if alter and not has_batch and constraint.table.schema:
        opts.append(("schema", _ident(constraint.table.schema)))

    if not alter:
        if constraint.name:
            opts.append(
                ("name",
                 _render_gen_name(autogen_context, constraint.name)))
        args = [repr(_ident(col.name)) for col in constraint.columns]
        args.extend("%s=%r" % (key, value) for key, value in opts)
        return "%(prefix)sUniqueConstraint(%(args)s)" % {
            "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
            "args": ", ".join(args)
        }

    args = [repr(_render_gen_name(autogen_context, constraint.name))]
    if not has_batch:
        args.append(repr(_ident(constraint.table.name)))
    args.append(repr([_ident(col.name) for col in constraint.columns]))
    args.extend("%s=%r" % (key, value) for key, value in opts)
    return "%(prefix)screate_unique_constraint(%(args)s)" % {
        'prefix': _alembic_autogenerate_prefix(autogen_context),
        'args': ", ".join(args)
    }
def _user_autogenerate_prefix(autogen_context, target):
    """Return the module prefix to use for a user-defined object.

    The ``user_module_prefix`` option is used when it is not ``None``;
    otherwise the prefix is ``target.__module__`` followed by a dot.
    """
    configured = autogen_context.opts['user_module_prefix']
    if configured is not None:
        return configured
    return "%s." % target.__module__
def _sqlalchemy_autogenerate_prefix(autogen_context):
    """Return the ``sqlalchemy_module_prefix`` option, or ``''`` if falsy."""
    prefix = autogen_context.opts['sqlalchemy_module_prefix']
    return prefix if prefix else ''
def _alembic_autogenerate_prefix(autogen_context):
    """Return the prefix placed before rendered Alembic operation calls.

    ``'batch_op.'`` is used while ``autogen_context._has_batch`` is truthy;
    otherwise the ``alembic_module_prefix`` option, or ``''`` if it is falsy.
    """
    if not autogen_context._has_batch:
        return autogen_context.opts['alembic_module_prefix'] or ''
    return 'batch_op.'
def _user_defined_render(type_, object_, autogen_context):
    """Run the user's ``render_item`` hook, if one is configured.

    Returns whatever the hook returns, or ``False`` when the
    ``render_item`` option is missing or falsy.  Callers treat ``False`` as
    "not handled; use the default rendering".
    """
    render = autogen_context.opts.get('render_item')
    if not render:
        return False
    # A hook result of False is passed through unchanged, which is the same
    # value the fall-through path returns.
    return render(type_, object_, autogen_context)
def _render_column(column, autogen_context):
    """Render a ``Column(...)`` construct for ``column``.

    A user ``render_item`` result for ``"column"`` takes precedence.
    Otherwise the column's name and rendered type are followed by keyword
    arguments: ``server_default`` (when it renders to something truthy),
    ``autoincrement`` (only when falsy) and ``nullable`` (when not ``None``).
    """
    custom = _user_defined_render("column", column, autogen_context)
    if custom is not False:
        return custom

    keyword_args = []
    if column.server_default:
        default_text = _render_server_default(
            column.server_default, autogen_context
        )
        if default_text:
            keyword_args.append(("server_default", default_text))

    if not column.autoincrement:
        keyword_args.append(("autoincrement", column.autoincrement))

    if column.nullable is not None:
        keyword_args.append(("nullable", column.nullable))

    # TODO: for non-ascii colname, assign a "key"
    prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
    name = _ident(column.name)
    type_text = _repr_type(column.type, autogen_context)
    kw_text = ", ".join(
        ["%s=%s" % (kwname, val) for kwname, val in keyword_args])
    return "%(prefix)sColumn(%(name)r, %(type)s, %(kw)s)" % {
        'prefix': prefix,
        'name': name,
        'type': type_text,
        'kw': kw_text
    }
def _render_server_default(default, autogen_context, repr_=True):
    """Render a server default as source text.

    A user ``render_item`` result for ``"server_default"`` takes precedence.
    A ``DefaultClause`` is unwrapped to its ``arg``: non-string args are
    rendered via ``_render_potential_expr``.  String defaults have one
    leading and one trailing single quote removed and are then ``repr()``'d
    when ``repr_`` is true.  Anything else is returned unchanged.
    """
    custom = _user_defined_render("server_default", default, autogen_context)
    if custom is not False:
        return custom

    if isinstance(default, sa_schema.DefaultClause):
        if not isinstance(default.arg, compat.string_types):
            return _render_potential_expr(default.arg, autogen_context)
        default = default.arg

    if repr_ and isinstance(default, string_types):
        unquoted = re.sub(r"^'|'$", "", default)
        default = repr(unquoted)
    return default
def _repr_type(type_, autogen_context):
    """Render a type object as source text.

    A user ``render_item`` result for ``"type"`` takes precedence.
    Otherwise the rendering depends on the module the type's class lives in:

    * ``sqlalchemy.dialects.<name>``: an import line for the dialect is added
      to ``autogen_context.imports`` (when that is not ``None``), and the
      result is the migration impl's ``render_type()`` value if truthy,
      else ``<name>.<repr>``.
    * other ``sqlalchemy.`` modules: the SQLAlchemy prefix plus ``repr``.
    * anything else: the user module prefix plus ``repr``.
    """
    rendered = _user_defined_render("type", type_, autogen_context)
    if rendered is not False:
        return rendered

    # Start from None so the dialect branch can test impl_rt even when the
    # migration context has no ``impl``; without this default the name was
    # unbound in that case and the branch raised UnboundLocalError.
    impl_rt = None
    if hasattr(autogen_context.migration_context, 'impl'):
        impl_rt = autogen_context.migration_context.impl.render_type(
            type_, autogen_context)

    mod = type(type_).__module__
    imports = autogen_context.imports
    if mod.startswith("sqlalchemy.dialects"):
        dname = re.match(r"sqlalchemy\.dialects\.(\w+)", mod).group(1)
        if imports is not None:
            imports.add("from sqlalchemy.dialects import %s" % dname)
        if impl_rt:
            return impl_rt
        else:
            return "%s.%r" % (dname, type_)
    elif mod.startswith("sqlalchemy."):
        prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
        return "%s%r" % (prefix, type_)
    else:
        prefix = _user_autogenerate_prefix(autogen_context, type_)
        return "%s%r" % (prefix, type_)
# Registry mapping constraint classes to their rendering functions.
_constraint_renderers = util.Dispatcher()


def _render_constraint(constraint, autogen_context):
    """Render ``constraint`` with its registered constraint renderer.

    If dispatch raises ``ValueError``, a warning is emitted and a
    placeholder string naming the object is returned instead.
    """
    try:
        renderer = _constraint_renderers.dispatch(constraint)
    except ValueError:
        util.warn("No renderer is established for object %r" % constraint)
        return "[Unknown Python object %r]" % constraint
    return renderer(constraint, autogen_context)
@_constraint_renderers.dispatch_for(sa_schema.PrimaryKeyConstraint)
def _render_primary_key(constraint, autogen_context):
    """Render a ``PrimaryKeyConstraint(...)`` construct.

    A user ``render_item`` result for ``"primary_key"`` takes precedence.
    Returns ``None`` when the constraint has no columns.  Otherwise the
    repr'd column names are followed by ``name=`` if the constraint is named.
    """
    custom = _user_defined_render("primary_key", constraint, autogen_context)
    if custom is not False:
        return custom

    if not constraint.columns:
        return None

    keyword_args = []
    if constraint.name:
        rendered_name = repr(
            _render_gen_name(autogen_context, constraint.name))
        keyword_args.append(("name", rendered_name))

    prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
    args = [repr(c.name) for c in constraint.columns]
    args.extend("%s=%s" % (kwname, val) for kwname, val in keyword_args)
    return "%(prefix)sPrimaryKeyConstraint(%(args)s)" % {
        "prefix": prefix,
        "args": ", ".join(args),
    }
def _fk_colspec(fk, metadata_schema):
    """Implement a 'safe' version of ForeignKey._get_colspec() that
    never tries to resolve the remote table.

    The colspec is split on dots; the last token is the column and the rest
    names the table.  When the colspec has exactly two tokens and
    ``metadata_schema`` is not ``None``, the schema is prepended to the
    table name.  If the referenced table is present in the parent table's
    metadata and contains the column under that key, the column's ``name``
    is used in the result; otherwise the name from the colspec is kept.

    Returns the ``<table_fullname>.<column>`` string.
    """
    colspec = fk._get_colspec()
    tokens = colspec.split(".")
    tname, colname = tokens[-2:]

    if metadata_schema is not None and len(tokens) == 2:
        table_fullname = "%s.%s" % (metadata_schema, tname)
    else:
        table_fullname = ".".join(tokens[0:-1])

    if fk.parent is not None and fk.parent.table is not None:
        # try to resolve the remote table and adjust for column.key
        parent_metadata = fk.parent.table.metadata
        if table_fullname in parent_metadata.tables:
            remote_table = parent_metadata.tables[table_fullname]
            # The colspec may not match any column key on the remote table;
            # keep the colspec's own name instead of raising KeyError.
            try:
                remote_col = remote_table.c[colname]
            except KeyError:
                remote_col = None
            if remote_col is not None:
                colname = _ident(remote_col.name)

    colspec = "%s.%s" % (table_fullname, colname)

    return colspec
def _populate_render_fk_opts(constraint, opts):
    """Append repr'd foreign key options of ``constraint`` to ``opts``.

    For each of ``onupdate``, ``ondelete``, ``initially``, ``deferrable``
    and ``use_alter`` (in that order) a ``(name, repr(value))`` tuple is
    appended when the attribute is truthy.  ``opts`` is modified in place.
    """
    option_names = (
        "onupdate", "ondelete", "initially", "deferrable", "use_alter")
    for option in option_names:
        value = getattr(constraint, option)
        if value:
            opts.append((option, repr(value)))
@_constraint_renderers.dispatch_for(sa_schema.ForeignKeyConstraint)
def _render_foreign_key(constraint, autogen_context):
    """Render a ``ForeignKeyConstraint(...)`` construct.

    A user ``render_item`` result for ``"foreign_key"`` takes precedence.
    Otherwise the local column names and the referenced column specs (via
    ``_fk_colspec`` with the parent metadata's schema) are rendered,
    followed by ``name=`` if set and the options collected by
    ``_populate_render_fk_opts``.
    """
    custom = _user_defined_render("foreign_key", constraint, autogen_context)
    if custom is not False:
        return custom

    keyword_args = []
    if constraint.name:
        rendered_name = repr(
            _render_gen_name(autogen_context, constraint.name))
        keyword_args.append(("name", rendered_name))
    _populate_render_fk_opts(constraint, keyword_args)

    apply_metadata_schema = constraint.parent.metadata.schema

    prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
    local_cols = ", ".join(
        "%r" % _ident(f.parent.name) for f in constraint.elements)
    remote_cols = ", ".join(
        repr(_fk_colspec(f, apply_metadata_schema))
        for f in constraint.elements)
    kw_text = ", ".join(
        ["%s=%s" % (kwname, val) for kwname, val in keyword_args])

    template = (
        "%(prefix)sForeignKeyConstraint([%(cols)s], "
        "[%(refcols)s], %(args)s)"
    )
    return template % {
        "prefix": prefix,
        "cols": local_cols,
        "refcols": remote_cols,
        "args": kw_text,
    }
@_constraint_renderers.dispatch_for(sa_schema.UniqueConstraint)
def _render_unique_constraint(constraint, autogen_context):
    """Render an inline ``UniqueConstraint(...)`` construct.

    A user ``render_item`` result for ``"unique"`` takes precedence;
    otherwise ``_uq_constraint`` is used in non-alter mode.
    """
    custom = _user_defined_render("unique", constraint, autogen_context)
    if custom is False:
        return _uq_constraint(constraint, autogen_context, False)
    return custom
@_constraint_renderers.dispatch_for(sa_schema.CheckConstraint)
def _render_check_constraint(constraint, autogen_context):
    """Render a ``CheckConstraint(...)`` construct.

    A user ``render_item`` result for ``"check"`` takes precedence.  Returns
    ``None`` for constraints whose ``_create_rule`` targets a
    ``TypeEngine``.  Otherwise the SQL text is rendered as a plain string
    literal, followed by ``name=`` if the constraint is named.
    """
    custom = _user_defined_render("check", constraint, autogen_context)
    if custom is not False:
        return custom

    # detect the constraint being part of
    # a parent type which is probably in the Table already.
    # ideally SQLAlchemy would give us more of a first class
    # way to detect this.
    create_rule = constraint._create_rule
    belongs_to_type = (
        create_rule and
        hasattr(create_rule, 'target') and
        isinstance(create_rule.target, sqltypes.TypeEngine)
    )
    if belongs_to_type:
        return None

    keyword_args = []
    if constraint.name:
        rendered_name = repr(
            _render_gen_name(autogen_context, constraint.name))
        keyword_args.append(("name", rendered_name))

    prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
    if keyword_args:
        opts_text = ", " + ", ".join(
            "%s=%s" % (k, v) for k, v in keyword_args)
    else:
        opts_text = ""
    sqltext = _render_potential_expr(
        constraint.sqltext, autogen_context, wrap_in_text=False)

    return "%(prefix)sCheckConstraint(%(sqltext)s%(opts)s)" % {
        "prefix": prefix,
        "opts": opts_text,
        "sqltext": sqltext
    }
@renderers.dispatch_for(ops.ExecuteSQLOp)
def _execute_sql(autogen_context, op):
    """Render an ``execute(...)`` call for an ``ExecuteSQLOp``.

    Only plain SQL strings are supported.  The call is prefixed via
    ``_alembic_autogenerate_prefix`` like every other rendered operation, so
    a configured ``alembic_module_prefix`` is honoured instead of a
    hard-coded ``op.``.

    NOTE(review): inside a batch context the prefix becomes ``batch_op.``;
    confirm that batch operations expose ``execute`` in this version.

    Raises:
        NotImplementedError: if ``op.sqltext`` is not a string.
    """
    if not isinstance(op.sqltext, string_types):
        raise NotImplementedError(
            "Autogenerate rendering of SQL Expression language constructs "
            "not supported here; please use a plain SQL string"
        )
    return '%sexecute(%r)' % (
        _alembic_autogenerate_prefix(autogen_context), op.sqltext)


# Rebind ``renderers`` to a branch of the default dispatcher.
renderers = default_renderers.branch()
|