Skip to content

Commit ba7c780

Browse files
committed
I/O: Adapter for PostgreSQL full-load using ingestr
1 parent 1d9fefb commit ba7c780

File tree

17 files changed

+474
-7
lines changed

17 files changed

+474
-7
lines changed

.dlt/config.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[runtime]
2+
3+
dlthub_telemetry=false

.github/workflows/postgresql.yml

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
---
2+
name: "Tests: PostgreSQL"
3+
4+
on:
5+
pull_request:
6+
paths:
7+
- '.github/workflows/postgresql.yml'
8+
- 'cratedb_toolkit/io/ingestr/**'
9+
- 'tests/io/ingestr/*postgresql*'
10+
- 'pyproject.toml'
11+
push:
12+
branches: [ main ]
13+
paths:
14+
- '.github/workflows/postgresql.yml'
15+
- 'cratedb_toolkit/io/ingestr/**'
16+
- 'tests/io/ingestr/*postgresql*'
17+
- 'pyproject.toml'
18+
19+
# Allow job to be triggered manually.
20+
workflow_dispatch:
21+
22+
# Run the job each night after CrateDB nightly has been published.
23+
schedule:
24+
- cron: '0 3 * * *'
25+
26+
# Cancel in-progress jobs when pushing to the same branch.
27+
concurrency:
28+
cancel-in-progress: true
29+
group: ${{ github.workflow }}-${{ github.ref }}
30+
31+
jobs:
32+
33+
tests:
34+
35+
runs-on: ${{ matrix.os }}
36+
strategy:
37+
fail-fast: false
38+
matrix:
39+
os: ["ubuntu-latest"]
40+
cratedb-version: [
41+
"nightly",
42+
]
43+
postgresql-version: [
44+
"latest",
45+
]
46+
python-version: [
47+
"3.10",
48+
"3.13",
49+
]
50+
51+
env:
52+
OS: ${{ matrix.os }}
53+
PYTHON: ${{ matrix.python-version }}
54+
CRATEDB_VERSION: ${{ matrix.cratedb-version }}
55+
POSTGRESQL_VERSION: ${{ matrix.postgresql-version }}
56+
UV_SYSTEM_PYTHON: true
57+
TC_KEEPALIVE: true # Do not tear down Testcontainers
58+
INGESTR_DISABLE_TELEMETRY: true
59+
60+
services:
61+
cratedb:
62+
image: crate/crate:${{ matrix.cratedb-version }}
63+
ports:
64+
- 4200:4200
65+
- 5432:5432
66+
env:
67+
CRATE_HEAP_SIZE: 4g
68+
postgresql:
69+
image: postgres:${{ matrix.postgresql-version }}
70+
ports:
71+
- 5433:5432
72+
env:
73+
POSTGRES_HOST_AUTH_METHOD: trust
74+
75+
name: "
76+
Python ${{ matrix.python-version }},
77+
PostgreSQL ${{ matrix.postgresql-version }},
78+
OS ${{ matrix.os }}
79+
"
80+
steps:
81+
82+
- name: Acquire sources
83+
uses: actions/checkout@v4
84+
85+
- name: Install Python
86+
uses: actions/setup-python@v5
87+
with:
88+
python-version: ${{ matrix.python-version }}
89+
90+
- name: Install uv
91+
uses: astral-sh/setup-uv@v5
92+
with:
93+
cache-dependency-glob: |
94+
pyproject.toml
95+
cache-suffix: ${{ matrix.python-version }}
96+
enable-cache: true
97+
version: "latest"
98+
99+
- name: Set up project
100+
run: |
101+
102+
# Install package in editable mode.
103+
uv pip install --editable='.[io-ingestr,test,develop]'
104+
105+
- name: Run linter and software tests
106+
run: |
107+
pytest -m postgresql
108+
109+
- name: Upload coverage to Codecov
110+
uses: codecov/codecov-action@v5
111+
env:
112+
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
113+
with:
114+
files: ./coverage.xml
115+
flags: postgresql
116+
env_vars: OS,PYTHON
117+
name: codecov-umbrella
118+
fail_ci_if_error: true

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
- CDC: Enabled loading DMS events from Kinesis streams and stream-dump files
1717
- CDC: Added subcommand `ctk dms table-mappings`
1818
- Dependencies: Updated to `commons-codec>=0.0.23`
19+
- I/O: Adapter for PostgreSQL full-load using ingestr
1920

2021
## 2025/05/13 v0.0.35
2122
- Added lost `pytest` dependencies to `cratedb-toolkit[testing]`

