@@ -114,6 +114,14 @@ def prepare_table( # type: ignore[override]
114
114
connection = connection ,
115
115
)
116
116
return table
117
+ # To make table reflection work properly with pgvector,
118
+ # the module needs to be imported beforehand.
119
+ try :
120
+ from pgvector .sqlalchemy import Vector # noqa: F401
121
+ except ImportError :
122
+ self .logger .debug (
123
+ "Unable to handle pgvector's `Vector` type. Please install `pgvector`."
124
+ )
117
125
meta .reflect (connection , only = [table_name ])
118
126
table = meta .tables [
119
127
full_table_name
@@ -277,6 +285,51 @@ def pick_individual_type(jsonschema_type: dict):
277
285
if "object" in jsonschema_type ["type" ]:
278
286
return JSONB ()
279
287
if "array" in jsonschema_type ["type" ]:
288
+ # Select between different kinds of `ARRAY` data types.
289
+ #
290
+ # This currently relies on a convention outside the Singer SCHEMA spec,
291
+ # using the `additionalProperties` attribute to convey additional type
292
+ # information, agnostic of the target database.
293
+ #
294
+ # In this case, it is about telling different kinds of `ARRAY` types apart:
295
+ # Either it is a vanilla `ARRAY`, to be stored into a `jsonb[]` type, or,
296
+ # alternatively, it can be a "vector" kind `ARRAY` of floating point
297
+ # numbers, effectively what pgvector is storing in its `VECTOR` type.
298
+ #
299
+ # Still, `type: "vector"` is only a surrogate label here, because other
300
+ # database systems may use different types for implementing the same thing,
301
+ # and need to translate accordingly.
302
+ """
303
+ Schema override rule in `meltano.yml`:
304
+
305
+ type: "array"
306
+ items:
307
+ type: "number"
308
+ additionalProperties:
309
+ storage:
310
+ type: "vector"
311
+ dim: 4
312
+
313
+ Produced schema annotation in `catalog.json`:
314
+
315
+ {"type": "array",
316
+ "items": {"type": "number"},
317
+ "additionalProperties": {"storage": {"type": "vector", "dim": 4}}}
318
+ """
319
+ if (
320
+ "additionalProperties" in jsonschema_type
321
+ and "storage" in jsonschema_type ["additionalProperties" ]
322
+ ):
323
+ storage_properties = jsonschema_type ["additionalProperties" ]["storage" ]
324
+ if (
325
+ "type" in storage_properties
326
+ and storage_properties ["type" ] == "vector"
327
+ ):
328
+ # On PostgreSQL/pgvector, use the corresponding type definition
329
+ # from its SQLAlchemy dialect.
330
+ from pgvector .sqlalchemy import Vector
331
+
332
+ return Vector (storage_properties ["dim" ])
280
333
return ARRAY (JSONB ())
281
334
if jsonschema_type .get ("format" ) == "date-time" :
282
335
return TIMESTAMP ()
@@ -310,6 +363,13 @@ def pick_best_sql_type(sql_type_array: list):
310
363
NOTYPE ,
311
364
]
312
365
366
+ try :
367
+ from pgvector .sqlalchemy import Vector
368
+
369
+ precedence_order .append (Vector )
370
+ except ImportError :
371
+ pass
372
+
313
373
for sql_type in precedence_order :
314
374
for obj in sql_type_array :
315
375
if isinstance (obj , sql_type ):
@@ -516,7 +576,7 @@ def _adapt_column_type( # type: ignore[override]
516
576
return
517
577
518
578
# Not the same type, generic type or compatible types
519
- # calling merge_sql_types for assistnace
579
+ # calling merge_sql_types for assistance.
520
580
compatible_sql_type = self .merge_sql_types ([current_type , sql_type ])
521
581
522
582
if str (compatible_sql_type ) == str (current_type ):
0 commit comments