Skip to content

Commit 4fdaac8

Browse files
authored
Merge pull request #2 from singer-io/master
Sync with upstream
2 parents 61f1b78 + f1060b1 commit 4fdaac8

File tree

7 files changed

+102
-10
lines changed

7 files changed

+102
-10
lines changed

.circleci/config.yml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@ jobs:
1010
command: |
1111
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/sandbox dev_env.sh
1212
source dev_env.sh
13+
export LC_ALL=C
1314
python3 -m venv /usr/local/share/virtualenvs/tap-postgres
1415
source /usr/local/share/virtualenvs/tap-postgres/bin/activate
1516
pip install .

.github/pull_request_template.md

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,11 @@
1+
# Description of change
2+
(write a short description here or paste a link to JIRA)
3+
4+
# QA steps
5+
- [ ] automated tests passing
6+
- [ ] manual qa steps passing (list below)
7+
8+
# Risks
9+
10+
# Rollback steps
11+
- revert this branch

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,7 @@
1+
# Changelog
2+
3+
## 0.0.65
4+
* Add support for `int8[]` (`bigint[]`) array types to log-based replication [#69](https://github.com/singer-io/tap-postgres/pull/69)
5+
6+
## 0.0.64
7+
* Pass string to `decimal.Decimal` when handling numeric data type [#67](https://github.com/singer-io/tap-postgres/pull/67)

setup.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@
33
from setuptools import setup
44

55
setup(name='tap-postgres',
6-
version='0.0.59',
6+
version='0.0.65',
77
description='Singer.io tap for extracting data from PostgreSQL',
88
author='Stitch',
99
url='https://singer.io',

tap_postgres/__init__.py

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -273,7 +273,7 @@ def produce_table_info(conn):
273273
AND NOT a.attisdropped
274274
AND pg_class.relkind IN ('r', 'v', 'm')
275275
AND n.nspname NOT in ('pg_toast', 'pg_catalog', 'information_schema')
276-
AND has_table_privilege(pg_class.oid, 'SELECT') = true """)
276+
AND has_column_privilege(pg_class.oid, attname, 'SELECT') = true """)
277277
for row in cur.fetchall():
278278
row_count, is_view, schema_name, table_name, *col_info = row
279279

@@ -493,7 +493,7 @@ def sync_method_for_streams(streams, state, default_replication_method):
493493
state = clear_state_on_replication_change(state, stream['tap_stream_id'], replication_key, replication_method)
494494

495495
if replication_method not in set(['LOG_BASED', 'FULL_TABLE', 'INCREMENTAL']):
496-
raise Exception("Unrecognized replication_method {}".format(replication_method))
496+
raise Exception("Unrecognized replication_method {} for stream {}".format(replication_method, stream['tap_stream_id']))
497497

498498
md_map = metadata.to_map(stream['metadata'])
499499
desired_columns = [c for c in stream['schema']['properties'].keys() if sync_common.should_sync_column(md_map, c)]
@@ -691,9 +691,9 @@ def main_impl():
691691

692692
if args.discover:
693693
do_discovery(conn_config)
694-
elif args.properties or args.catalog:
694+
elif args.properties:
695695
state = args.state
696-
do_sync(conn_config, args.catalog.to_dict() if args.catalog else args.properties, args.config.get('default_replication_method'), state)
696+
do_sync(conn_config, args.properties, args.config.get('default_replication_method'), state)
697697
else:
698698
LOGGER.info("No properties were selected")
699699

tap_postgres/sync_strategies/logical_replication.py

Lines changed: 11 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@
1010
import tap_postgres.sync_strategies.common as sync_common
1111
from dateutil.parser import parse
1212
import psycopg2
13+
from psycopg2 import sql
1314
import copy
1415
from select import select
1516
from functools import reduce
@@ -68,11 +69,14 @@ def tuples_to_map(accum, t):
6869
accum[t[0]] = t[1]
6970
return accum
7071

72+
def create_hstore_elem_query(elem):
    """Build the ``hstore_to_array`` query for a single hstore value.

    The value is wrapped in ``sql.Literal`` and substituted into the
    statement via ``sql.SQL.format`` instead of being spliced in with plain
    string formatting, so quoting is left to psycopg2 when the query is
    rendered.

    :param elem: textual hstore value, e.g. ``'"key"=>"value"'``.
    :return: a composed psycopg2 SQL object that can be passed to
        ``cursor.execute`` or rendered with ``as_string``.
    """
    statement = sql.SQL("SELECT hstore_to_array({})")
    quoted_elem = sql.Literal(elem)
    return statement.format(quoted_elem)
74+
7175
def create_hstore_elem(conn_info, elem):
7276
with post_db.open_connection(conn_info) as conn:
7377
with conn.cursor() as cur:
74-
sql = """SELECT hstore_to_array('{}')""".format(elem)
75-
cur.execute(sql)
78+
query = create_hstore_elem_query(elem)
79+
cur.execute(query)
7680
res = cur.fetchone()[0]
7781
hstore_elem = reduce(tuples_to_map, [res[i:i + 2] for i in range(0, len(res), 2)], {})
7882
return hstore_elem
@@ -101,6 +105,8 @@ def create_array_elem(elem, sql_datatype, conn_info):
101105
cast_datatype = 'text[]'
102106
elif sql_datatype == 'integer[]':
103107
cast_datatype = 'integer[]'
108+
elif sql_datatype == 'bigint[]':
109+
cast_datatype = 'bigint[]'
104110
elif sql_datatype == 'inet[]':
105111
cast_datatype = 'inet[]'
106112
elif sql_datatype == 'json[]':
@@ -130,8 +136,8 @@ def create_array_elem(elem, sql_datatype, conn_info):
130136
#custom datatypes like enums
131137
cast_datatype = 'text[]'
132138

133-
sql = """SELECT $stitch_quote${}$stitch_quote$::{}""".format(elem, cast_datatype)
134-
cur.execute(sql)
139+
sql_stmt = """SELECT $stitch_quote${}$stitch_quote$::{}""".format(elem, cast_datatype)
140+
cur.execute(sql_stmt)
135141
res = cur.fetchone()[0]
136142
return res
137143

@@ -164,7 +170,7 @@ def selected_value_to_singer_value_impl(elem, og_sql_datatype, conn_info):
164170
if sql_datatype == 'hstore':
165171
return create_hstore_elem(conn_info, elem)
166172
if 'numeric' in sql_datatype:
167-
return decimal.Decimal(elem)
173+
return decimal.Decimal(str(elem))
168174
if isinstance(elem, int):
169175
return elem
170176
if isinstance(elem, float):

tests/test_discovery.py

Lines changed: 67 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -365,6 +365,16 @@ def test_catalog(self):
365365
'definitions' : tap_postgres.BASE_RECURSIVE_SCHEMAS},
366366
stream_dict.get('schema'))
367367

