123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367 |
- from .. import fixtures, config
- from ..config import requirements
- from .. import exclusions
- from ..assertions import eq_
- from .. import engines
- from ... import testing
- from sqlalchemy import Integer, String, select, util, sql, DateTime, text, func
- import datetime
- from ..schema import Table, Column
- class RowFetchTest(fixtures.TablesTest):
- __backend__ = True
- @classmethod
- def define_tables(cls, metadata):
- Table('plain_pk', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', String(50))
- )
- Table('has_dates', metadata,
- Column('id', Integer, primary_key=True),
- Column('today', DateTime)
- )
- @classmethod
- def insert_data(cls):
- config.db.execute(
- cls.tables.plain_pk.insert(),
- [
- {"id": 1, "data": "d1"},
- {"id": 2, "data": "d2"},
- {"id": 3, "data": "d3"},
- ]
- )
- config.db.execute(
- cls.tables.has_dates.insert(),
- [
- {"id": 1, "today": datetime.datetime(2006, 5, 12, 12, 0, 0)}
- ]
- )
- def test_via_string(self):
- row = config.db.execute(
- self.tables.plain_pk.select().
- order_by(self.tables.plain_pk.c.id)
- ).first()
- eq_(
- row['id'], 1
- )
- eq_(
- row['data'], "d1"
- )
- def test_via_int(self):
- row = config.db.execute(
- self.tables.plain_pk.select().
- order_by(self.tables.plain_pk.c.id)
- ).first()
- eq_(
- row[0], 1
- )
- eq_(
- row[1], "d1"
- )
- def test_via_col_object(self):
- row = config.db.execute(
- self.tables.plain_pk.select().
- order_by(self.tables.plain_pk.c.id)
- ).first()
- eq_(
- row[self.tables.plain_pk.c.id], 1
- )
- eq_(
- row[self.tables.plain_pk.c.data], "d1"
- )
- @requirements.duplicate_names_in_cursor_description
- def test_row_with_dupe_names(self):
- result = config.db.execute(
- select([self.tables.plain_pk.c.data,
- self.tables.plain_pk.c.data.label('data')]).
- order_by(self.tables.plain_pk.c.id)
- )
- row = result.first()
- eq_(result.keys(), ['data', 'data'])
- eq_(row, ('d1', 'd1'))
- def test_row_w_scalar_select(self):
- """test that a scalar select as a column is returned as such
- and that type conversion works OK.
- (this is half a SQLAlchemy Core test and half to catch database
- backends that may have unusual behavior with scalar selects.)
- """
- datetable = self.tables.has_dates
- s = select([datetable.alias('x').c.today]).as_scalar()
- s2 = select([datetable.c.id, s.label('somelabel')])
- row = config.db.execute(s2).first()
- eq_(row['somelabel'], datetime.datetime(2006, 5, 12, 12, 0, 0))
- class PercentSchemaNamesTest(fixtures.TablesTest):
- """tests using percent signs, spaces in table and column names.
- This is a very fringe use case, doesn't work for MySQL
- or PostgreSQL. the requirement, "percent_schema_names",
- is marked "skip" by default.
- """
- __requires__ = ('percent_schema_names', )
- __backend__ = True
- @classmethod
- def define_tables(cls, metadata):
- cls.tables.percent_table = Table('percent%table', metadata,
- Column("percent%", Integer),
- Column(
- "spaces % more spaces", Integer),
- )
- cls.tables.lightweight_percent_table = sql.table(
- 'percent%table', sql.column("percent%"),
- sql.column("spaces % more spaces")
- )
- def test_single_roundtrip(self):
- percent_table = self.tables.percent_table
- for params in [
- {'percent%': 5, 'spaces % more spaces': 12},
- {'percent%': 7, 'spaces % more spaces': 11},
- {'percent%': 9, 'spaces % more spaces': 10},
- {'percent%': 11, 'spaces % more spaces': 9}
- ]:
- config.db.execute(percent_table.insert(), params)
- self._assert_table()
- def test_executemany_roundtrip(self):
- percent_table = self.tables.percent_table
- config.db.execute(
- percent_table.insert(),
- {'percent%': 5, 'spaces % more spaces': 12}
- )
- config.db.execute(
- percent_table.insert(),
- [{'percent%': 7, 'spaces % more spaces': 11},
- {'percent%': 9, 'spaces % more spaces': 10},
- {'percent%': 11, 'spaces % more spaces': 9}]
- )
- self._assert_table()
- def _assert_table(self):
- percent_table = self.tables.percent_table
- lightweight_percent_table = self.tables.lightweight_percent_table
- for table in (
- percent_table,
- percent_table.alias(),
- lightweight_percent_table,
- lightweight_percent_table.alias()):
- eq_(
- list(
- config.db.execute(
- table.select().order_by(table.c['percent%'])
- )
- ),
- [
- (5, 12),
- (7, 11),
- (9, 10),
- (11, 9)
- ]
- )
- eq_(
- list(
- config.db.execute(
- table.select().
- where(table.c['spaces % more spaces'].in_([9, 10])).
- order_by(table.c['percent%']),
- )
- ),
- [
- (9, 10),
- (11, 9)
- ]
- )
- row = config.db.execute(table.select().
- order_by(table.c['percent%'])).first()
- eq_(row['percent%'], 5)
- eq_(row['spaces % more spaces'], 12)
- eq_(row[table.c['percent%']], 5)
- eq_(row[table.c['spaces % more spaces']], 12)
- config.db.execute(
- percent_table.update().values(
- {percent_table.c['spaces % more spaces']: 15}
- )
- )
- eq_(
- list(
- config.db.execute(
- percent_table.
- select().
- order_by(percent_table.c['percent%'])
- )
- ),
- [(5, 15), (7, 15), (9, 15), (11, 15)]
- )
- class ServerSideCursorsTest(fixtures.TestBase, testing.AssertsExecutionResults):
- __requires__ = ('server_side_cursors', )
- __backend__ = True
- def _is_server_side(self, cursor):
- if self.engine.url.drivername == 'postgresql':
- return cursor.name
- elif self.engine.url.drivername == 'mysql':
- sscursor = __import__('MySQLdb.cursors').cursors.SSCursor
- return isinstance(cursor, sscursor)
- elif self.engine.url.drivername == 'mysql+pymysql':
- sscursor = __import__('pymysql.cursors').cursors.SSCursor
- return isinstance(cursor, sscursor)
- else:
- return False
- def _fixture(self, server_side_cursors):
- self.engine = engines.testing_engine(
- options={'server_side_cursors': server_side_cursors}
- )
- return self.engine
- def tearDown(self):
- engines.testing_reaper.close_all()
- self.engine.dispose()
- def test_global_string(self):
- engine = self._fixture(True)
- result = engine.execute('select 1')
- assert self._is_server_side(result.cursor)
- def test_global_text(self):
- engine = self._fixture(True)
- result = engine.execute(text('select 1'))
- assert self._is_server_side(result.cursor)
- def test_global_expr(self):
- engine = self._fixture(True)
- result = engine.execute(select([1]))
- assert self._is_server_side(result.cursor)
- def test_global_off_explicit(self):
- engine = self._fixture(False)
- result = engine.execute(text('select 1'))
- # It should be off globally ...
- assert not self._is_server_side(result.cursor)
- def test_stmt_option(self):
- engine = self._fixture(False)
- s = select([1]).execution_options(stream_results=True)
- result = engine.execute(s)
- # ... but enabled for this one.
- assert self._is_server_side(result.cursor)
- def test_conn_option(self):
- engine = self._fixture(False)
- # and this one
- result = \
- engine.connect().execution_options(stream_results=True).\
- execute('select 1'
- )
- assert self._is_server_side(result.cursor)
- def test_stmt_enabled_conn_option_disabled(self):
- engine = self._fixture(False)
- s = select([1]).execution_options(stream_results=True)
- # not this one
- result = \
- engine.connect().execution_options(stream_results=False).\
- execute(s)
- assert not self._is_server_side(result.cursor)
- def test_stmt_option_disabled(self):
- engine = self._fixture(True)
- s = select([1]).execution_options(stream_results=False)
- result = engine.execute(s)
- assert not self._is_server_side(result.cursor)
- def test_aliases_and_ss(self):
- engine = self._fixture(False)
- s1 = select([1]).execution_options(stream_results=True).alias()
- result = engine.execute(s1)
- assert self._is_server_side(result.cursor)
- # s1's options shouldn't affect s2 when s2 is used as a
- # from_obj.
- s2 = select([1], from_obj=s1)
- result = engine.execute(s2)
- assert not self._is_server_side(result.cursor)
- def test_for_update_expr(self):
- engine = self._fixture(True)
- s1 = select([1], for_update=True)
- result = engine.execute(s1)
- assert self._is_server_side(result.cursor)
- def test_for_update_string(self):
- engine = self._fixture(True)
- result = engine.execute('SELECT 1 FOR UPDATE')
- assert self._is_server_side(result.cursor)
- def test_text_no_ss(self):
- engine = self._fixture(False)
- s = text('select 42')
- result = engine.execute(s)
- assert not self._is_server_side(result.cursor)
- def test_text_ss_option(self):
- engine = self._fixture(False)
- s = text('select 42').execution_options(stream_results=True)
- result = engine.execute(s)
- assert self._is_server_side(result.cursor)
- @testing.provide_metadata
- def test_roundtrip(self):
- md = self.metadata
- engine = self._fixture(True)
- test_table = Table('test_table', md,
- Column('id', Integer, primary_key=True),
- Column('data', String(50)))
- test_table.create(checkfirst=True)
- test_table.insert().execute(data='data1')
- test_table.insert().execute(data='data2')
- eq_(test_table.select().execute().fetchall(), [(1, 'data1'
- ), (2, 'data2')])
- test_table.update().where(
- test_table.c.id == 2).values(
- data=test_table.c.data +
- ' updated').execute()
- eq_(test_table.select().execute().fetchall(),
- [(1, 'data1'), (2, 'data2 updated')])
- test_table.delete().execute()
- eq_(select([func.count('*')]).select_from(test_table).scalar(), 0)
|