-
Notifications
You must be signed in to change notification settings - Fork 261
/
Copy pathtest_bulk_inserts.py
190 lines (168 loc) · 6.14 KB
/
test_bulk_inserts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import pytest
from aiomysql import DictCursor
@pytest.fixture
def table(loop, connection, table_cleanup):
    """Create the ``bulkinsert`` table that the tests in this module use.

    The CREATE TABLE statement is run by driving a coroutine to completion
    on ``loop``. Once the statement has succeeded, ``table_cleanup`` is
    called with the table name (presumably so the table is dropped after
    the test -- confirm against the ``table_cleanup`` fixture).
    """

    async def create_table():
        cursor = await connection.cursor(DictCursor)
        try:
            sql = ("CREATE TABLE bulkinsert (id INT(11), name CHAR(20),\n"
                   "age INT, height INT, PRIMARY KEY (id))")
            await cursor.execute(sql)
        finally:
            # The cursor is only needed for this one statement; release it
            # even when the CREATE TABLE fails.
            await cursor.close()
        table_cleanup('bulkinsert')

    loop.run_until_complete(create_table())
@pytest.fixture
def assert_records(cursor):
    """Return a coroutine function that checks the ``bulkinsert`` contents.

    The returned coroutine selects every row from ``bulkinsert`` through
    ``cursor``, issues a COMMIT, and asserts that the fetched rows equal
    ``data`` once both sides are sorted.
    """

    async def check(data):
        query = "SELECT id, name, age, height FROM bulkinsert"
        await cursor.execute(query)
        fetched = await cursor.fetchall()
        await cursor.execute('COMMIT')
        # Sort both sides: the SELECT carries no ORDER BY clause.
        expected_sorted = sorted(data)
        fetched_sorted = sorted(fetched)
        assert expected_sorted == fetched_sorted

    return check
@pytest.fixture
def assert_dict_records(connection):
    """Return a coroutine function that checks ``bulkinsert`` rows as dicts.

    The returned coroutine opens a ``DictCursor`` on ``connection``, selects
    every row from ``bulkinsert``, issues a COMMIT, and asserts that the
    fetched rows equal ``data`` once both sides are sorted by ``'id'``.
    """

    def by_id(row):
        return row['id']

    async def check(data):
        cursor = await connection.cursor(DictCursor)
        try:
            await cursor.execute(
                "SELECT id, name, age, height FROM bulkinsert")
            result = await cursor.fetchall()
            await cursor.execute('COMMIT')
        finally:
            # A fresh cursor is opened on every call; close it so repeated
            # checks do not leave cursors open.
            await cursor.close()
        assert sorted(data, key=by_id) == sorted(result, key=by_id)

    return check
@pytest.mark.run_loop
async def test_bulk_insert(cursor, table, assert_records):
    """Insert three rows with ``executemany`` and verify statement and rows."""
    rows = [(0, "bob", 21, 123), (1, "jim", 56, 45), (2, "fred", 100, 180)]
    insert_sql = ("INSERT INTO bulkinsert (id, name, age, height) "
                  "VALUES (%s,%s,%s,%s)")
    await cursor.executemany(insert_sql, rows)

    # The last executed statement is expected to be one multi-row INSERT.
    assert cursor._last_executed == bytearray(
        b"INSERT INTO bulkinsert (id, name, age, height) "
        b"VALUES (0,'bob',21,123),(1,'jim',56,45),"
        b"(2,'fred',100,180)")

    await cursor.execute('commit')
    await assert_records(rows)
@pytest.mark.run_loop
async def test_bulk_insert_multiline_statement(cursor, table, assert_records):
    """Run ``executemany`` with an INSERT that spans several lines."""
    rows = [(0, "bob", 21, 123), (1, "jim", 56, 45), (2, "fred", 100, 180)]

    # Whitespace inside the two literals below is part of the compared
    # bytes, so their continuation lines must stay exactly as written.
    statement = """insert
into bulkinsert (id, name,
age, height)
values (%s,
%s , %s,
%s )
"""
    expected = bytearray(b"""insert
into bulkinsert (id, name,
age, height)
values (0,
'bob' , 21,
123 ),(1,
'jim' , 56,
45 ),(2,
'fred' , 100,
180 )""")

    await cursor.executemany(statement, rows)
    # Only surrounding whitespace is ignored in the comparison.
    assert cursor._last_executed.strip() == expected
    await cursor.execute('COMMIT')
    await assert_records(rows)
@pytest.mark.run_loop
async def test_bulk_insert_single_record(cursor, table, assert_records):
    """Run ``executemany`` with a one-element parameter list."""
    only_row = (0, "bob", 21, 123)
    rows = [only_row]
    insert_sql = ("insert into bulkinsert (id, name, age, height) "
                  "values (%s,%s,%s,%s)")
    await cursor.executemany(insert_sql, rows)
    await cursor.execute('COMMIT')
    # The table must now hold exactly the single inserted row.
    await assert_records(rows)
