-
Notifications
You must be signed in to change notification settings - Fork 261
/
Copy pathtest_async_iter.py
65 lines (48 loc) · 1.79 KB
/
test_async_iter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import pytest
from aiomysql import SSCursor
from aiomysql import sa
from sqlalchemy import MetaData, Table, Column, Integer, String
# SQLAlchemy description of the ``tbl`` table; used to build the SELECT
# issued through the aiomysql.sa layer.
meta = MetaData()
tbl = Table(
    'tbl',
    meta,
    Column('id', Integer, nullable=False, primary_key=True),
    Column('name', String(255)),
)
@pytest.fixture
def table(loop, connection_creator, table_cleanup):
    """Create and populate the ``tbl`` table for the tests in this module.

    Drops any existing ``tbl``, recreates it, inserts the rows
    ``(1, 'a')``, ``(2, 'b')`` and ``(3, 'c')`` and commits.  The table
    name is handed to ``table_cleanup`` (presumably so the table is dropped
    after the test -- confirm against the fixture's definition).
    """
    async def f():
        connection = await connection_creator()
        # ``async with`` closes the cursor even when one of the statements
        # below raises; a plain ``await cursor.close()`` at the end is
        # skipped on failure and leaves the cursor open.
        async with connection.cursor() as cursor:
            await cursor.execute("DROP TABLE IF EXISTS tbl;")
            await cursor.execute("""CREATE TABLE tbl (
id MEDIUMINT NOT NULL AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
PRIMARY KEY (id));""")
            for i in [(1, 'a'), (2, 'b'), (3, 'c')]:
                await cursor.execute("INSERT INTO tbl VALUES(%s, %s)", i)
            await cursor.execute("commit;")

    table_cleanup('tbl')
    loop.run_until_complete(f())
@pytest.mark.run_loop
async def test_async_cursor(cursor, table):
    """Iterate a cursor over ``tbl`` with ``async for``."""
    expected = [(1, 'a'), (2, 'b'), (3, 'c')]
    await cursor.execute('SELECT * from tbl;')
    fetched = []
    async for row in cursor:
        fetched.append(row)
    assert expected == fetched
@pytest.mark.run_loop
async def test_async_cursor_server_side(connection, table):
    """Iterate an ``SSCursor`` over ``tbl`` with ``async for``."""
    ret = []
    # This test creates the cursor itself, so it is responsible for closing
    # it; ``async with`` does so even if the query or the iteration fails.
    async with connection.cursor(SSCursor) as cursor:
        await cursor.execute('SELECT * from tbl;')
        async for i in cursor:
            ret.append(i)
    assert [(1, 'a'), (2, 'b'), (3, 'c')] == ret
@pytest.mark.run_loop
async def test_async_iter_over_sa_result(mysql_params, table, loop):
    """Iterate the result of an ``aiomysql.sa`` SELECT with ``async for``."""
    ret = []
    engine = await sa.create_engine(**mysql_params, loop=loop)
    try:
        conn = await engine.acquire()
        async for i in (await conn.execute(tbl.select())):
            ret.append(i)
        assert [(1, 'a'), (2, 'b'), (3, 'c')] == ret
    finally:
        # Tear the engine down even when the query or the assertion fails,
        # and wait for the shutdown to actually finish.
        engine.terminate()
        await engine.wait_closed()