Skip to content

Commit e53355d

Browse files
committed
Fix some of the kinks in imports
1 parent a4173c6 commit e53355d

15 files changed

+30
-20
lines changed

.gitignore

100644 → 100755
File mode changed.

LICENSE

100644 → 100755
File mode changed.

MANIFEST.in

100644 → 100755
File mode changed.

Makefile

100644 → 100755
File mode changed.

README.md

100644 → 100755
File mode changed.

setup.py

100644 → 100755
File mode changed.

tap_mssql/__init__.py

100644 → 100755
+20 -11
Original file line number | Diff line number | Diff line change
@@ -118,7 +118,8 @@ def create_column_metadata(cols):
118118

119119
def discover_catalog(mssql_conn, config):
120120
"""Returns a Catalog describing the structure of the database."""
121-
121+
LOGGER.info("Preparing Catalog")
122+
mssql_conn = MSSQLConnection(config)
122123
filter_dbs_config = config.get("filter_dbs")
123124

124125
if filter_dbs_config:
@@ -134,10 +135,10 @@ def discover_catalog(mssql_conn, config):
134135
)"""
135136

136137
with connect_with_backoff(mssql_conn) as open_conn:
137-
cur = mssql_conn.cursor()
138+
cur = open_conn.cursor()
139+
LOGGER.info("Fetching tables")
138140
cur.execute(
139-
"""
140-
SELECT table_schema,
141+
"""SELECT table_schema,
141142
table_name,
142143
table_type
143144
FROM information_schema.tables c
@@ -153,6 +154,7 @@ def discover_catalog(mssql_conn, config):
153154
table_info[db] = {}
154155

155156
table_info[db][table] = {"row_count": None, "is_view": table_type == "VIEW"}
157+
LOGGER.info("Tables fetched, fetching columns")
156158
cur.execute(
157159
"""with constraint_columns as (
158160
select c.table_schema
@@ -192,6 +194,7 @@ def discover_catalog(mssql_conn, config):
192194
while rec is not None:
193195
columns.append(Column(*rec))
194196
rec = cur.fetchone()
197+
LOGGER.info("Columns Fetched")
195198
entries = []
196199
for (k, cols) in itertools.groupby(columns, lambda c: (c.table_schema, c.table_name)):
197200
cols = list(cols)
@@ -230,7 +233,7 @@ def discover_catalog(mssql_conn, config):
230233
)
231234

232235
entries.append(entry)
233-
236+
LOGGER.info("Catalog ready")
234237
return Catalog(entries)
235238

236239

