|
2 | 2 | # standard library |
3 | 3 | import unittest |
4 | 4 | from unittest.mock import MagicMock, patch |
5 | | -import mock |
6 | 5 | from copy import copy |
7 | 6 |
|
8 | 7 | # first party |
|
11 | 10 | import delphi.operations.secrets as secrets |
12 | 11 | from delphi_utils import get_structured_logger |
13 | 12 |
|
14 | | - |
15 | 13 | # third party |
16 | 14 | import mysql.connector |
17 | 15 | import pandas as pd |
18 | 16 | from pathlib import Path |
19 | | -import pdb |
20 | 17 |
|
21 | 18 | # py3tester coverage target (equivalent to `import *`) |
22 | 19 | # __test_target__ = 'delphi.epidata.acquisition.covid_hosp.facility.update' |
|
25 | 22 |
|
26 | 23 | class AcquisitionTests(unittest.TestCase): |
27 | 24 | logger = get_structured_logger() |
28 | | - |
| 25 | + |
29 | 26 | def setUp(self): |
30 | 27 | """Perform per-test setup.""" |
31 | 28 |
|
@@ -54,76 +51,51 @@ def setUp(self): |
54 | 51 | epidata_cnx.commit() |
55 | 52 | epidata_cur.close() |
56 | 53 | #epidata_cnx.close() |
57 | | - |
| 54 | + |
58 | 55 | # make connection and cursor available to test cases |
59 | 56 | self.cnx = epidata_cnx |
60 | | - #self.cur = epidata_cnx.cursor() |
61 | | - |
| 57 | + self.cur = epidata_cnx.cursor() |
| 58 | + |
62 | 59 | def tearDown(self): |
63 | 60 | """Perform per-test teardown.""" |
64 | | - #self.cur.close() |
| 61 | + self.cur.close() |
65 | 62 | self.cnx.close() |
66 | 63 |
|
67 | | - @mock.patch("mysql.connector.connect") |
68 | | - def test_rvdss_repiratory_detections(self,mock_sql): |
| 64 | + @patch("mysql.connector.connect") |
| 65 | + def test_rvdss_repiratory_detections(self, mock_sql): |
| 66 | + connection_mock = MagicMock() |
| 67 | + |
69 | 68 | TEST_DIR = Path(__file__).parent.parent.parent.parent |
70 | | - detection_data = pd.read_csv(str(TEST_DIR) + "/testdata/acquisition/rvdss/RVD_CurrentWeekTable_Formatted.csv") |
| 69 | + detection_data = pd.read_csv(str(TEST_DIR) + "/testdata/acquisition/rvdss/RVD_CurrentWeekTable_Formatted.csv") |
71 | 70 | detection_data['time_type'] = "week" |
72 | | - detection_subset = detection_data[(detection_data['geo_value'].isin(['nl', 'nb'])) & (detection_data['time_value'].isin([202408-31, 20240907])) ] |
| 71 | + detection_subset = detection_data[(detection_data['geo_value'].isin(['nl', 'nb'])) & (detection_data['time_value'].isin([20240831, 20240907])) ] |
73 | 72 |
|
74 | | - connection_mock = MagicMock() |
75 | 73 | # make sure the data does not yet exist |
76 | 74 | with self.subTest(name='no data yet'): |
77 | 75 | response = Epidata.rvdss(geo_type='province', |
78 | 76 | time_values= [202435, 202436], |
79 | 77 | geo_value = ['nl','nb']) |
80 | 78 | self.assertEqual(response['result'], -2, response) |
81 | 79 |
|
82 | | - |
83 | 80 | # acquire sample data into local database |
84 | | - with self.subTest(name='first acquisition'): |
85 | | - #mock_sql.cursor.return_value = self.cnx.cursor() |
| 81 | + with self.subTest(name='first acquisition'): |
| 82 | + # When the MagicMock connection's `cursor()` method is called, return |
| 83 | + # a real cursor made from the current open connection `cnx`. |
86 | 84 | connection_mock.cursor.return_value = self.cnx.cursor() |
| 85 | + # Commit via the current open connection `cnx`, from which the cursor |
| 86 | + # is derived |
| 87 | + connection_mock.commit = self.cnx.commit |
87 | 88 | mock_sql.return_value = connection_mock |
88 | | - |
89 | | - rvdss_cols_subset = [col for col in detection_subset.columns if col in rvdss_cols] |
90 | | - pdb.set_trace() |
91 | | - update(detection_subset,self.logger) |
92 | | - |
93 | | - response = Epidata.rvdss(geo_type='province', |
94 | | - time_values= [202435, 202436], |
95 | | - geo_value = ['nl','nb']) |
96 | | - |
97 | | - self.assertEqual(response['result'], 1) |
98 | | - |
99 | | - with self.subTest(name='first acquisition2'): |
100 | | - #mock_sql.cursor.return_value = self.cnx.cursor() |
101 | | - connection_mock.cursor.return_value = self.cnx.cursor() |
102 | | - mock_sql.return_value = connection_mock |
103 | | - |
104 | | - rvdss_cols_subset = [col for col in detection_subset.columns if col in rvdss_cols] |
105 | | - update(detection_subset,self.logger) |
106 | | - |
107 | | - response = Epidata.rvdss(geo_type='province', |
108 | | - time_values= [202435, 202436], |
109 | | - geo_value = ['nl','nb']) |
110 | | - |
111 | | - self.assertEqual(response['result'], 1) |
112 | | - |
113 | | - with self.subTest(name='first acquisition3'): |
114 | | - #mock_sql.cursor.return_value = self.cnx.cursor() |
115 | | - connection_mock.cursor.return_value = self.cnx.cursor() |
116 | | - mock_sql.return_value = connection_mock |
117 | | - |
118 | | - rvdss_cols_subset = [col for col in detection_subset.columns if col in rvdss_cols] |
119 | | - update(detection_subset,self.logger) |
120 | | - |
| 89 | + |
| 90 | + update(detection_subset, self.logger) |
| 91 | + |
121 | 92 | response = Epidata.rvdss(geo_type='province', |
122 | 93 | time_values= [202435, 202436], |
123 | 94 | geo_value = ['nl','nb']) |
124 | | - |
| 95 | + |
125 | 96 | self.assertEqual(response['result'], 1) |
126 | 97 |
|
| 98 | + |
127 | 99 | # # make sure the data now exists |
128 | 100 | # with self.subTest(name='initial data checks'): |
129 | 101 | # expected_spotchecks = { |
@@ -164,5 +136,3 @@ def test_rvdss_repiratory_detections(self,mock_sql): |
164 | 136 | # '450822', Epidata.range(20200101, 20210101)) |
165 | 137 | # self.assertEqual(response['result'], 1) |
166 | 138 | # self.assertEqual(len(response['epidata']), 2) |
167 | | - pass |
168 | | - |
0 commit comments