123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511 |
- import logging
- from flask import flash
- from flask_admin._compat import string_types, iteritems
- from flask_admin.babel import gettext, ngettext, lazy_gettext
- from flask_admin.model import BaseModelView
- from flask_admin.model.form import create_editable_list_form
- from peewee import PrimaryKeyField, ForeignKeyField, Field, CharField, TextField
- from flask_admin.actions import action
- from flask_admin.contrib.peewee import filters
- from .form import get_form, CustomModelConverter, InlineModelConverter, save_inline
- from .tools import get_primary_key, parse_like_term
- from .ajax import create_ajax_loader
- # Module-level logger; the view methods below use it to record failed create/update/delete operations.
- log = logging.getLogger("flask-admin.peewee")
class ModelView(BaseModelView):
    """
        Flask-Admin model view backed by a Peewee model.

        The class-level attributes below configure filtering, form
        conversion, bulk deletion and inline editing.
    """

    column_filters = None
    """
        Collection of the column filters.

        Can contain either field names or instances of
        :class:`flask_admin.contrib.peewee.filters.BasePeeweeFilter` classes.

        Filters will be grouped by name when displayed in the drop-down.

        For example::

            class MyModelView(BaseModelView):
                column_filters = ('user', 'email')

        or::

            from flask_admin.contrib.peewee.filters import BooleanEqualFilter

            class MyModelView(BaseModelView):
                column_filters = (BooleanEqualFilter(column=User.name, name='Name'),)

        or::

            from flask_admin.contrib.peewee.filters import BasePeeweeFilter

            class FilterLastNameBrown(BasePeeweeFilter):
                def apply(self, query, value):
                    if value == '1':
                        return query.filter(self.column == "Brown")
                    else:
                        return query.filter(self.column != "Brown")

                def operation(self):
                    return 'is Brown'

            class MyModelView(BaseModelView):
                column_filters = [
                    FilterLastNameBrown(
                        column=User.last_name, name='Last Name',
                        options=(('1', 'Yes'), ('0', 'No'))
                    )
                ]
    """

    model_form_converter = CustomModelConverter
    """
        Model form conversion class. Use this to implement custom field conversion logic.

        For example::

            class MyModelConverter(CustomModelConverter):
                pass

            class MyAdminView(ModelView):
                model_form_converter = MyModelConverter
    """

    inline_model_form_converter = InlineModelConverter
    """
        Inline model conversion class. If you need some kind of post-processing for inline
        forms, you can customize behavior by doing something like this::

            class MyInlineModelConverter(InlineModelConverter):
                def post_process(self, form_class, info):
                    form_class.value = TextField('value')
                    return form_class

            class MyAdminView(ModelView):
                inline_model_form_converter = MyInlineModelConverter
    """

    filter_converter = filters.FilterConverter()
    """
        Field to filter converter.

        Override this attribute to use non-default converter.
    """

    fast_mass_delete = False
    """
        If set to `False` and user deletes more than one model using actions,
        all models will be read from the database and then deleted one by one
        giving Peewee chance to manually cleanup any dependencies (many-to-many
        relationships, etc).

        If set to `True`, will run a single DELETE statement which is somewhat
        faster, but might leave corrupted data if you forget to configure
        DELETE CASCADE for your model.
    """

    inline_models = None
    """
        Inline related-model editing for models with parent to child relation.

        Accept enumerable with one of the values:

        1. Child model class::

            class MyModelView(ModelView):
                inline_models = (Post,)

        2. Child model class and additional options::

            class MyModelView(ModelView):
                inline_models = [(Post, dict(form_columns=['title']))]

        3. Django-like ``InlineFormAdmin`` class instance::

            from flask_admin.model.form import InlineFormAdmin

            class MyInlineModelForm(InlineFormAdmin):
                form_columns = ('title', 'date')

            class MyModelView(ModelView):
                inline_models = (MyInlineModelForm(MyInlineModel),)

        You can customize generated field name by:

        1. Using `form_label` property as option::

            class MyModelView(ModelView):
                inline_models = ((Post, dict(form_label='Hello')),)

        2. Using field's related_name::

            class Model1(Base):
                # ...
                pass

            class Model2(Base):
                # ...
                model1 = ForeignKeyField(Model1, related_name="model_twos")

            class MyModel1View(ModelView):
                inline_models = (Model2,)
                column_labels = {'model_twos': 'Hello'}
    """
