1
1
"""Postgres target sink class, which handles writing streams."""
2
2
3
+ from __future__ import annotations
4
+
3
5
import datetime
6
+ import typing as t
4
7
import uuid
5
- from typing import Any , Dict , Iterable , List , Optional , Sequence , Union , cast
6
8
7
9
import sqlalchemy as sa
8
10
from singer_sdk .sinks import SQLSink
@@ -39,7 +41,7 @@ def connector(self) -> PostgresConnector:
39
41
Returns:
40
42
The connector object.
41
43
"""
42
- return cast (PostgresConnector , self ._connector )
44
+ return t . cast (PostgresConnector , self ._connector )
43
45
44
46
def setup (self ) -> None :
45
47
"""Set up Sink.
@@ -120,10 +122,10 @@ def bulk_insert_records( # type: ignore[override]
120
122
self ,
121
123
table : sa .Table ,
122
124
schema : dict ,
123
- records : Iterable [Dict [str , Any ]],
124
- primary_keys : Sequence [str ],
125
+ records : t . Iterable [dict [str , t . Any ]],
126
+ primary_keys : t . Sequence [str ],
125
127
connection : sa .engine .Connection ,
126
- ) -> Optional [ int ] :
128
+ ) -> int | None :
127
129
"""Bulk insert records to an existing destination table.
128
130
129
131
The default implementation uses a generic SQLAlchemy bulk insert operation.
@@ -142,7 +144,7 @@ def bulk_insert_records( # type: ignore[override]
142
144
True if table exists, False if not, None if unsure or undetectable.
143
145
"""
144
146
columns = self .column_representation (schema )
145
- insert : str = cast (
147
+ insert : str = t . cast (
146
148
str ,
147
149
self .generate_insert_statement (
148
150
table .name ,
@@ -151,10 +153,10 @@ def bulk_insert_records( # type: ignore[override]
151
153
)
152
154
self .logger .info ("Inserting with SQL: %s" , insert )
153
155
# Only one record per PK, we want to take the last one
154
- data_to_insert : List [ Dict [str , Any ]] = []
156
+ data_to_insert : list [ dict [str , t . Any ]] = []
155
157
156
158
if self .append_only is False :
157
- insert_records : Dict [str , Dict ] = {} # pk : record
159
+ insert_records : dict [str , dict ] = {} # pk : record
158
160
for record in records :
159
161
insert_record = {}
160
162
for column in columns :
@@ -178,9 +180,9 @@ def upsert(
178
180
from_table : sa .Table ,
179
181
to_table : sa .Table ,
180
182
schema : dict ,
181
- join_keys : Sequence [str ],
183
+ join_keys : t . Sequence [str ],
182
184
connection : sa .engine .Connection ,
183
- ) -> Optional [ int ] :
185
+ ) -> int | None :
184
186
"""Merge upsert data from one table to another.
185
187
186
188
Args:
@@ -247,7 +249,7 @@ def upsert(
247
249
def column_representation (
248
250
self ,
249
251
schema : dict ,
250
- ) -> List [sa .Column ]:
252
+ ) -> list [sa .Column ]:
251
253
"""Return a sqlalchemy table representation for the current schema."""
252
254
columns : list [sa .Column ] = []
253
255
for property_name , property_jsonschema in schema ["properties" ].items ():
@@ -262,8 +264,8 @@ def column_representation(
262
264
def generate_insert_statement (
263
265
self ,
264
266
full_table_name : str ,
265
- columns : List [sa .Column ], # type: ignore[override]
266
- ) -> Union [ str , Executable ] :
267
+ columns : list [sa .Column ], # type: ignore[override]
268
+ ) -> str | Executable :
267
269
"""Generate an insert statement for the given records.
268
270
269
271
Args:
@@ -277,12 +279,12 @@ def generate_insert_statement(
277
279
table = sa .Table (full_table_name , metadata , * columns )
278
280
return sa .insert (table )
279
281
280
- def conform_name (self , name : str , object_type : Optional [ str ] = None ) -> str :
282
+ def conform_name (self , name : str , object_type : str | None = None ) -> str :
281
283
"""Conforming names of tables, schemas, column names."""
282
284
return name
283
285
284
286
@property
285
- def schema_name (self ) -> Optional [ str ] :
287
+ def schema_name (self ) -> str | None :
286
288
"""Return the schema name or `None` if using names with no schema part.
287
289
288
290
Note that after the next SDK release (after 0.14.0) we can remove this
0 commit comments