-
Notifications
You must be signed in to change notification settings - Fork 18
feat: Activate Version with example test #89
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
e25b99d
3280b01
43dc675
c433682
b367081
f908796
2ba215b
fff8458
67989b3
e3fad47
42441da
f40ba63
aacbed1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,20 +14,22 @@ Built with the [Meltano SDK](https://sdk.meltano.com) for Singer Taps and Target | |
* `schema-flattening` | ||
|
||
## Settings | ||
|
||
| Setting | Required | Default | Description | | ||
| :------------------- | :------: | :-----------------: | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | | ||
| host | False | None | Hostname for postgres instance. Note if sqlalchemy_url is set this will be ignored. | | ||
| port | False | 5432 | The port on which postgres is awaiting connection. Note if sqlalchemy_url is set this will be ignored. | | ||
| user | False | None | User name used to authenticate. Note if sqlalchemy_url is set this will be ignored. | | ||
| password | False | None | Password used to authenticate. Note if sqlalchemy_url is set this will be ignored. | | ||
| database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. | | ||
| sqlalchemy_url | False | None | SQLAlchemy connection string. This will override using host, user, password, port, dialect. Note that you must esacpe password special characters properly see https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords | | ||
| dialect+driver | False | postgresql+psycopg2 | Dialect+driver see https://docs.sqlalchemy.org/en/20/core/engines.html. Generally just leave this alone. Note if sqlalchemy_url is set this will be ignored. | | ||
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). | | ||
| stream_map_config | False | None | User-defined config values to be used within map expressions. | | ||
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. | | ||
| flattening_max_depth | False | None | The max depth to flatten schemas. | | ||
| Setting | Required | Default | Description | | ||
| :-------------------- | :------: | :-----------------: | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | ||
| host | False | None | Hostname for postgres instance. Note if sqlalchemy_url is set this will be ignored. | | ||
| port | False | 5432 | The port on which postgres is awaiting connection. Note if sqlalchemy_url is set this will be ignored. | | ||
| user | False | None | User name used to authenticate. Note if sqlalchemy_url is set this will be ignored. | | ||
| password | False | None | Password used to authenticate. Note if sqlalchemy_url is set this will be ignored. | | ||
| database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. | | ||
| sqlalchemy_url | False | None | SQLAlchemy connection string. This will override using host, user, password, port,dialect. Note that you must esacpe password specialcharacters properly seehttps://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords | | ||
| dialect+driver | False | postgresql+psycopg2 | Dialect+driver see https://docs.sqlalchemy.org/en/20/core/engines.html. Generally just leave this alone. Note if sqlalchemy_url is set this will be ignored. | | ||
| default_target_schema | False | None | Postgres schema to send data to, example: tap-clickup | | ||
| hard_delete | False | 0 | When activate version is sent from a tap this specefies if we should delete the records that don't match, or mark them with a date in the `_sdc_deleted_at` column. | | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Boolean should be generated as True / False here I think? I'll leave for now There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. Very strange that it shows defaults of |
||
| add_record_metadata | False | 1 | Note that this must be enabled for activate_version to work!This adds _sdc_extracted_at, _sdc_batched_at, and more to every table. See https://sdk.meltano.com/en/latest/implementation/record_metadata.html for more information. | | ||
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). | | ||
| stream_map_config | False | None | User-defined config values to be used within map expressions. | | ||
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. | | ||
| flattening_max_depth | False | None | The max depth to flatten schemas. | | ||
|
||
A full list of supported settings and capabilities is available by running: `target-postgres --about` | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,9 +2,12 @@ | |
import uuid | ||
from typing import Any, Dict, Iterable, List, Optional, Union | ||
|
||
import sqlalchemy | ||
from pendulum import now | ||
from singer_sdk.sinks import SQLSink | ||
from sqlalchemy import Column, MetaData, Table, insert | ||
from sqlalchemy.sql import Executable | ||
from sqlalchemy.sql.expression import bindparam | ||
|
||
from target_postgres.connector import PostgresConnector | ||
|
||
|
@@ -252,3 +255,67 @@ def schema_name(self) -> Optional[str]: | |
|
||
# Schema name not detected. | ||
return None | ||
|
||
def activate_version(self, new_version: int) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we could take this function and just drop it into the SDK as it is here, this just has a few tweaks that are needed to make this work in a more generic way |
||
"""Bump the active version of the target table. | ||
|
||
Args: | ||
new_version: The version number to activate. | ||
""" | ||
# There's nothing to do if the table doesn't exist yet | ||
# (which it won't the first time the stream is processed) | ||
if not self.connector.table_exists(self.full_table_name): | ||
return | ||
|
||
deleted_at = now() | ||
# Different from SingerSDK as we need to handle types the | ||
# same as SCHEMA messsages | ||
datetime_type = self.connector.to_sql_type( | ||
{"type": "string", "format": "date-time"} | ||
) | ||
|
||
# Different from SingerSDK as we need to handle types the | ||
# same as SCHEMA messsages | ||
integer_type = self.connector.to_sql_type({"type": "integer"}) | ||
|
||
if not self.connector.column_exists( | ||
full_table_name=self.full_table_name, | ||
column_name=self.version_column_name, | ||
): | ||
self.connector.prepare_column( | ||
self.full_table_name, | ||
self.version_column_name, | ||
sql_type=integer_type, | ||
) | ||
|
||
self.logger.info("Hard delete: %s", self.config.get("hard_delete")) | ||
if self.config["hard_delete"] is True: | ||
self.connection.execute( | ||
f"DELETE FROM {self.full_table_name} " | ||
visch marked this conversation as resolved.
Show resolved
Hide resolved
|
||
f"WHERE {self.version_column_name} <= {new_version} " | ||
f"OR {self.version_column_name} IS NULL" | ||
) | ||
return | ||
|
||
if not self.connector.column_exists( | ||
full_table_name=self.full_table_name, | ||
column_name=self.soft_delete_column_name, | ||
): | ||
self.connector.prepare_column( | ||
self.full_table_name, | ||
self.soft_delete_column_name, | ||
sql_type=datetime_type, | ||
) | ||
# Need to deal with the case where data doesn't exist for the version column | ||
query = sqlalchemy.text( | ||
f"UPDATE {self.full_table_name}\n" | ||
f"SET {self.soft_delete_column_name} = :deletedate \n" | ||
f"WHERE {self.version_column_name} < :version " | ||
f"OR {self.version_column_name} IS NULL \n" | ||
f" AND {self.soft_delete_column_name} IS NULL\n" | ||
) | ||
query = query.bindparams( | ||
bindparam("deletedate", value=deleted_at, type_=datetime_type), | ||
bindparam("version", value=new_version, type_=integer_type), | ||
) | ||
self.connector.connection.execute(query) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
{"type": "SCHEMA", "stream": "test_activate_version_hard", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []} | ||
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_hard", "version": 1674486431563} | ||
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} | ||
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} | ||
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} | ||
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} | ||
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} | ||
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} | ||
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "SA", "name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} | ||
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_hard", "version": 1674486431563} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
{"type": "SCHEMA", "stream": "test_activate_version_soft", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []} | ||
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_soft", "version": 1674486431563} | ||
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} | ||
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} | ||
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} | ||
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} | ||
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} | ||
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} | ||
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "SA", "name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} | ||
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_soft", "version": 1674486431563} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
{"type": "SCHEMA", "stream": "test_activate_version_deletes_data_properly", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []} | ||
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_deletes_data_properly", "version": 1674486431563} | ||
{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} | ||
{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} | ||
{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} | ||
{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} | ||
{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} | ||
{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} | ||
{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "SA", "name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} | ||
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_deletes_data_properly", "version": 1674486431563} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
{"type": "SCHEMA", "stream": "test_activate_version_deletes_data_properly", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []} | ||
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_deletes_data_properly", "version": 1674486431564} |
Uh oh!
There was an error while loading. Please reload this page.