def __init__(self, model, name=None,
             category=None, endpoint=None, url=None, static_folder=None,
             menu_class_name=None, menu_icon_type=None, menu_icon_value=None):
    """
        Create the view for a Peewee model.

        All arguments are forwarded unchanged to the base view constructor.
    """
    # Created before delegating to the base constructor; `init_search`
    # appends the resolved search fields to this list.
    self._search_fields = []

    menu_options = dict(menu_class_name=menu_class_name,
                        menu_icon_type=menu_icon_type,
                        menu_icon_value=menu_icon_value)
    super(ModelView, self).__init__(model, name, category, endpoint, url,
                                    static_folder, **menu_options)

    # Cache the primary key attribute name used by `get_pk_value`/`get_one`.
    self._primary_key = self.scaffold_pk()
def _get_model_fields(self, model=None):
    """
        Iterate over `(name, field)` pairs of a model's `_meta.fields`.

        :param model:
            Model to inspect. Defaults to the view's own model.
    """
    target = self.model if model is None else model
    return iteritems(target._meta.fields)
def scaffold_pk(self):
    """
        Return the primary key of the view's model, as reported by
        `get_primary_key`.
    """
    primary_key = get_primary_key(self.model)
    return primary_key
def get_pk_value(self, model):
    """
        Return the primary key value of a model instance.

        :param model:
            Model instance to read the primary key attribute from
    """
    pk_attribute = self._primary_key
    return getattr(model, pk_attribute)
def scaffold_list_columns(self):
    """
        Return the names of the model fields shown in the list view.

        Every field is included except an exact `PrimaryKeyField`, which
        is only listed when `column_display_pk` is set.
    """
    # A ForeignKeyField is never a PrimaryKeyField, so a single
    # condition covers foreign keys as well.
    return [
        name
        for name, field in self._get_model_fields()
        if self.column_display_pk or type(field) != PrimaryKeyField
    ]
def scaffold_sortable_columns(self):
    """
        Return a dict mapping field name to field for sortable columns.

        An exact `PrimaryKeyField` is only included when
        `column_display_pk` is set.
    """
    return {
        name: field
        for name, field in self._get_model_fields()
        if self.column_display_pk or type(field) != PrimaryKeyField
    }
def init_search(self):
    """
        Resolve `column_searchable_list` into the fields used for searching.

        Entries may be field names (looked up on the view's model) or
        field objects. Every accepted field is appended to
        `self._search_fields`.

        :return:
            `True` if at least one search field is configured
        :raises Exception:
            If an entry is not a `CharField`/`TextField` (or a subclass
            of one of them)
    """
    for entry in self.column_searchable_list or ():
        field = getattr(self.model, entry) if isinstance(entry, string_types) else entry

        # isinstance instead of an exact type comparison, so that
        # subclasses of the text field types are searchable too.
        if not isinstance(field, (CharField, TextField)):
            raise Exception('Can only search on text columns. ' +
                            'Failed to setup search for "%s"' % field)

        self._search_fields.append(field)

    return bool(self._search_fields)
def scaffold_filters(self, name):
    """
        Build the filter(s) for a column through `filter_converter`.

        :param name:
            Either a field name on the view's model or a field object
        :raises Exception:
            If a field name cannot be found on the model
    """
    given_as_name = isinstance(name, string_types)
    attr = getattr(self.model, name, None) if given_as_name else name

    if attr is None:
        raise Exception('Failed to find field for filter: %s' % name)

    if attr.model_class != self.model:
        # Field lives on another model: prefix the label with that model's name
        visible_name = '%s / %s' % (self.get_column_name(attr.model_class.__name__),
                                    self.get_column_name(attr.name))
    elif given_as_name:
        visible_name = self.get_column_name(name)
    else:
        visible_name = self.get_column_name(attr.name)

    # The converter is keyed by the field's class name
    return self.filter_converter.convert(type(attr).__name__, attr, visible_name)
def is_valid_filter(self, filter):
    """
        Tell whether `filter` is an instance of `BasePeeweeFilter`.

        :param filter:
            Object to check
    """
    expected_base = filters.BasePeeweeFilter
    return isinstance(filter, expected_base)
def scaffold_form(self):
    """
        Build the create/edit form class for the view's model and, when
        `inline_models` is set, extend it with the inline forms.
    """
    converter = self.model_form_converter(self)
    options = dict(
        base_class=self.form_base_class,
        only=self.form_columns,
        exclude=self.form_excluded_columns,
        field_args=self.form_args,
        # Allow child to specify pk, so inline_models can be ModelViews.
        # But don't auto-generate the pk field if form_columns is empty --
        # allow default behaviour in that case.
        allow_pk=bool(self.form_columns),
        extra_fields=self.form_extra_fields,
    )
    form_class = get_form(self.model, converter, **options)

    if not self.inline_models:
        return form_class

    return self.scaffold_inline_form_models(form_class)