@pytest.mark.run_loop
async def test_insert_on_duplicate_key_update(cursor, table, assert_records):
    """Run ``executemany`` with "insert ... on duplicate key update"."""
    rows = [(0, "bob", 21, 123), (1, "jim", 56, 45), (2, "fred", 100, 180)]

    # Whitespace inside the two literals below is part of the compared
    # bytes, so their continuation lines must stay exactly as written.
    statement = """insert
into bulkinsert (id, name,
age, height)
values (%s,
%s , %s,
%s ) on duplicate key update
age = values(age)
"""
    expected = bytearray(b"""insert
into bulkinsert (id, name,
age, height)
values (0,
'bob' , 21,
123 ),(1,
'jim' , 56,
45 ),(2,
'fred' , 100,
180 ) on duplicate key update
age = values(age)""")

    await cursor.executemany(statement, rows)
    # The "on duplicate key update" tail is expected once, after all rows.
    assert cursor._last_executed.strip() == expected
    await cursor.execute('COMMIT')
    await assert_records(rows)
@pytest.mark.run_loop
async def test_bulk_insert_with_params_as_dict(cursor, table,
                                               assert_dict_records):
    """Run ``executemany`` with named placeholders and dict parameters."""
    columns = ('id', 'name', 'age', 'height')
    values = [(0, "bob", 21, 123), (1, "jim", 56, 45), (2, "fred", 100, 180)]
    # One dict per row, keyed by column name.
    data = [dict(zip(columns, row)) for row in values]

    insert_sql = ("INSERT INTO bulkinsert (id, name, age, height) "
                  "VALUES (%(id)s,%(name)s,%(age)s,%(height)s)")
    await cursor.executemany(insert_sql, data)

    # The last executed statement is expected to be one multi-row INSERT.
    assert cursor._last_executed == bytearray(
        b"INSERT INTO bulkinsert (id, name, age, height) "
        b"VALUES (0,'bob',21,123),(1,'jim',56,45),"
        b"(2,'fred',100,180)")

    await cursor.execute('commit')
    await assert_dict_records(data)
@pytest.mark.run_loop
async def test_bulk_insert_with_precedence_spaces(cursor, table,
                                                  assert_records):
    """Run ``executemany`` with whitespace before and inside the statement.

    The expected bytes start directly at ``INSERT`` but keep the whitespace
    in front of ``VALUES`` and after the placeholder list.
    """
    data = [(0, "bob", 21, 123), (1, "jim", 56, 45)]
    # The whitespace is written out explicitly instead of relying on the
    # indentation of a triple-quoted string, so the statement always
    # matches the bytes compared below regardless of how this file is
    # indented.
    sql = ("\n"
           " INSERT INTO bulkinsert (id, name, age, height)\n"
           " VALUES (%s,%s,%s,%s)\n"
           " ")
    await cursor.executemany(sql, data)
    expected = bytearray(b"INSERT INTO bulkinsert (id, name, age, height)"
                         b"\n VALUES (0,'bob',21,123),"
                         b"(1,'jim',56,45)\n ")
    assert cursor._last_executed == expected
    await cursor.execute('commit')
    await assert_records(data)
@pytest.mark.run_loop
async def test_bulk_replace(cursor, table, assert_records):
    """Run ``executemany`` with REPLACE for two rows sharing one id."""
    rows = [(0, "bob", 21, 123), (0, "jim", 56, 45)]
    replace_sql = ("REPLACE INTO bulkinsert (id, name, age, height) "
                   "VALUES (%s,%s,%s,%s)")
    expected = bytearray(b"REPLACE INTO bulkinsert (id, name, age, height) "
                         b"VALUES (0,'bob',21,123),(0,'jim',56,45)")

    await cursor.executemany(replace_sql, rows)
    assert cursor._last_executed.strip() == expected
    await cursor.execute('COMMIT')
    # Both rows use id 0, so only the second one is expected to remain.
    await assert_records([(0, "jim", 56, 45)])
@pytest.mark.run_loop
async def test_bulk_insert_with_semicolon_at_the_end(cursor, table,
                                                     assert_records):
    """Run ``executemany`` with an INSERT that ends in a semicolon."""
    rows = [(0, "bob", 21, 123), (1, "jim", 56, 45)]
    insert_sql = ("INSERT INTO bulkinsert (id, name, age, height) "
                  "VALUES (%s,%s,%s,%s);")
    await cursor.executemany(insert_sql, rows)

    # The expected statement carries no trailing semicolon.
    assert cursor._last_executed == bytearray(
        b"INSERT INTO bulkinsert (id, name, age, height) "
        b"VALUES (0,'bob',21,123),(1,'jim',56,45)")

    await cursor.execute('commit')
    await assert_records(rows)