ajax.py 3.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. from sqlalchemy import or_, and_
  2. from flask_admin._compat import as_unicode, string_types
  3. from flask_admin.model.ajax import AjaxModelLoader, DEFAULT_PAGE_SIZE
  4. from .tools import get_primary_key, has_multiple_pks, is_relationship, is_association_proxy
  5. class QueryAjaxModelLoader(AjaxModelLoader):
  6. def __init__(self, name, session, model, **options):
  7. """
  8. Constructor.
  9. :param fields:
  10. Fields to run query against
  11. :param filters:
  12. Additional filters to apply to the loader
  13. """
  14. super(QueryAjaxModelLoader, self).__init__(name, options)
  15. self.session = session
  16. self.model = model
  17. self.fields = options.get('fields')
  18. self.order_by = options.get('order_by')
  19. self.filters = options.get('filters')
  20. if not self.fields:
  21. raise ValueError('AJAX loading requires `fields` to be specified for %s.%s' % (model, self.name))
  22. self._cached_fields = self._process_fields()
  23. if has_multiple_pks(model):
  24. raise NotImplementedError('Flask-Admin does not support multi-pk AJAX model loading.')
  25. self.pk = get_primary_key(model)
  26. def _process_fields(self):
  27. remote_fields = []
  28. for field in self.fields:
  29. if isinstance(field, string_types):
  30. attr = getattr(self.model, field, None)
  31. if not attr:
  32. raise ValueError('%s.%s does not exist.' % (self.model, field))
  33. remote_fields.append(attr)
  34. else:
  35. # TODO: Figure out if it is valid SQLAlchemy property?
  36. remote_fields.append(field)
  37. return remote_fields
  38. def format(self, model):
  39. if not model:
  40. return None
  41. return (getattr(model, self.pk), as_unicode(model))
  42. def get_one(self, pk):
  43. # prevent autoflush from occuring during populate_obj
  44. with self.session.no_autoflush:
  45. return self.session.query(self.model).get(pk)
  46. def get_list(self, term, offset=0, limit=DEFAULT_PAGE_SIZE):
  47. query = self.session.query(self.model)
  48. filters = (field.ilike(u'%%%s%%' % term) for field in self._cached_fields)
  49. query = query.filter(or_(*filters))
  50. if self.filters:
  51. filters = ["%s.%s" % (self.model.__name__.lower(), value) for value in self.filters]
  52. query = query.filter(and_(*filters))
  53. if self.order_by:
  54. query = query.order_by(self.order_by)
  55. return query.offset(offset).limit(limit).all()
  56. def create_ajax_loader(model, session, name, field_name, options):
  57. attr = getattr(model, field_name, None)
  58. if attr is None:
  59. raise ValueError('Model %s does not have field %s.' % (model, field_name))
  60. if not is_relationship(attr) and not is_association_proxy(attr):
  61. raise ValueError('%s.%s is not a relation.' % (model, field_name))
  62. if is_association_proxy(attr):
  63. attr = attr.remote_attr
  64. remote_model = attr.prop.mapper.class_
  65. return QueryAjaxModelLoader(name, session, remote_model, **options)