def scaffold_list_form(self, widget=None, validators=None):
    """
        Create form for the `index_view` using only the columns from
        `self.column_editable_list`.

        :param widget:
            WTForms widget class. Defaults to `XEditableWidget`.
        :param validators:
            `form_args` dict with only validators
            {'name': {'validators': [required()]}}
    """
    converter = self.model_form_converter(self)
    editable_form = get_form(self.model, converter,
                             base_class=self.form_base_class,
                             only=self.column_editable_list,
                             field_args=validators)

    # Wrap the restricted form so it can be used for in-list editing
    return create_editable_list_form(self.form_base_class, editable_form, widget)
def scaffold_inline_form_models(self, form_class):
    """
        Let the inline converter contribute every entry of
        `inline_models` to the form class, one after another.

        :param form_class:
            Form class to extend
        :return:
            The form class returned by the last `contribute` call
    """
    field_converter = self.model_form_converter(self)
    inline_converter = self.inline_model_form_converter(self)

    result = form_class
    for inline_model in self.inline_models:
        # Each call receives the form class produced by the previous one
        result = inline_converter.contribute(field_converter, self.model,
                                             result, inline_model)

    return result
# AJAX foreignkey support
def _create_ajax_loader(self, name, options):
    """
        Create an AJAX loader for the view's model via `create_ajax_loader`.

        :param name:
            Passed to `create_ajax_loader` as both its second and third argument
        :param options:
            Loader options, forwarded unchanged
    """
    loader = create_ajax_loader(self.model, name, name, options)
    return loader
def _handle_join(self, query, field, joins):
    """
        Join the field's model into the query when it is not the view's
        own model and has not been joined yet.

        :param query:
            Query to extend
        :param field:
            Field whose `model_class` may need joining
        :param joins:
            Set of already joined model names; updated in place
        :return:
            The (possibly joined) query
    """
    related_model = field.model_class

    needs_join = (related_model != self.model and
                  related_model.__name__ not in joins)
    if needs_join:
        query = query.join(related_model)
        joins.add(related_model.__name__)

    return query
def _order_by(self, query, joins, sort_field, sort_desc):
    """
        Apply ordering to the query.

        :param query:
            Query to order
        :param joins:
            Set of already joined model names
        :param sort_field:
            Field name on the view's model, or a field object. Any other
            value leaves the query untouched.
        :param sort_desc:
            Sort descending if truthy, ascending otherwise
        :return:
            Tuple of `(query, joins)`
    """
    if isinstance(sort_field, string_types):
        column = getattr(self.model, sort_field)
    elif isinstance(sort_field, Field):
        # A field of another model has to be joined in before sorting on it
        if sort_field.model_class != self.model:
            query = self._handle_join(query, sort_field, joins)
        column = sort_field
    else:
        return query, joins

    ordering = column.desc() if sort_desc else column.asc()
    return query.order_by(ordering), joins
def get_query(self):
    """
        Return the base select query for the view's model. `get_list`
        starts from this query.
    """
    base_query = self.model.select()
    return base_query
def get_list(self, page, sort_column, sort_desc, search, filters,
             execute=True, page_size=None):
    """
        Return records from the database.

        :param page:
            Page number
        :param sort_column:
            Sort column name
        :param sort_desc:
            Descending or ascending sort
        :param search:
            Search query
        :param filters:
            List of filter tuples. May be `None` or empty, in which case
            no filters are applied.
        :param execute:
            Execute query immediately? Default is `True`
        :param page_size:
            Number of results. Defaults to ModelView's page_size. Can be
            overridden to change the page_size limit. Removing the page_size
            limit requires setting page_size to 0 or False.
        :return:
            Tuple of `(count, query)`. `count` is `None` when
            `simple_list_pager` is enabled; `query` is a list of records
            when `execute` is true, otherwise the unexecuted query.
    """
    query = self.get_query()
    joins = set()

    # Search: each non-empty term adds a WHERE clause that ORs the term
    # across all search fields.
    if self._search_supported and search:
        for value in search.split(' '):
            if not value:
                continue

            term = parse_like_term(value)

            stmt = None
            for field in self._search_fields:
                query = self._handle_join(query, field, joins)
                clause = field ** term
                stmt = clause if stmt is None else stmt | clause

            # Nothing to add when there are no search fields
            if stmt is not None:
                query = query.where(stmt)

    # Filters: `filters` may be None, so check it before iterating
    if filters and self._filters:
        for flt, flt_name, value in filters:
            f = self._filters[flt]

            query = self._handle_join(query, f.column, joins)
            query = f.apply(query, f.clean(value))

    # Get count (before sorting and pagination are applied)
    count = query.count() if not self.simple_list_pager else None

    # Apply sorting
    if sort_column is not None:
        sort_field = self._sortable_columns[sort_column]
        query, joins = self._order_by(query, joins, sort_field, sort_desc)
    else:
        order = self._get_default_order()
        if order:
            query, joins = self._order_by(query, joins, order[0], order[1])

    # Pagination
    if page_size is None:
        page_size = self.page_size

    if page_size:
        query = query.limit(page_size)

    if page and page_size:
        query = query.offset(page * page_size)

    if execute:
        query = list(query.execute())

    return count, query