cratedb_toolkit/cluster/core.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
DatabaseAddressMissingError,
2121
OperationFailed,
2222
)
23+
from cratedb_toolkit.io.ingestr.api import ingestr_copy, ingestr_select
2324
from cratedb_toolkit.model import ClusterAddressOptions, DatabaseAddress, InputOutputResource, TableAddress
2425
from cratedb_toolkit.util.client import jwt_token_patch
2526
from cratedb_toolkit.util.data import asbool
@@ -537,10 +538,16 @@ def load_table(
537538
"""
538539
source_url = source.url
539540
target_url = self.address.dburi
540-
541541
source_url_obj = URL(source.url)
542542

543-
if source_url_obj.scheme.startswith("dynamodb"):
543+
if ingestr_select(source_url):
544+
if ingestr_copy(source_url, self.address, progress=True):
545+
self._load_table_result = True
546+
else:
547+
logger.error("Data loading failed or incomplete")
548+
self._load_table_result = False
549+
550+
elif source_url_obj.scheme.startswith("dynamodb"):
544551
from cratedb_toolkit.io.dynamodb.api import dynamodb_copy
545552

546553
if dynamodb_copy(str(source_url_obj), target_url, progress=True):

cratedb_toolkit/io/core.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ class BulkProcessor:
103103
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
104104
"""
105105

106-
connection: sa.Connection
106+
connection: sa.engine.Connection
107107
data: t.Iterable[t.List[t.Dict[str, t.Any]]]
108108
batch_to_operation: t.Callable[[t.List[t.Dict[str, t.Any]]], SQLOperation]
109109
progress_bar: t.Union[tqdm, None] = None

cratedb_toolkit/io/ingestr/__init__.py

Whitespace-only changes.

cratedb_toolkit/io/ingestr/api.py

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import logging
2+
3+
from yarl import URL
4+
5+
from cratedb_toolkit.io.ingestr.boot import import_ingestr
6+
from cratedb_toolkit.model import DatabaseAddress
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
ingestr_available, ingestr, ConfigFieldMissingException = import_ingestr()
12+
13+
14+
def ingestr_select(source_url: str) -> bool:
15+
"""
16+
Whether to select `ingestr` for this data source.
17+
"""
18+
if not ingestr_available:
19+
return False
20+
try:
21+
factory = ingestr.src.factory.SourceDestinationFactory(source_url, "csv:////tmp/foobar.csv")
22+
factory.get_source()
23+
scheme = ingestr.src.factory.parse_scheme_from_uri(source_url)
24+
logger.info(f"Selecting ingestr for source scheme: {scheme}")
25+
return True
26+
except (ImportError, ValueError, AttributeError) as ex:
27+
if "Unsupported source scheme" in str(ex):
28+
logger.debug(f"Failed to select ingestr for source url '{source_url}': {ex}")
29+
else:
30+
logger.exception(f"Unexpected error with ingestr for source url: {source_url}")
31+
return False
32+
except Exception:
33+
logger.exception(f"Unexpected error with ingestr for source url: {source_url}")
34+
return False
35+
36+
37+
def ingestr_copy(source_url: str, target_address: DatabaseAddress, progress: bool = False):
38+
"""
39+
Invoke data transfer to CrateDB from any source provided by `ingestr`.
40+
41+
https://cratedb-toolkit.readthedocs.io/io/ingestr/
42+
43+
Synopsis:
44+
45+
ctk load table \
46+
"frankfurter://?base=EUR&table=latest" \
47+
--cluster-url="crate://crate:na@localhost:4200/testdrive/exchange_latest"
48+
49+
ctk load table \
50+
"frankfurter://?base=EUR&table=currencies" \
51+
--cluster-url="crate://crate:na@localhost:4200/testdrive/exchange_currencies"
52+
53+
ctk load table \
54+
"postgresql://pguser:[email protected]:5432/postgres?table=public.diamonds" \
55+
--cluster-url="crate://crate:na@localhost:4200/testdrive/ibis_diamonds"
56+
"""
57+
58+
# Sanity checks.
59+
if not ingestr_available:
60+
raise ModuleNotFoundError("ingestr subsystem not installed")
61+
62+
# Compute source and target URLs and table names.
63+
# Table names use dotted notation `<schema>.<table>`.
64+
65+
source_url_obj = URL(source_url)
66+
source_table = source_url_obj.query.get("table")
67+
source_fragment = source_url_obj.fragment
68+
source_url_obj = source_url_obj.without_query_params("table").with_fragment("")
69+
70+
target_uri, target_table_address = target_address.decode()
71+
target_table = target_table_address.fullname
72+
target_url = target_address.to_ingestr_url()
73+
74+
if not source_table:
75+
raise ValueError("Source table is required")
76+
if not target_table:
77+
target_table = source_table
78+
79+
if source_fragment:
80+
source_table += f"#{source_fragment}"
81+
82+
logger.info("Invoking ingestr")
83+
logger.info(f"Source URL: {source_url_obj}")
84+
logger.info(f"Target URL: {target_url}")
85+
logger.info(f"Source Table: {source_table}")
86+
logger.info(f"Target Table: {target_table}")
87+
88+
try:
89+
ingestr.main.ingest(
90+
source_uri=str(source_url_obj),
91+
dest_uri=str(target_url),
92+
source_table=source_table,
93+
dest_table=target_table,
94+
yes=True,
95+
)
96+
return True
97+
except ConfigFieldMissingException:
98+
logger.error(
99+
"A configuration field is missing. Please ensure all required credentials are provided. "
100+
"For example, if your account does not use a password, use a dummy password `na` like "
101+
"`export CRATEDB_CLUSTER_URL=crate://crate:na@localhost:4200/testdrive`"
102+
)
103+
raise
104+
except Exception as ex:
105+
logger.exception(f"Failed to ingest data: {ex}")
106+
return False

cratedb_toolkit/io/ingestr/boot.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import logging
2+
from unittest import mock
3+
4+
from boltons.urlutils import URL
5+
6+
logger = logging.getLogger(__name__)
7+
8+
9+
def import_ingestr():
10+
"""Import ingestr with CrateDB destination adapter."""
11+
try:
12+
with mock.patch("ingestr.src.telemetry.event.track"), mock.patch(
13+
"dlt.common.runtime.telemetry._TELEMETRY_STARTED", True
14+
):
15+
import dlt_cratedb # noqa: F401
16+
import ingestr.main
17+
import ingestr.src.factory
18+
from dlt.common.configuration import ConfigFieldMissingException
19+
from ingestr.src.destinations import GenericSqlDestination
20+
21+
class CrateDBDestination(GenericSqlDestination):
22+
def dlt_dest(self, uri: str, **kwargs):
23+
uri = self._replace_url(uri)
24+
import dlt_cratedb.impl.cratedb.factory
25+
26+
return dlt_cratedb.impl.cratedb.factory.cratedb(credentials=uri, **kwargs)
27+
28+
@staticmethod
29+
def _replace_url(uri: str) -> str:
30+
url_obj = URL(uri)
31+
if url_obj.scheme == "cratedb":
32+
url_obj.scheme = "postgres"
33+
return str(url_obj)
34+
35+
ingestr.src.factory.SourceDestinationFactory.destinations["cratedb"] = CrateDBDestination
36+
37+
return True, ingestr, ConfigFieldMissingException
38+
except ImportError:
39+
logger.error("Could not import ingestr")
40+
return False, None, None

cratedb_toolkit/model.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,25 @@ def httpuri(self) -> str:
130130
uri.scheme = "https"
131131
return str(uri)
132132

133+
def to_postgresql_url(self, port: int = 5432) -> URL:
134+
"""
135+
Return the `postgresql://` variant of the database URI.
136+
"""
137+
uri = deepcopy(self.uri)
138+
uri.scheme = "postgresql"
139+
if not uri.host:
140+
uri.host = "localhost"
141+
uri.port = port
142+
return uri
143+
144+
def to_ingestr_url(self, port: int = 5432) -> URL:
145+
"""
146+
Return the `cratedb://` variant of the database URI, suitable for `ingestr`.
147+
"""
148+
uri = deepcopy(self.to_postgresql_url(port))
149+
uri.scheme = "cratedb"
150+
return uri
151+
133152
@property
134153
def verify_ssl(self) -> bool:
135154
return self.uri.query_params.get("sslmode", "disable") not in ["disable", "require"]

cratedb_toolkit/testing/testcontainers/cratedb.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class CrateDBContainer(DockerSkippingContainer, KeepaliveContainer, DbContainer)
5454
"""
5555

5656
CRATEDB_USER = os.environ.get("CRATEDB_USER", "crate")
57-
CRATEDB_PASSWORD = os.environ.get("CRATEDB_PASSWORD", "")
57+
CRATEDB_PASSWORD = os.environ.get("CRATEDB_PASSWORD", "crate")
5858
CRATEDB_DB = os.environ.get("CRATEDB_DB", "doc")
5959
KEEPALIVE = asbool(os.environ.get("CRATEDB_KEEPALIVE", os.environ.get("TC_KEEPALIVE", False)))
6060
CMD_OPTS = {

0 commit comments

Comments
 (0)