|
1 | | -import sys |
| 1 | +from tornado_mysql import ProgrammingError |
| 2 | +from tornado import gen |
| 3 | +from tornado.testing import gen_test |
| 4 | +from tornado_mysql import NotSupportedError |
2 | 5 |
|
3 | | -from tornado_mysql.tests import base |
4 | 6 | import tornado_mysql.cursors |
| 7 | +from tornado_mysql.tests import base |
5 | 8 |
|
6 | 9 |
|
7 | 10 | class TestSSCursor(base.PyMySQLTestCase): |
| 11 | + |
| 12 | + data = [ |
| 13 | + ('America', '', 'America/Jamaica'), |
| 14 | + ('America', '', 'America/Los_Angeles'), |
| 15 | + ('America', '', 'America/Lima'), |
| 16 | + ('America', '', 'America/New_York'), |
| 17 | + ('America', '', 'America/Menominee'), |
| 18 | + ('America', '', 'America/Havana'), |
| 19 | + ('America', '', 'America/El_Salvador'), |
| 20 | + ('America', '', 'America/Costa_Rica'), |
| 21 | + ('America', '', 'America/Denver'), |
| 22 | + ('America', '', 'America/Detroit'),] |
| 23 | + |
| 24 | + @gen_test |
8 | 25 | def test_SSCursor(self): |
9 | 26 | affected_rows = 18446744073709551615 |
10 | 27 |
|
11 | 28 | conn = self.connections[0] |
12 | | - data = [ |
13 | | - ('America', '', 'America/Jamaica'), |
14 | | - ('America', '', 'America/Los_Angeles'), |
15 | | - ('America', '', 'America/Lima'), |
16 | | - ('America', '', 'America/New_York'), |
17 | | - ('America', '', 'America/Menominee'), |
18 | | - ('America', '', 'America/Havana'), |
19 | | - ('America', '', 'America/El_Salvador'), |
20 | | - ('America', '', 'America/Costa_Rica'), |
21 | | - ('America', '', 'America/Denver'), |
22 | | - ('America', '', 'America/Detroit'),] |
23 | 29 |
|
24 | 30 | try: |
25 | 31 | cursor = conn.cursor(tornado_mysql.cursors.SSCursor) |
26 | 32 |
|
27 | 33 | # Create table |
28 | | - cursor.execute(('CREATE TABLE tz_data (' |
29 | | - 'region VARCHAR(64),' |
30 | | - 'zone VARCHAR(64),' |
31 | | - 'name VARCHAR(64))')) |
| 34 | + yield cursor.execute('DROP TABLE IF EXISTS tz_data;') |
| 35 | + yield cursor.execute(('CREATE TABLE tz_data (' |
| 36 | + 'region VARCHAR(64),' |
| 37 | + 'zone VARCHAR(64),' |
| 38 | + 'name VARCHAR(64))')) |
32 | 39 |
|
33 | 40 | # Test INSERT |
34 | | - for i in data: |
35 | | - cursor.execute('INSERT INTO tz_data VALUES (%s, %s, %s)', i) |
| 41 | + for i in self.data: |
| 42 | + yield cursor.execute('INSERT INTO tz_data VALUES (%s, %s, %s)', i) |
36 | 43 | self.assertEqual(conn.affected_rows(), 1, 'affected_rows does not match') |
37 | | - conn.commit() |
38 | | - |
| 44 | + yield conn.commit() |
39 | 45 | # Test fetchone() |
40 | 46 | iter = 0 |
41 | | - cursor.execute('SELECT * FROM tz_data') |
| 47 | + yield cursor.execute('SELECT * FROM tz_data;') |
42 | 48 | while True: |
43 | | - row = cursor.fetchone() |
| 49 | + row = yield cursor.fetchone() |
| 50 | + |
44 | 51 | if row is None: |
45 | 52 | break |
46 | 53 | iter += 1 |
47 | 54 |
|
48 | 55 | # Test cursor.rowcount |
49 | 56 | self.assertEqual(cursor.rowcount, affected_rows, |
50 | | - 'cursor.rowcount != %s' % (str(affected_rows))) |
| 57 | + 'cursor.rowcount != %s' % (str(affected_rows))) |
51 | 58 |
|
52 | 59 | # Test cursor.rownumber |
53 | 60 | self.assertEqual(cursor.rownumber, iter, |
54 | | - 'cursor.rowcount != %s' % (str(iter))) |
| 61 | + 'cursor.rowcount != %s' % (str(iter))) |
55 | 62 |
|
56 | 63 | # Test row came out the same as it went in |
57 | | - self.assertEqual((row in data), True, |
58 | | - 'Row not found in source data') |
| 64 | + self.assertEqual((row in self.data), True, |
| 65 | + 'Row not found in source self.data') |
59 | 66 |
|
60 | 67 | # Test fetchall |
61 | | - cursor.execute('SELECT * FROM tz_data') |
62 | | - self.assertEqual(len(cursor.fetchall()), len(data), |
63 | | - 'fetchall failed. Number of rows does not match') |
| 68 | + yield cursor.execute('SELECT * FROM tz_data') |
| 69 | + r = yield cursor.fetchall() |
| 70 | + self.assertEqual(len(r), len(self.data), |
| 71 | + 'fetchall failed. Number of rows does not match') |
64 | 72 |
|
65 | 73 | # Test fetchmany |
66 | | - cursor.execute('SELECT * FROM tz_data') |
67 | | - self.assertEqual(len(cursor.fetchmany(2)), 2, |
68 | | - 'fetchmany failed. Number of rows does not match') |
| 74 | + yield cursor.execute('SELECT * FROM tz_data') |
| 75 | + r = yield cursor.fetchmany(2) |
| 76 | + self.assertEqual(len(r), 2, |
| 77 | + 'fetchmany failed. Number of rows does not match') |
69 | 78 |
|
70 | 79 | # So MySQLdb won't throw "Commands out of sync" |
71 | 80 | while True: |
72 | | - res = cursor.fetchone() |
| 81 | + res = yield cursor.fetchone() |
73 | 82 | if res is None: |
74 | 83 | break |
75 | 84 |
|
76 | 85 | # Test update, affected_rows() |
77 | | - cursor.execute('UPDATE tz_data SET zone = %s', ['Foo']) |
78 | | - conn.commit() |
79 | | - self.assertEqual(cursor.rowcount, len(data), |
80 | | - 'Update failed. affected_rows != %s' % (str(len(data)))) |
| 86 | + yield cursor.execute('UPDATE tz_data SET zone = %s', ['Foo']) |
| 87 | + yield conn.commit() |
| 88 | + self.assertEqual(cursor.rowcount, len(self.data), |
| 89 | + 'Update failed. affected_rows != %s' % (str(len(self.data)))) |
81 | 90 |
|
82 | 91 | # Test executemany |
83 | | - cursor.executemany('INSERT INTO tz_data VALUES (%s, %s, %s)', data) |
84 | | - self.assertEqual(cursor.rowcount, len(data), |
85 | | - 'executemany failed. cursor.rowcount != %s' % (str(len(data)))) |
| 92 | + yield cursor.executemany('INSERT INTO tz_data VALUES (%s, %s, %s)', self.data) |
| 93 | + self.assertEqual(cursor.rowcount, len(self.data), |
| 94 | + 'executemany failed. cursor.rowcount != %s' % (str(len(self.data)))) |
86 | 95 |
|
87 | 96 | finally: |
88 | | - cursor.execute('DROP TABLE tz_data') |
89 | | - cursor.close() |
| 97 | + yield cursor.execute('DROP TABLE tz_data') |
| 98 | + yield cursor.close() |
| 99 | + |
| 100 | + @gen.coroutine |
| 101 | + def _prepare(self): |
| 102 | + conn = self.connections[0] |
| 103 | + cursor = conn.cursor() |
| 104 | + yield cursor.execute('DROP TABLE IF EXISTS tz_data;') |
| 105 | + yield cursor.execute('CREATE TABLE tz_data (' |
| 106 | + 'region VARCHAR(64),' |
| 107 | + 'zone VARCHAR(64),' |
| 108 | + 'name VARCHAR(64))') |
| 109 | + |
| 110 | + yield cursor.executemany( |
| 111 | + 'INSERT INTO tz_data VALUES (%s, %s, %s)', self.data) |
| 112 | + yield conn.commit() |
| 113 | + yield cursor.close() |
| 114 | + |
| 115 | + @gen.coroutine |
| 116 | + def _cleanup(self): |
| 117 | + conn = self.connections[0] |
| 118 | + cursor = conn.cursor() |
| 119 | + yield cursor.execute('DROP TABLE IF EXISTS tz_data;') |
| 120 | + |
| 121 | + @gen_test |
| 122 | + def test_sscursor_executemany(self): |
| 123 | + conn = self.connections[0] |
| 124 | + yield self._prepare() |
| 125 | + cursor = conn.cursor(tornado_mysql.cursors.SSCursor) |
| 126 | + # Test executemany |
| 127 | + yield cursor.executemany( |
| 128 | + 'INSERT INTO tz_data VALUES (%s, %s, %s)', self.data) |
| 129 | + msg = 'executemany failed. cursor.rowcount != %s' |
| 130 | + self.assertEqual(cursor.rowcount, len(self.data), |
| 131 | + msg % (str(len(self.data)))) |
| 132 | + yield self._cleanup() |
| 133 | + |
| 134 | + @gen_test |
| 135 | + def test_sscursor_scroll_relative(self): |
| 136 | + conn = self.connections[0] |
| 137 | + yield self._prepare() |
| 138 | + cursor = conn.cursor(tornado_mysql.cursors.SSCursor) |
| 139 | + yield cursor.execute('SELECT * FROM tz_data;') |
| 140 | + yield cursor.scroll(1) |
| 141 | + ret = yield cursor.fetchone() |
| 142 | + self.assertEqual(('America', '', 'America/Los_Angeles'), ret) |
| 143 | + yield self._cleanup() |
| 144 | + |
| 145 | + @gen_test |
| 146 | + def test_sscursor_scroll_absolute(self): |
| 147 | + conn = self.connections[0] |
| 148 | + yield self._prepare() |
| 149 | + cursor = conn.cursor(tornado_mysql.cursors.SSCursor) |
| 150 | + yield cursor.execute('SELECT * FROM tz_data;') |
| 151 | + yield cursor.scroll(2, mode='absolute') |
| 152 | + ret = yield cursor.fetchone() |
| 153 | + self.assertEqual(('America', '', 'America/Lima'), ret) |
| 154 | + yield self._cleanup() |
| 155 | + |
| 156 | + @gen_test |
| 157 | + def test_sscursor_scroll_errors(self): |
| 158 | + yield self._prepare() |
| 159 | + conn = self.connections[0] |
| 160 | + cursor = conn.cursor(tornado_mysql.cursors.SSCursor) |
| 161 | + |
| 162 | + yield cursor.execute('SELECT * FROM tz_data;') |
| 163 | + |
| 164 | + with self.assertRaises(NotSupportedError): |
| 165 | + yield cursor.scroll(-2, mode='relative') |
| 166 | + |
| 167 | + yield cursor.scroll(2, mode='absolute') |
| 168 | + |
| 169 | + with self.assertRaises(NotSupportedError): |
| 170 | + yield cursor.scroll(1, mode='absolute') |
| 171 | + with self.assertRaises(ProgrammingError): |
| 172 | + yield cursor.scroll(3, mode='not_valid_mode') |
| 173 | + yield self._cleanup() |
| 174 | + |
90 | 175 |
|
91 | 176 | __all__ = ["TestSSCursor"] |
92 | 177 |
|
| 178 | + |
93 | 179 | if __name__ == "__main__": |
94 | 180 | import unittest |
95 | 181 | unittest.main() |
0 commit comments