def get_one(self, id):
    """
        Fetch a single record by primary key through `self.model.get`.

        :param id:
            Primary key value to look up
    """
    lookup = {self._primary_key: id}
    return self.model.get(**lookup)
def create_model(self, form):
    """
        Create and save a new model instance from the form.

        :param form:
            Form instance
        :return:
            The saved model, or `False` if any step raised an exception
    """
    try:
        instance = self.model()
        form.populate_obj(instance)
        self._on_model_change(form, instance, True)
        instance.save()

        # For peewee have to save inline forms after model was saved
        save_inline(form, instance)
    except Exception as exc:
        handled = self.handle_view_exception(exc)
        if not handled:
            message = gettext('Failed to create record. %(error)s', error=str(exc))
            flash(message, 'error')
            log.exception('Failed to create record.')

        return False

    # Only reached when saving succeeded
    self.after_model_change(form, instance, True)
    return instance
def update_model(self, form, model):
    """
        Update and save an existing model instance from the form.

        :param form:
            Form instance
        :param model:
            Model instance to update
        :return:
            `True` on success, `False` if any step raised an exception
    """
    try:
        form.populate_obj(model)
        self._on_model_change(form, model, False)
        model.save()

        # For peewee have to save inline forms after model was saved
        save_inline(form, model)
    except Exception as exc:
        handled = self.handle_view_exception(exc)
        if not handled:
            message = gettext('Failed to update record. %(error)s', error=str(exc))
            flash(message, 'error')
            log.exception('Failed to update record.')

        return False

    # Only reached when saving succeeded
    self.after_model_change(form, model, False)
    return True
def delete_model(self, model):
    """
        Delete a model instance with `delete_instance(recursive=True)`.

        :param model:
            Model instance to delete
        :return:
            `True` on success, `False` if any step raised an exception
    """
    try:
        self.on_model_delete(model)
        model.delete_instance(recursive=True)
    except Exception as exc:
        handled = self.handle_view_exception(exc)
        if not handled:
            message = gettext('Failed to delete record. %(error)s', error=str(exc))
            flash(message, 'error')
            log.exception('Failed to delete record.')

        return False

    # Only reached when the delete succeeded
    self.after_model_delete(model)
    return True
# Default model actions
def is_action_allowed(self, name):
    """
        Tell whether the named action may be used.

        The `delete` action is refused when `can_delete` is false; every
        other decision is left to the base class.

        :param name:
            Action name
    """
    delete_forbidden = name == 'delete' and not self.can_delete
    if delete_forbidden:
        return False

    return super(ModelView, self).is_action_allowed(name)
@action('delete',
        lazy_gettext('Delete'),
        lazy_gettext('Are you sure you want to delete selected records?'))
def action_delete(self, ids):
    """
        Bulk "delete" action: remove the records whose primary key is in `ids`.

        With `fast_mass_delete` enabled a single DELETE statement is
        executed. Otherwise each record is loaded, passed to
        `on_model_delete` and removed with `delete_instance(recursive=True)`.

        :param ids:
            Primary key values of the records to delete
    """
    try:
        model_pk = getattr(self.model, self._primary_key)

        if self.fast_mass_delete:
            count = self.model.delete().where(model_pk << ids).execute()
        else:
            count = 0

            query = self.model.select().filter(model_pk << ids)

            for m in query:
                self.on_model_delete(m)
                m.delete_instance(recursive=True)
                count += 1

        flash(ngettext('Record was successfully deleted.',
                       '%(count)s records were successfully deleted.',
                       count,
                       count=count), 'success')
    except Exception as ex:
        if not self.handle_view_exception(ex):
            flash(gettext('Failed to delete records. %(error)s', error=str(ex)), 'error')
            # Log the traceback, as the single-record handlers do
            log.exception('Failed to delete records.')
|