368+
def test_escaping_values(self):
    """An apostrophe inside an hstore value is doubled in the rendered query."""
    elem = '"{}"=>"{}"'.format('nickname', "Dave's Courtyard")
    expected_sql = "SELECT hstore_to_array('\"nickname\"=>\"Dave''s Courtyard\"')"

    # A cursor is needed only so the composed query can be rendered to text;
    # nothing is executed against the database here.
    with get_test_connection() as conn, conn.cursor() as cur:
        composed = tap_postgres.sync_strategies.logical_replication.create_hstore_elem_query(elem)
        self.assertEqual(composed.as_string(cur), expected_sql)
377+
368378

369379
class TestEnumTable(unittest.TestCase):
370380
maxDiff = None
@@ -566,6 +576,63 @@ def test_catalog(self):
566576
'definitions' : tap_postgres.BASE_RECURSIVE_SCHEMAS},
567577
stream_dict.get('schema'))
568578

579+
class TestColumnGrants(unittest.TestCase):
    """Discovery as a user holding a column-level grant on a single column.

    ``setUp`` creates a four-column table and a throwaway user that is
    granted SELECT on the ``id`` column only. ``test_catalog`` then runs
    discovery as that user and expects the stream to describe ``id`` and
    nothing else.
    """
    maxDiff = None
    table_name = 'CHICKEN TIMES'
    user = 'tmp_user_for_grant_tests'
    password = 'password'

    def setUp(self):
        """Create the test table and a user restricted to the ``id`` column."""
        table_spec = {"columns": [{"name" : "id", "type" : "integer", "serial" : True},
                                  {"name" : 'size integer', "type" : "integer", "quoted" : True},
                                  {"name" : 'size smallint', "type" : "smallint", "quoted" : True},
                                  {"name" : 'size bigint', "type" : "bigint", "quoted" : True}],
                      "name" : TestColumnGrants.table_name}
        ensure_test_table(table_spec)

        with get_test_connection() as conn:
            # Scope the cursor so it is closed once the statements have run.
            with conn.cursor() as cur:
                # Only the user name belongs in this statement; the template
                # has a single placeholder.
                sql = """ DROP USER IF EXISTS {} """.format(self.user)
                LOGGER.info(sql)
                cur.execute(sql)

                sql = """ CREATE USER {} WITH PASSWORD '{}' """.format(self.user, self.password)
                LOGGER.info(sql)
                cur.execute(sql)

                # Column-level grant: the user may read "id" and no other column.
                sql = """ GRANT SELECT ("id") ON "{}" TO {}""".format(TestColumnGrants.table_name, self.user)
                LOGGER.info("running sql: {}".format(sql))
                cur.execute(sql)

    def test_catalog(self):
        """Discovery as the restricted user yields a stream containing only ``id``."""
        conn_config = get_test_connection_config()
        # Connect as the restricted user instead of the default test user.
        conn_config['user'] = self.user
        conn_config['password'] = self.password
        streams = tap_postgres.do_discovery(conn_config)
        chicken_streams = [s for s in streams if s['tap_stream_id'] == 'postgres-public-CHICKEN TIMES']

        self.assertEqual(len(chicken_streams), 1)
        stream_dict = chicken_streams[0]

        # Compare against this class's own table name so the test does not
        # depend on an attribute of an unrelated test class.
        self.assertEqual(TestColumnGrants.table_name, stream_dict.get('table_name'))
        self.assertEqual(TestColumnGrants.table_name, stream_dict.get('stream'))

        # Sort in place so the metadata order is deterministic before mapping.
        stream_dict.get('metadata').sort(key=lambda md: md['breadcrumb'])

        self.assertEqual(metadata.to_map(stream_dict.get('metadata')),
                         {() : {'table-key-properties': [], 'database-name': 'postgres', 'schema-name': 'public', 'is-view': False, 'row-count': 0},
                          ('properties', 'id') : {'inclusion': 'available', 'sql-datatype' : 'integer', 'selected-by-default' : True}})

        self.assertEqual({'definitions' : tap_postgres.BASE_RECURSIVE_SCHEMAS,
                          'type': 'object',
                          'properties': {'id': {'type': ['null', 'integer'], 'minimum': -2147483648, 'maximum': 2147483647}}},
                         stream_dict.get('schema'))
635+
569636

570637
if __name__== "__main__":
571638
test1 = TestArraysTable()

0 commit comments

Comments
 (0)