@@ -353,6 +356,7 @@ def get_non_binlog_streams(mssql_conn, catalog, config, state):
353356
3. any streams that do not have a replication method of LOG_BASED
354357
355358
"""
359+
mssql_conn = MSSQLConnection(config)
356360
discovered = discover_catalog(mssql_conn, config)
357361

358362
# Filter catalog to include only selected streams
@@ -426,8 +430,9 @@ def write_schema_message(catalog_entry, bookmark_properties=[]):
426430
)
427431

428432

429-
def do_sync_incremental(mssql_conn, catalog_entry, state, columns):
433+
def do_sync_incremental(mssql_conn, config, catalog_entry, state, columns):
430434
LOGGER.info("Stream %s is using incremental replication", catalog_entry.stream)
435+
mssql_conn = MSSQLConnection(config)
431436

432437
md_map = metadata.to_map(catalog_entry.metadata)
433438
replication_key = md_map.get((), {}).get("replication-key")
@@ -441,20 +446,21 @@ def do_sync_incremental(mssql_conn, catalog_entry, state, columns):
441446

442447
write_schema_message(catalog_entry=catalog_entry, bookmark_properties=[replication_key])
443448

444-
incremental.sync_table(mssql_conn, catalog_entry, state, columns)
449+
incremental.sync_table(mssql_conn, config, catalog_entry, state, columns)
445450

446451
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
447452

448453

449-
def do_sync_full_table(mssql_conn, catalog_entry, state, columns):
454+
def do_sync_full_table(mssql_conn, config, catalog_entry, state, columns):
450455
LOGGER.info("Stream %s is using full table replication", catalog_entry.stream)
451456
key_properties = common.get_key_properties(catalog_entry)
457+
mssql_conn = MSSQLConnection(config)
452458

453459
write_schema_message(catalog_entry)
454460

455461
stream_version = common.get_stream_version(catalog_entry.tap_stream_id, state)
456462

457-
full_table.sync_table(mssql_conn, catalog_entry, state, columns, stream_version)
463+
full_table.sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version)
458464

459465
# Prefer initial_full_table_complete going forward
460466
singer.clear_bookmark(state, catalog_entry.tap_stream_id, "version")
@@ -467,6 +473,8 @@ def do_sync_full_table(mssql_conn, catalog_entry, state, columns):
467473

468474

469475
def sync_non_binlog_streams(mssql_conn, non_binlog_catalog, config, state):
476+
mssql_conn = MSSQLConnection(config)
477+
470478
for catalog_entry in non_binlog_catalog.streams:
471479
columns = list(catalog_entry.schema.properties.keys())
472480

@@ -492,9 +500,9 @@ def sync_non_binlog_streams(mssql_conn, non_binlog_catalog, config, state):
492500
timer.tags["table"] = catalog_entry.table
493501

494502
if replication_method == "INCREMENTAL":
495-
do_sync_incremental(mssql_conn, catalog_entry, state, columns)
503+
do_sync_incremental(mssql_conn, config, catalog_entry, state, columns)
496504
elif replication_method == "FULL_TABLE":
497-
do_sync_full_table(mssql_conn, catalog_entry, state, columns)
505+
do_sync_full_table(mssql_conn, config, catalog_entry, state, columns)
498506
else:
499507
raise Exception("only INCREMENTAL and FULL TABLE replication methods are supported")
500508

@@ -503,6 +511,7 @@ def sync_non_binlog_streams(mssql_conn, non_binlog_catalog, config, state):
503511

504512

505513
def do_sync(mssql_conn, config, catalog, state):
514+
LOGGER.info("Beginning sync")
506515
non_binlog_catalog = get_non_binlog_streams(mssql_conn, catalog, config, state)
507516
sync_non_binlog_streams(mssql_conn, non_binlog_catalog, config, state)
508517

tap_mssql/connection.py

100644 → 100755
File mode changed.

tap_mssql/sync_strategies/__init__.py

100644 → 100755
File mode changed.

tap_mssql/sync_strategies/common.py

100644 → 100755
+4 -5
Original file line number | Diff line number | Diff line change
@@ -15,8 +15,8 @@
1515

1616
def escape(string):
1717
if "`" in string:
18-
raise Exception("Can't escape identifier {} because it contains a backtick".format(string))
19-
return "`" + string + "`"
18+
raise Exception("Can't escape identifier {} because it contains a double quote".format(string))
19+
return "\"" + string + "\""
2020

2121

2222
def generate_tap_stream_id(table_schema, table_name):
@@ -137,11 +137,10 @@ def whitelist_bookmark_keys(bookmark_key_set, tap_stream_id, state):
137137
def sync_query(cursor, catalog_entry, state, select_sql, columns, stream_version, params):
138138
replication_key = singer.get_bookmark(state, catalog_entry.tap_stream_id, "replication_key")
139139

140-
query_string = cursor.mogrify(select_sql, params)
140+
# query_string = cursor.mogrify(select_sql, params)
141141

142142
time_extracted = utils.now()
143-
144-
LOGGER.info("Running %s", query_string)
143+
LOGGER.info("Running %s", select_sql)
145144
cursor.execute(select_sql, params)
146145

147146
row = cursor.fetchone()

tap_mssql/sync_strategies/full_table.py

100644 → 100755
+2 -1
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,8 @@ def generate_bookmark_keys(catalog_entry):
3232
return bookmark_keys
3333

3434

35-
def sync_table(mssql_conn, catalog_entry, state, columns, stream_version):
35+
def sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version):
36+
mssql_conn = MSSQLConnection(config)
3637
common.whitelist_bookmark_keys(
3738
generate_bookmark_keys(catalog_entry), catalog_entry.tap_stream_id, state
3839
)

tap_mssql/sync_strategies/incremental.py

100644 → 100755
+4 -3
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,8 @@
1313
BOOKMARK_KEYS = {"replication_key", "replication_key_value", "version"}
1414

1515

16-
def sync_table(mssql_conn, catalog_entry, state, columns):
16+
def sync_table(mssql_conn, config, catalog_entry, state, columns):
17+
mssql_conn = MSSQLConnection(config)
1718
common.whitelist_bookmark_keys(BOOKMARK_KEYS, catalog_entry.tap_stream_id, state)
1819

1920
catalog_metadata = metadata.to_map(catalog_entry.metadata)
@@ -54,13 +55,13 @@ def sync_table(mssql_conn, catalog_entry, state, columns):
5455
if catalog_entry.schema.properties[replication_key_metadata].format == "date-time":
5556
replication_key_value = pendulum.parse(replication_key_value)
5657

57-
select_sql += " WHERE `{}` >= %(replication_key_value)s ORDER BY `{}` ASC".format(
58+
select_sql += " WHERE \"{}\" >= %(replication_key_value)s ORDER BY \"{}\" ASC".format(
5859
replication_key_metadata, replication_key_metadata
5960
)
6061

6162
params["replication_key_value"] = replication_key_value
6263
elif replication_key_metadata is not None:
63-
select_sql += " ORDER BY `{}` ASC".format(replication_key_metadata)
64+
select_sql += " ORDER BY \"{}\" ASC".format(replication_key_metadata)
6465

6566
common.sync_query(
6667
cur, catalog_entry, state, select_sql, columns, stream_version, params

tests/test_full_table_interruption.py

100644 → 100755
File mode changed.

tests/test_tap_mysql.py

100644 → 100755
File mode changed.

tests/utils.py

100644 → 100755
File mode changed.

0 commit comments

Comments
 (0)