loading.py 25 KB


  1. # orm/loading.py
  2. # Copyright (C) 2005-2017 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: http://www.opensource.org/licenses/mit-license.php
  7. """private module containing functions used to convert database
  8. rows into object instances and associated state.
  9. the functions here are called primarily by Query, Mapper,
  10. as well as some of the attribute loading strategies.
  11. """
  12. from __future__ import absolute_import
  13. from .. import util
  14. from . import attributes, exc as orm_exc
  15. from ..sql import util as sql_util
  16. from . import strategy_options
  17. from .util import _none_set, state_str
  18. from .base import _SET_DEFERRED_EXPIRED, _DEFER_FOR_STATE
  19. from .. import exc as sa_exc
  20. import collections
  21. _new_runid = util.counter()
  22. def instances(query, cursor, context):
  23. """Return an ORM result as an iterator."""
  24. context.runid = _new_runid()
  25. filtered = query._has_mapper_entities
  26. single_entity = len(query._entities) == 1 and \
  27. query._entities[0].supports_single_entity
  28. if filtered:
  29. if single_entity:
  30. filter_fn = id
  31. else:
  32. def filter_fn(row):
  33. return tuple(
  34. id(item)
  35. if ent.use_id_for_hash
  36. else item
  37. for ent, item in zip(query._entities, row)
  38. )
  39. try:
  40. (process, labels) = \
  41. list(zip(*[
  42. query_entity.row_processor(query,
  43. context, cursor)
  44. for query_entity in query._entities
  45. ]))
  46. if not single_entity:
  47. keyed_tuple = util.lightweight_named_tuple('result', labels)
  48. while True:
  49. context.partials = {}
  50. if query._yield_per:
  51. fetch = cursor.fetchmany(query._yield_per)
  52. if not fetch:
  53. break
  54. else:
  55. fetch = cursor.fetchall()
  56. if single_entity:
  57. proc = process[0]
  58. rows = [proc(row) for row in fetch]
  59. else:
  60. rows = [keyed_tuple([proc(row) for proc in process])
  61. for row in fetch]
  62. if filtered:
  63. rows = util.unique_list(rows, filter_fn)
  64. for row in rows:
  65. yield row
  66. if not query._yield_per:
  67. break
  68. except Exception as err:
  69. cursor.close()
  70. util.raise_from_cause(err)
  71. @util.dependencies("sqlalchemy.orm.query")
  72. def merge_result(querylib, query, iterator, load=True):
  73. """Merge a result into this :class:`.Query` object's Session."""
  74. session = query.session
  75. if load:
  76. # flush current contents if we expect to load data
  77. session._autoflush()
  78. autoflush = session.autoflush
  79. try:
  80. session.autoflush = False
  81. single_entity = len(query._entities) == 1
  82. if single_entity:
  83. if isinstance(query._entities[0], querylib._MapperEntity):
  84. result = [session._merge(
  85. attributes.instance_state(instance),
  86. attributes.instance_dict(instance),
  87. load=load, _recursive={}, _resolve_conflict_map={})
  88. for instance in iterator]
  89. else:
  90. result = list(iterator)
  91. else:
  92. mapped_entities = [i for i, e in enumerate(query._entities)
  93. if isinstance(e, querylib._MapperEntity)]
  94. result = []
  95. keys = [ent._label_name for ent in query._entities]
  96. keyed_tuple = util.lightweight_named_tuple('result', keys)
  97. for row in iterator:
  98. newrow = list(row)
  99. for i in mapped_entities:
  100. if newrow[i] is not None:
  101. newrow[i] = session._merge(
  102. attributes.instance_state(newrow[i]),
  103. attributes.instance_dict(newrow[i]),
  104. load=load, _recursive={}, _resolve_conflict_map={})
  105. result.append(keyed_tuple(newrow))
  106. return iter(result)
  107. finally:
  108. session.autoflush = autoflush
  109. def get_from_identity(session, key, passive):
  110. """Look up the given key in the given session's identity map,
  111. check the object for expired state if found.
  112. """
  113. instance = session.identity_map.get(key)
  114. if instance is not None:
  115. state = attributes.instance_state(instance)
  116. # expired - ensure it still exists
  117. if state.expired:
  118. if not passive & attributes.SQL_OK:
  119. # TODO: no coverage here
  120. return attributes.PASSIVE_NO_RESULT
  121. elif not passive & attributes.RELATED_OBJECT_OK:
  122. # this mode is used within a flush and the instance's
  123. # expired state will be checked soon enough, if necessary
  124. return instance
  125. try:
  126. state._load_expired(state, passive)
  127. except orm_exc.ObjectDeletedError:
  128. session._remove_newly_deleted([state])
  129. return None
  130. return instance
  131. else:
  132. return None
  133. def load_on_ident(query, key,
  134. refresh_state=None, lockmode=None,
  135. only_load_props=None):
  136. """Load the given identity key from the database."""
  137. if key is not None:
  138. ident = key[1]
  139. else:
  140. ident = None
  141. if refresh_state is None:
  142. q = query._clone()
  143. q._get_condition()
  144. else:
  145. q = query._clone()
  146. if ident is not None:
  147. mapper = query._mapper_zero()
  148. (_get_clause, _get_params) = mapper._get_clause
  149. # None present in ident - turn those comparisons
  150. # into "IS NULL"
  151. if None in ident:
  152. nones = set([
  153. _get_params[col].key for col, value in
  154. zip(mapper.primary_key, ident) if value is None
  155. ])
  156. _get_clause = sql_util.adapt_criterion_to_null(
  157. _get_clause, nones)
  158. _get_clause = q._adapt_clause(_get_clause, True, False)
  159. q._criterion = _get_clause
  160. params = dict([
  161. (_get_params[primary_key].key, id_val)
  162. for id_val, primary_key in zip(ident, mapper.primary_key)
  163. ])
  164. q._params = params
  165. if lockmode is not None:
  166. version_check = True
  167. q = q.with_lockmode(lockmode)
  168. elif query._for_update_arg is not None:
  169. version_check = True
  170. q._for_update_arg = query._for_update_arg
  171. else:
  172. version_check = False
  173. q._get_options(
  174. populate_existing=bool(refresh_state),
  175. version_check=version_check,
  176. only_load_props=only_load_props,
  177. refresh_state=refresh_state)
  178. q._order_by = None
  179. try:
  180. return q.one()
  181. except orm_exc.NoResultFound:
  182. return None
  183. def _setup_entity_query(
  184. context, mapper, query_entity,
  185. path, adapter, column_collection,
  186. with_polymorphic=None, only_load_props=None,
  187. polymorphic_discriminator=None, **kw):
  188. if with_polymorphic:
  189. poly_properties = mapper._iterate_polymorphic_properties(
  190. with_polymorphic)
  191. else:
  192. poly_properties = mapper._polymorphic_properties
  193. quick_populators = {}
  194. path.set(
  195. context.attributes,
  196. "memoized_setups",
  197. quick_populators)
  198. for value in poly_properties:
  199. if only_load_props and \
  200. value.key not in only_load_props:
  201. continue
  202. value.setup(
  203. context,
  204. query_entity,
  205. path,
  206. adapter,
  207. only_load_props=only_load_props,
  208. column_collection=column_collection,
  209. memoized_populators=quick_populators,
  210. **kw
  211. )
  212. if polymorphic_discriminator is not None and \
  213. polymorphic_discriminator \
  214. is not mapper.polymorphic_on:
  215. if adapter:
  216. pd = adapter.columns[polymorphic_discriminator]
  217. else:
  218. pd = polymorphic_discriminator
  219. column_collection.append(pd)
  220. def _instance_processor(
  221. mapper, context, result, path, adapter,
  222. only_load_props=None, refresh_state=None,
  223. polymorphic_discriminator=None,
  224. _polymorphic_from=None):
  225. """Produce a mapper level row processor callable
  226. which processes rows into mapped instances."""
  227. # note that this method, most of which exists in a closure
  228. # called _instance(), resists being broken out, as
  229. # attempts to do so tend to add significant function
  230. # call overhead. _instance() is the most
  231. # performance-critical section in the whole ORM.
  232. pk_cols = mapper.primary_key
  233. if adapter:
  234. pk_cols = [adapter.columns[c] for c in pk_cols]
  235. identity_class = mapper._identity_class
  236. populators = collections.defaultdict(list)
  237. props = mapper._prop_set
  238. if only_load_props is not None:
  239. props = props.intersection(
  240. mapper._props[k] for k in only_load_props)
  241. quick_populators = path.get(
  242. context.attributes, "memoized_setups", _none_set)
  243. for prop in props:
  244. if prop in quick_populators:
  245. # this is an inlined path just for column-based attributes.
  246. col = quick_populators[prop]
  247. if col is _DEFER_FOR_STATE:
  248. populators["new"].append(
  249. (prop.key, prop._deferred_column_loader))
  250. elif col is _SET_DEFERRED_EXPIRED:
  251. # note that in this path, we are no longer
  252. # searching in the result to see if the column might
  253. # be present in some unexpected way.
  254. populators["expire"].append((prop.key, False))
  255. else:
  256. if adapter:
  257. col = adapter.columns[col]
  258. getter = result._getter(col, False)
  259. if getter:
  260. populators["quick"].append((prop.key, getter))
  261. else:
  262. # fall back to the ColumnProperty itself, which
  263. # will iterate through all of its columns
  264. # to see if one fits
  265. prop.create_row_processor(
  266. context, path, mapper, result, adapter, populators)
  267. else:
  268. prop.create_row_processor(
  269. context, path, mapper, result, adapter, populators)
  270. propagate_options = context.propagate_options
  271. load_path = context.query._current_path + path \
  272. if context.query._current_path.path else path
  273. session_identity_map = context.session.identity_map
  274. populate_existing = context.populate_existing or mapper.always_refresh
  275. load_evt = bool(mapper.class_manager.dispatch.load)
  276. refresh_evt = bool(mapper.class_manager.dispatch.refresh)
  277. persistent_evt = bool(context.session.dispatch.loaded_as_persistent)
  278. if persistent_evt:
  279. loaded_as_persistent = context.session.dispatch.loaded_as_persistent
  280. instance_state = attributes.instance_state
  281. instance_dict = attributes.instance_dict
  282. session_id = context.session.hash_key
  283. version_check = context.version_check
  284. runid = context.runid
  285. if refresh_state:
  286. refresh_identity_key = refresh_state.key
  287. if refresh_identity_key is None:
  288. # super-rare condition; a refresh is being called
  289. # on a non-instance-key instance; this is meant to only
  290. # occur within a flush()
  291. refresh_identity_key = \
  292. mapper._identity_key_from_state(refresh_state)
  293. else:
  294. refresh_identity_key = None
  295. if mapper.allow_partial_pks:
  296. is_not_primary_key = _none_set.issuperset
  297. else:
  298. is_not_primary_key = _none_set.intersection
  299. def _instance(row):
  300. # determine the state that we'll be populating
  301. if refresh_identity_key:
  302. # fixed state that we're refreshing
  303. state = refresh_state
  304. instance = state.obj()
  305. dict_ = instance_dict(instance)
  306. isnew = state.runid != runid
  307. currentload = True
  308. loaded_instance = False
  309. else:
  310. # look at the row, see if that identity is in the
  311. # session, or we have to create a new one
  312. identitykey = (
  313. identity_class,
  314. tuple([row[column] for column in pk_cols])
  315. )
  316. instance = session_identity_map.get(identitykey)
  317. if instance is not None:
  318. # existing instance
  319. state = instance_state(instance)
  320. dict_ = instance_dict(instance)
  321. isnew = state.runid != runid
  322. currentload = not isnew
  323. loaded_instance = False
  324. if version_check and not currentload:
  325. _validate_version_id(mapper, state, dict_, row, adapter)
  326. else:
  327. # create a new instance
  328. # check for non-NULL values in the primary key columns,
  329. # else no entity is returned for the row
  330. if is_not_primary_key(identitykey[1]):
  331. return None
  332. isnew = True
  333. currentload = True
  334. loaded_instance = True
  335. instance = mapper.class_manager.new_instance()
  336. dict_ = instance_dict(instance)
  337. state = instance_state(instance)
  338. state.key = identitykey
  339. # attach instance to session.
  340. state.session_id = session_id
  341. session_identity_map._add_unpresent(state, identitykey)
  342. # populate. this looks at whether this state is new
  343. # for this load or was existing, and whether or not this
  344. # row is the first row with this identity.
  345. if currentload or populate_existing:
  346. # full population routines. Objects here are either
  347. # just created, or we are doing a populate_existing
  348. # be conservative about setting load_path when populate_existing
  349. # is in effect; want to maintain options from the original
  350. # load. see test_expire->test_refresh_maintains_deferred_options
  351. if isnew and (propagate_options or not populate_existing):
  352. state.load_options = propagate_options
  353. state.load_path = load_path
  354. _populate_full(
  355. context, row, state, dict_, isnew, load_path,
  356. loaded_instance, populate_existing, populators)
  357. if isnew:
  358. if loaded_instance:
  359. if load_evt:
  360. state.manager.dispatch.load(state, context)
  361. if persistent_evt:
  362. loaded_as_persistent(context.session, state.obj())
  363. elif refresh_evt:
  364. state.manager.dispatch.refresh(
  365. state, context, only_load_props)
  366. if populate_existing or state.modified:
  367. if refresh_state and only_load_props:
  368. state._commit(dict_, only_load_props)
  369. else:
  370. state._commit_all(dict_, session_identity_map)
  371. else:
  372. # partial population routines, for objects that were already
  373. # in the Session, but a row matches them; apply eager loaders
  374. # on existing objects, etc.
  375. unloaded = state.unloaded
  376. isnew = state not in context.partials
  377. if not isnew or unloaded or populators["eager"]:
  378. # state is having a partial set of its attributes
  379. # refreshed. Populate those attributes,
  380. # and add to the "context.partials" collection.
  381. to_load = _populate_partial(
  382. context, row, state, dict_, isnew, load_path,
  383. unloaded, populators)
  384. if isnew:
  385. if refresh_evt:
  386. state.manager.dispatch.refresh(
  387. state, context, to_load)
  388. state._commit(dict_, to_load)
  389. return instance
  390. if mapper.polymorphic_map and not _polymorphic_from and not refresh_state:
  391. # if we are doing polymorphic, dispatch to a different _instance()
  392. # method specific to the subclass mapper
  393. _instance = _decorate_polymorphic_switch(
  394. _instance, context, mapper, result, path,
  395. polymorphic_discriminator, adapter)
  396. return _instance
  397. def _populate_full(
  398. context, row, state, dict_, isnew, load_path,
  399. loaded_instance, populate_existing, populators):
  400. if isnew:
  401. # first time we are seeing a row with this identity.
  402. state.runid = context.runid
  403. for key, getter in populators["quick"]:
  404. dict_[key] = getter(row)
  405. if populate_existing:
  406. for key, set_callable in populators["expire"]:
  407. dict_.pop(key, None)
  408. if set_callable:
  409. state.expired_attributes.add(key)
  410. else:
  411. for key, set_callable in populators["expire"]:
  412. if set_callable:
  413. state.expired_attributes.add(key)
  414. for key, populator in populators["new"]:
  415. populator(state, dict_, row)
  416. for key, populator in populators["delayed"]:
  417. populator(state, dict_, row)
  418. elif load_path != state.load_path:
  419. # new load path, e.g. object is present in more than one
  420. # column position in a series of rows
  421. state.load_path = load_path
  422. # if we have data, and the data isn't in the dict, OK, let's put
  423. # it in.
  424. for key, getter in populators["quick"]:
  425. if key not in dict_:
  426. dict_[key] = getter(row)
  427. # otherwise treat like an "already seen" row
  428. for key, populator in populators["existing"]:
  429. populator(state, dict_, row)
  430. # TODO: allow "existing" populator to know this is
  431. # a new path for the state:
  432. # populator(state, dict_, row, new_path=True)
  433. else:
  434. # have already seen rows with this identity in this same path.
  435. for key, populator in populators["existing"]:
  436. populator(state, dict_, row)
  437. # TODO: same path
  438. # populator(state, dict_, row, new_path=False)
  439. def _populate_partial(
  440. context, row, state, dict_, isnew, load_path,
  441. unloaded, populators):
  442. if not isnew:
  443. to_load = context.partials[state]
  444. for key, populator in populators["existing"]:
  445. if key in to_load:
  446. populator(state, dict_, row)
  447. else:
  448. to_load = unloaded
  449. context.partials[state] = to_load
  450. for key, getter in populators["quick"]:
  451. if key in to_load:
  452. dict_[key] = getter(row)
  453. for key, set_callable in populators["expire"]:
  454. if key in to_load:
  455. dict_.pop(key, None)
  456. if set_callable:
  457. state.expired_attributes.add(key)
  458. for key, populator in populators["new"]:
  459. if key in to_load:
  460. populator(state, dict_, row)
  461. for key, populator in populators["delayed"]:
  462. if key in to_load:
  463. populator(state, dict_, row)
  464. for key, populator in populators["eager"]:
  465. if key not in unloaded:
  466. populator(state, dict_, row)
  467. return to_load
  468. def _validate_version_id(mapper, state, dict_, row, adapter):
  469. version_id_col = mapper.version_id_col
  470. if version_id_col is None:
  471. return
  472. if adapter:
  473. version_id_col = adapter.columns[version_id_col]
  474. if mapper._get_state_attr_by_column(
  475. state, dict_, mapper.version_id_col) != row[version_id_col]:
  476. raise orm_exc.StaleDataError(
  477. "Instance '%s' has version id '%s' which "
  478. "does not match database-loaded version id '%s'."
  479. % (state_str(state), mapper._get_state_attr_by_column(
  480. state, dict_, mapper.version_id_col),
  481. row[version_id_col]))
  482. def _decorate_polymorphic_switch(
  483. instance_fn, context, mapper, result, path,
  484. polymorphic_discriminator, adapter):
  485. if polymorphic_discriminator is not None:
  486. polymorphic_on = polymorphic_discriminator
  487. else:
  488. polymorphic_on = mapper.polymorphic_on
  489. if polymorphic_on is None:
  490. return instance_fn
  491. if adapter:
  492. polymorphic_on = adapter.columns[polymorphic_on]
  493. def configure_subclass_mapper(discriminator):
  494. try:
  495. sub_mapper = mapper.polymorphic_map[discriminator]
  496. except KeyError:
  497. raise AssertionError(
  498. "No such polymorphic_identity %r is defined" %
  499. discriminator)
  500. else:
  501. if sub_mapper is mapper:
  502. return None
  503. return _instance_processor(
  504. sub_mapper, context, result,
  505. path, adapter, _polymorphic_from=mapper)
  506. polymorphic_instances = util.PopulateDict(
  507. configure_subclass_mapper
  508. )
  509. def polymorphic_instance(row):
  510. discriminator = row[polymorphic_on]
  511. if discriminator is not None:
  512. _instance = polymorphic_instances[discriminator]
  513. if _instance:
  514. return _instance(row)
  515. return instance_fn(row)
  516. return polymorphic_instance
  517. def load_scalar_attributes(mapper, state, attribute_names):
  518. """initiate a column-based attribute refresh operation."""
  519. # assert mapper is _state_mapper(state)
  520. session = state.session
  521. if not session:
  522. raise orm_exc.DetachedInstanceError(
  523. "Instance %s is not bound to a Session; "
  524. "attribute refresh operation cannot proceed" %
  525. (state_str(state)))
  526. has_key = bool(state.key)
  527. result = False
  528. if mapper.inherits and not mapper.concrete:
  529. # because we are using Core to produce a select() that we
  530. # pass to the Query, we aren't calling setup() for mapped
  531. # attributes; in 1.0 this means deferred attrs won't get loaded
  532. # by default
  533. statement = mapper._optimized_get_statement(state, attribute_names)
  534. if statement is not None:
  535. result = load_on_ident(
  536. session.query(mapper).
  537. options(
  538. strategy_options.Load(mapper).undefer("*")
  539. ).from_statement(statement),
  540. None,
  541. only_load_props=attribute_names,
  542. refresh_state=state
  543. )
  544. if result is False:
  545. if has_key:
  546. identity_key = state.key
  547. else:
  548. # this codepath is rare - only valid when inside a flush, and the
  549. # object is becoming persistent but hasn't yet been assigned
  550. # an identity_key.
  551. # check here to ensure we have the attrs we need.
  552. pk_attrs = [mapper._columntoproperty[col].key
  553. for col in mapper.primary_key]
  554. if state.expired_attributes.intersection(pk_attrs):
  555. raise sa_exc.InvalidRequestError(
  556. "Instance %s cannot be refreshed - it's not "
  557. " persistent and does not "
  558. "contain a full primary key." % state_str(state))
  559. identity_key = mapper._identity_key_from_state(state)
  560. if (_none_set.issubset(identity_key) and
  561. not mapper.allow_partial_pks) or \
  562. _none_set.issuperset(identity_key):
  563. util.warn_limited(
  564. "Instance %s to be refreshed doesn't "
  565. "contain a full primary key - can't be refreshed "
  566. "(and shouldn't be expired, either).",
  567. state_str(state))
  568. return
  569. result = load_on_ident(
  570. session.query(mapper),
  571. identity_key,
  572. refresh_state=state,
  573. only_load_props=attribute_names)
  574. # if instance is pending, a refresh operation
  575. # may not complete (even if PK attributes are assigned)
  576. if has_key and result is None:
  577. raise orm_exc.ObjectDeletedError(state)