# test_select.py
from sqlalchemy import Integer, String, select, func, bindparam, union
from sqlalchemy import testing
from sqlalchemy import util

from .. import fixtures, config
from ..assertions import eq_
from ..schema import Table, Column
  7. class OrderByLabelTest(fixtures.TablesTest):
  8. """Test the dialect sends appropriate ORDER BY expressions when
  9. labels are used.
  10. This essentially exercises the "supports_simple_order_by_label"
  11. setting.
  12. """
  13. __backend__ = True
  14. @classmethod
  15. def define_tables(cls, metadata):
  16. Table("some_table", metadata,
  17. Column('id', Integer, primary_key=True),
  18. Column('x', Integer),
  19. Column('y', Integer),
  20. Column('q', String(50)),
  21. Column('p', String(50))
  22. )
  23. @classmethod
  24. def insert_data(cls):
  25. config.db.execute(
  26. cls.tables.some_table.insert(),
  27. [
  28. {"id": 1, "x": 1, "y": 2, "q": "q1", "p": "p3"},
  29. {"id": 2, "x": 2, "y": 3, "q": "q2", "p": "p2"},
  30. {"id": 3, "x": 3, "y": 4, "q": "q3", "p": "p1"},
  31. ]
  32. )
  33. def _assert_result(self, select, result):
  34. eq_(
  35. config.db.execute(select).fetchall(),
  36. result
  37. )
  38. def test_plain(self):
  39. table = self.tables.some_table
  40. lx = table.c.x.label('lx')
  41. self._assert_result(
  42. select([lx]).order_by(lx),
  43. [(1, ), (2, ), (3, )]
  44. )
  45. def test_composed_int(self):
  46. table = self.tables.some_table
  47. lx = (table.c.x + table.c.y).label('lx')
  48. self._assert_result(
  49. select([lx]).order_by(lx),
  50. [(3, ), (5, ), (7, )]
  51. )
  52. def test_composed_multiple(self):
  53. table = self.tables.some_table
  54. lx = (table.c.x + table.c.y).label('lx')
  55. ly = (func.lower(table.c.q) + table.c.p).label('ly')
  56. self._assert_result(
  57. select([lx, ly]).order_by(lx, ly.desc()),
  58. [(3, util.u('q1p3')), (5, util.u('q2p2')), (7, util.u('q3p1'))]
  59. )
  60. def test_plain_desc(self):
  61. table = self.tables.some_table
  62. lx = table.c.x.label('lx')
  63. self._assert_result(
  64. select([lx]).order_by(lx.desc()),
  65. [(3, ), (2, ), (1, )]
  66. )
  67. def test_composed_int_desc(self):
  68. table = self.tables.some_table
  69. lx = (table.c.x + table.c.y).label('lx')
  70. self._assert_result(
  71. select([lx]).order_by(lx.desc()),
  72. [(7, ), (5, ), (3, )]
  73. )
  74. def test_group_by_composed(self):
  75. table = self.tables.some_table
  76. expr = (table.c.x + table.c.y).label('lx')
  77. stmt = select([func.count(table.c.id), expr]).group_by(expr).order_by(expr)
  78. self._assert_result(
  79. stmt,
  80. [(1, 3), (1, 5), (1, 7)]
  81. )
  82. class LimitOffsetTest(fixtures.TablesTest):
  83. __backend__ = True
  84. @classmethod
  85. def define_tables(cls, metadata):
  86. Table("some_table", metadata,
  87. Column('id', Integer, primary_key=True),
  88. Column('x', Integer),
  89. Column('y', Integer))
  90. @classmethod
  91. def insert_data(cls):
  92. config.db.execute(
  93. cls.tables.some_table.insert(),
  94. [
  95. {"id": 1, "x": 1, "y": 2},
  96. {"id": 2, "x": 2, "y": 3},
  97. {"id": 3, "x": 3, "y": 4},
  98. {"id": 4, "x": 4, "y": 5},
  99. ]
  100. )
  101. def _assert_result(self, select, result, params=()):
  102. eq_(
  103. config.db.execute(select, params).fetchall(),
  104. result
  105. )
  106. def test_simple_limit(self):
  107. table = self.tables.some_table
  108. self._assert_result(
  109. select([table]).order_by(table.c.id).limit(2),
  110. [(1, 1, 2), (2, 2, 3)]
  111. )
  112. @testing.requires.offset
  113. def test_simple_offset(self):
  114. table = self.tables.some_table
  115. self._assert_result(
  116. select([table]).order_by(table.c.id).offset(2),
  117. [(3, 3, 4), (4, 4, 5)]
  118. )
  119. @testing.requires.offset
  120. def test_simple_limit_offset(self):
  121. table = self.tables.some_table
  122. self._assert_result(
  123. select([table]).order_by(table.c.id).limit(2).offset(1),
  124. [(2, 2, 3), (3, 3, 4)]
  125. )
  126. @testing.requires.offset
  127. def test_limit_offset_nobinds(self):
  128. """test that 'literal binds' mode works - no bound params."""
  129. table = self.tables.some_table
  130. stmt = select([table]).order_by(table.c.id).limit(2).offset(1)
  131. sql = stmt.compile(
  132. dialect=config.db.dialect,
  133. compile_kwargs={"literal_binds": True})
  134. sql = str(sql)
  135. self._assert_result(
  136. sql,
  137. [(2, 2, 3), (3, 3, 4)]
  138. )
  139. @testing.requires.bound_limit_offset
  140. def test_bound_limit(self):
  141. table = self.tables.some_table
  142. self._assert_result(
  143. select([table]).order_by(table.c.id).limit(bindparam('l')),
  144. [(1, 1, 2), (2, 2, 3)],
  145. params={"l": 2}
  146. )
  147. @testing.requires.bound_limit_offset
  148. def test_bound_offset(self):
  149. table = self.tables.some_table
  150. self._assert_result(
  151. select([table]).order_by(table.c.id).offset(bindparam('o')),
  152. [(3, 3, 4), (4, 4, 5)],
  153. params={"o": 2}
  154. )
  155. @testing.requires.bound_limit_offset
  156. def test_bound_limit_offset(self):
  157. table = self.tables.some_table
  158. self._assert_result(
  159. select([table]).order_by(table.c.id).
  160. limit(bindparam("l")).offset(bindparam("o")),
  161. [(2, 2, 3), (3, 3, 4)],
  162. params={"l": 2, "o": 1}
  163. )
  164. class CompoundSelectTest(fixtures.TablesTest):
  165. __backend__ = True
  166. @classmethod
  167. def define_tables(cls, metadata):
  168. Table("some_table", metadata,
  169. Column('id', Integer, primary_key=True),
  170. Column('x', Integer),
  171. Column('y', Integer))
  172. @classmethod
  173. def insert_data(cls):
  174. config.db.execute(
  175. cls.tables.some_table.insert(),
  176. [
  177. {"id": 1, "x": 1, "y": 2},
  178. {"id": 2, "x": 2, "y": 3},
  179. {"id": 3, "x": 3, "y": 4},
  180. {"id": 4, "x": 4, "y": 5},
  181. ]
  182. )
  183. def _assert_result(self, select, result, params=()):
  184. eq_(
  185. config.db.execute(select, params).fetchall(),
  186. result
  187. )
  188. def test_plain_union(self):
  189. table = self.tables.some_table
  190. s1 = select([table]).where(table.c.id == 2)
  191. s2 = select([table]).where(table.c.id == 3)
  192. u1 = union(s1, s2)
  193. self._assert_result(
  194. u1.order_by(u1.c.id),
  195. [(2, 2, 3), (3, 3, 4)]
  196. )
  197. def test_select_from_plain_union(self):
  198. table = self.tables.some_table
  199. s1 = select([table]).where(table.c.id == 2)
  200. s2 = select([table]).where(table.c.id == 3)
  201. u1 = union(s1, s2).alias().select()
  202. self._assert_result(
  203. u1.order_by(u1.c.id),
  204. [(2, 2, 3), (3, 3, 4)]
  205. )
  206. @testing.requires.parens_in_union_contained_select_w_limit_offset
  207. def test_limit_offset_selectable_in_unions(self):
  208. table = self.tables.some_table
  209. s1 = select([table]).where(table.c.id == 2).\
  210. limit(1).order_by(table.c.id)
  211. s2 = select([table]).where(table.c.id == 3).\
  212. limit(1).order_by(table.c.id)
  213. u1 = union(s1, s2).limit(2)
  214. self._assert_result(
  215. u1.order_by(u1.c.id),
  216. [(2, 2, 3), (3, 3, 4)]
  217. )
  218. @testing.requires.parens_in_union_contained_select_wo_limit_offset
  219. def test_order_by_selectable_in_unions(self):
  220. table = self.tables.some_table
  221. s1 = select([table]).where(table.c.id == 2).\
  222. order_by(table.c.id)
  223. s2 = select([table]).where(table.c.id == 3).\
  224. order_by(table.c.id)
  225. u1 = union(s1, s2).limit(2)
  226. self._assert_result(
  227. u1.order_by(u1.c.id),
  228. [(2, 2, 3), (3, 3, 4)]
  229. )
  230. def test_distinct_selectable_in_unions(self):
  231. table = self.tables.some_table
  232. s1 = select([table]).where(table.c.id == 2).\
  233. distinct()
  234. s2 = select([table]).where(table.c.id == 3).\
  235. distinct()
  236. u1 = union(s1, s2).limit(2)
  237. self._assert_result(
  238. u1.order_by(u1.c.id),
  239. [(2, 2, 3), (3, 3, 4)]
  240. )
  241. @testing.requires.parens_in_union_contained_select_w_limit_offset
  242. def test_limit_offset_in_unions_from_alias(self):
  243. table = self.tables.some_table
  244. s1 = select([table]).where(table.c.id == 2).\
  245. limit(1).order_by(table.c.id)
  246. s2 = select([table]).where(table.c.id == 3).\
  247. limit(1).order_by(table.c.id)
  248. # this necessarily has double parens
  249. u1 = union(s1, s2).alias()
  250. self._assert_result(
  251. u1.select().limit(2).order_by(u1.c.id),
  252. [(2, 2, 3), (3, 3, 4)]
  253. )
  254. def test_limit_offset_aliased_selectable_in_unions(self):
  255. table = self.tables.some_table
  256. s1 = select([table]).where(table.c.id == 2).\
  257. limit(1).order_by(table.c.id).alias().select()
  258. s2 = select([table]).where(table.c.id == 3).\
  259. limit(1).order_by(table.c.id).alias().select()
  260. u1 = union(s1, s2).limit(2)
  261. self._assert_result(
  262. u1.order_by(u1.c.id),
  263. [(2, 2, 3), (3, 3, 4)]
  264. )