Skip to content

Conversation

@neuromantik33
Copy link

@neuromantik33 neuromantik33 commented Dec 23, 2024

Differences between pg_legacy_replication and pg_replication

Overview

pg_legacy_replication is a fork of the verified pg_replication source. The primary goal of this fork is to provide logical replication capabilities for Postgres instances running versions earlier than 10, when the pgoutput plugin was not yet available. This fork draws inspiration from the original pg_replication source and the decoderbufs library, which is actively maintained by Debezium.

Key Differences from pg_replication

Replication User Ownership Requirements

One of the limitations of native Postgre replication is that the replication user must own the tables in order to add them to a publication.
Additionally, once a table is added to a publication, it cannot be removed, requiring the creation of a new replication slot, which results in the loss of any state tracking.

Limitations in pg_replication

The current pg_replication implementation has several limitations:

  • It supports only a single initial snapshot of the data.
  • It requires CREATE access to the source database in order to perform the initial snapshot.
  • Superuser access is required to replicate entire Postgres schemas.
    While the pg_legacy_replication source theoretically reads the entire WAL across all schemas, the current implementation using dlt transformers restricts this functionality. In practice, this has not been a common use case.
  • The implementation is opinionated in its approach to data transfer. Specifically, when updates or deletes are required, it defaults to a merge write disposition, which replicates live data without tracking changes over time.

Features of pg_legacy_replication

This fork of pg_replication addresses the aforementioned limitations and introduces the following improvements:

  • Adheres to the dlt philosophy by treating the WAL as an upstream resource. This replication stream is then transformed into various DLT resources, with customizable options for write disposition, file formats, type hints, etc., specified at the resource level rather than at the source level.
  • Supports an initial snapshot of all tables using the transaction slot isolation level. Additionally, ad-hoc snapshots can be performed using the serializable deferred isolation level, similar to pg_backup.
  • Emphasizes the use of pyarrow and parquet formats for efficient data storage and transfer. A dedicated backend has been implemented to support these formats.
  • Replication messages are decoded using Protocol Buffers (protobufs) in C, rather than relying on native Python byte buffer parsing. This ensures greater efficiency and performance.

Next steps

  • Add support for the wal2json replication plugin. This is particularly important for environments such as Amazon RDS, which supports wal2json, as opposed to on-premise or Google Cloud SQL instances that support decoderbufs.

@rudolfix rudolfix self-assigned this Dec 25, 2024
@rudolfix rudolfix added the ci from fork Allows to run tests from PR coming from fork label Dec 25, 2024
Copy link
Contributor

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is soo good! thanks for docs and test cases. also pyarrow support is amazing.

I have just a few comments to hacky cleanups you do. but I think nothing really serious.

"""Creates a dispatch handler that processes data items based on a specified table and optional column hints."""
handler = BackendHandler(table, repl_options)
# FIXME Uhhh.. why do I have to do this?
handler.__qualname__ = "BackendHandler.__call__" # type: ignore[attr-defined]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah! why? is dlt checking this somewhere?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I have no idea why this is required here is the output with the line commented

~/repos/github/neuromantik33/verified-sources $ poetry run pytest tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality 
============================= test session starts ==============================
platform linux -- Python 3.11.11, pytest-7.4.2, pluggy-1.3.0 -- /home/drnick/repos/github/neuromantik33/verified-sources/.venv/bin/python
cachedir: .pytest_cache
rootdir: /home/drnick/repos/github/neuromantik33/verified-sources
configfile: pytest.ini
plugins: mock-3.12.0, requests-mock-1.11.0, cov-5.0.0, forked-1.6.0, anyio-4.0.0
collecting ... collected 2 items

tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality[sqlalchemy-duckdb] FAILED [ 50%]
tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality[pyarrow-duckdb] FAILED [100%]

=================================== FAILURES ===================================
__________________ test_core_functionality[sqlalchemy-duckdb] __________________

src_config = (<dlt.pipeline.pipeline.Pipeline object at 0x7f96cf51f3d0>, 'test_slot_a72aac18')
destination_name = 'duckdb', backend = 'sqlalchemy'

    @pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
    @pytest.mark.parametrize("backend", ["sqlalchemy", "pyarrow"])
    def test_core_functionality(
        src_config: Tuple[dlt.Pipeline, str], destination_name: str, backend: TableBackend
    ) -> None:
        @dlt.resource(write_disposition="merge", primary_key="id_x")
        def tbl_x(data):
            yield data
    
        @dlt.resource(write_disposition="merge", primary_key="id_y")
        def tbl_y(data):
            yield data
    
        src_pl, slot_name = src_config
    
        src_pl.run(
            [
                tbl_x({"id_x": 1, "val_x": "foo"}),
                tbl_y({"id_y": 1, "val_y": True}),
            ]
        )
        add_pk(src_pl.sql_client, "tbl_x", "id_x")
        add_pk(src_pl.sql_client, "tbl_y", "id_y")
    
        snapshots = init_replication(
            slot_name=slot_name,
            schema=src_pl.dataset_name,
            table_names=("tbl_x", "tbl_y"),
            take_snapshots=True,
            table_options={
                "tbl_x": {"backend": backend},
                "tbl_y": {"backend": backend},
            },
        )
    
>       changes = replication_source(
            slot_name=slot_name,
            schema=src_pl.dataset_name,
            table_names=("tbl_x", "tbl_y"),
            repl_options={
                "tbl_x": {"backend": backend},
                "tbl_y": {"backend": backend},
            },
        )

backend    = 'sqlalchemy'
destination_name = 'duckdb'
slot_name  = 'test_slot_a72aac18'
snapshots  = <dlt.extract.source.DltSource object at 0x7f96ceb9fd50>
src_config = (<dlt.pipeline.pipeline.Pipeline object at 0x7f96cf51f3d0>, 'test_slot_a72aac18')
src_pl     = <dlt.pipeline.pipeline.Pipeline object at 0x7f96cf51f3d0>
tbl_x      = <dlt.extract.resource.DltResource object at 0x7f96d8ba9d50>
tbl_y      = <dlt.extract.resource.DltResource object at 0x7f96da95bdd0>

tests/pg_legacy_replication/test_pg_replication.py:65: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:195: in __call__
    source = self._deco_f(*args, **kwargs)
        args       = ()
        kwargs     = {'repl_options': {'tbl_x': {'backend': 'sqlalchemy'}, 'tbl_y': {'backend': 'sqlalchemy'}}, 'schema': 'src_pl_dataset_202501221235482995', 'slot_name': 'test_slot_a72aac18', 'table_names': ('tbl_x', 'tbl_y')}
        self       = <dlt.extract.decorators.DltSourceFactoryWrapper object at 0x7f96d7768d90>
.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:280: in _wrap
    return _eval_rv(rv, schema_copy)
        _eval_rv   = <function DltSourceFactoryWrapper._wrap.<locals>._eval_rv at 0x7f96d773fba0>
        _make_schema = <function DltSourceFactoryWrapper._wrap.<locals>._make_schema at 0x7f96cf55c540>
        args       = ()
        conf_f     = <function replication_source at 0x7f96cf55c4a0>
        kwargs     = {'repl_options': {'tbl_x': {'backend': 'sqlalchemy'}, 'tbl_y': {'backend': 'sqlalchemy'}}, 'schema': 'src_pl_dataset_202501221235482995', 'slot_name': 'test_slot_a72aac18', 'table_names': ('tbl_x', 'tbl_y')}
        pipeline_name = 'src_pl'
        proxy      = <dlt.common.pipeline.PipelineContext object at 0x7f96ceb05d10>
        rv         = <generator object replication_source at 0x7f96cd7182c0>
        schema_copy = Schema replication_source at 140285669523216
        source_sections = ('sources', 'pg_legacy_replication', 'replication_source')
.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:240: in _eval_rv
    _rv = list(_rv)
        _rv        = <generator object replication_source at 0x7f96cd7182c0>
        schema_copy = Schema replication_source at 140285669523216
        self       = <dlt.extract.decorators.DltSourceFactoryWrapper object at 0x7f96d7768d90>
        source_section = 'pg_legacy_replication'
sources/pg_legacy_replication/__init__.py:118: in replication_source
    yield dlt.transformer(
        credentials = <dlt.common.configuration.specs.connection_string_credentials.ConnectionStringCredentials object at 0x7f96cd7ffc50>
        flush_slot = True
        repl_options = defaultdict(<function replication_source.<locals>.<lambda> at 0x7f96cd740180>, {'tbl_x': {'backend': 'sqlalchemy'}, 'tbl_y': {'backend': 'sqlalchemy'}})
        replication_resource = <function replication_source.<locals>.replication_resource at 0x7f96cd7ea700>
        schema     = 'src_pl_dataset_202501221235482995'
        slot_name  = 'test_slot_a72aac18'
        table      = 'tbl_x'
        table_names = ('tbl_x', 'tbl_y')
        target_batch_size = 1000
        wal_reader = <dlt.extract.resource.DltResource object at 0x7f96ceba2050>
.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:958: in transformer
    return resource(  # type: ignore
        _impl_cls  = <class 'dlt.extract.resource.DltResource'>
        columns    = None
        data_from  = <dlt.extract.resource.DltResource object at 0x7f96ceba2050>
        f          = BackendHandler(table='tbl_x', repl_options={'backend': 'sqlalchemy'})
        file_format = None
        max_table_nesting = None
        merge_key  = None
        name       = 'tbl_x'
        parallelized = False
        primary_key = None
        schema_contract = None
        selected   = True
        spec       = None
        standalone = False
        table_format = None
        table_name = None
        write_disposition = None
.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:758: in resource
    return decorator(data)
        _impl_cls  = <class 'dlt.extract.resource.DltResource'>
        columns    = None
        data       = BackendHandler(table='tbl_x', repl_options={'backend': 'sqlalchemy'})
        data_from  = <dlt.extract.resource.DltResource object at 0x7f96ceba2050>
        decorator  = <function resource.<locals>.decorator at 0x7f96cd7ea980>
        file_format = None
        make_resource = <function resource.<locals>.make_resource at 0x7f96cd7ea840>
        max_table_nesting = None
        merge_key  = None
        name       = 'tbl_x'
        parallelized = False
        primary_key = None
        references = None
        schema_contract = None
        selected   = True
        spec       = None
        standalone = False
        table_format = None
        table_name = None
        wrap_standalone = <function resource.<locals>.wrap_standalone at 0x7f96cd7ea8e0>
        write_disposition = None
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

f = BackendHandler(table='tbl_x', repl_options={'backend': 'sqlalchemy'})

    def decorator(
        f: Callable[TResourceFunParams, Any]
    ) -> Callable[TResourceFunParams, TDltResourceImpl]:
        if not callable(f):
            if data_from:
                # raise more descriptive exception if we construct transformer
                raise InvalidTransformerDataTypeGeneratorFunctionRequired(
                    name or "<no name>", f, type(f)
                )
            raise ResourceFunctionExpected(name or "<no name>", f, type(f))
        if not standalone and callable(name):
            raise DynamicNameNotStandaloneResource(get_callable_name(f))
    
        resource_name = name if name and not callable(name) else get_callable_name(f)
    
        func_module = inspect.getmodule(f)
        source_section = _get_source_section_name(func_module)
        is_inner_resource = is_inner_callable(f)
    
        if spec is None:
            # autodetect spec
            SPEC, resolvable_fields = spec_from_signature(
                f, inspect.signature(f), include_defaults=standalone
            )
            if is_inner_resource and not standalone:
                if len(resolvable_fields) > 0:
                    # prevent required arguments to inner functions that are not standalone
                    raise ResourceInnerCallableConfigWrapDisallowed(resource_name, source_section)
                else:
                    # empty spec for inner functions - they should not be injected
                    SPEC = BaseConfiguration
        else:
            SPEC = spec
        # assign spec to "f"
        set_fun_spec(f, SPEC)
    
        # register non inner resources as source with single resource in it
        if not is_inner_resource:
            # a source function for the source wrapper, args that go to source are forwarded
            # to a single resource within
            def _source(
                name_ovr: str, section_ovr: str, args: Tuple[Any, ...], kwargs: Dict[str, Any]
            ) -> TDltResourceImpl:
                return wrap_standalone(name_ovr or resource_name, section_ovr or source_section, f)(
                    *args, **kwargs
                )
    
            # make the source module same as original resource
>           _source.__qualname__ = f.__qualname__
E           AttributeError: 'BackendHandler' object has no attribute '__qualname__'

SPEC       = <class 'sources.pg_legacy_replication.helpers.BackendHandlerConfiguration'>
_source    = <function resource.<locals>.decorator.<locals>._source at 0x7f96cd7eaa20>
data_from  = <dlt.extract.resource.DltResource object at 0x7f96ceba2050>
f          = BackendHandler(table='tbl_x', repl_options={'backend': 'sqlalchemy'})
func_module = <module 'sources.pg_legacy_replication.helpers' from '/home/drnick/repos/github/neuromantik33/verified-sources/sources/pg_legacy_replication/helpers.py'>
is_inner_resource = False
name       = 'tbl_x'
resolvable_fields = {}
resource_name = 'tbl_x'
source_section = 'helpers'
spec       = None
standalone = False
wrap_standalone = <function resource.<locals>.wrap_standalone at 0x7f96cd7ea8e0>

.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:731: AttributeError
--------------------------- Captured stdout teardown ---------------------------
schema "src_pl_dataset_202501221235482995" does not exist

schema "src_pl_dataset_202501221235482995_staging" does not exist

___________________ test_core_functionality[pyarrow-duckdb] ____________________

src_config = (<dlt.pipeline.pipeline.Pipeline object at 0x7f96ceb20b10>, 'test_slot_ae9b491f')
destination_name = 'duckdb', backend = 'pyarrow'

    @pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
    @pytest.mark.parametrize("backend", ["sqlalchemy", "pyarrow"])
    def test_core_functionality(
        src_config: Tuple[dlt.Pipeline, str], destination_name: str, backend: TableBackend
    ) -> None:
        @dlt.resource(write_disposition="merge", primary_key="id_x")
        def tbl_x(data):
            yield data
    
        @dlt.resource(write_disposition="merge", primary_key="id_y")
        def tbl_y(data):
            yield data
    
        src_pl, slot_name = src_config
    
        src_pl.run(
            [
                tbl_x({"id_x": 1, "val_x": "foo"}),
                tbl_y({"id_y": 1, "val_y": True}),
            ]
        )
        add_pk(src_pl.sql_client, "tbl_x", "id_x")
        add_pk(src_pl.sql_client, "tbl_y", "id_y")
    
        snapshots = init_replication(
            slot_name=slot_name,
            schema=src_pl.dataset_name,
            table_names=("tbl_x", "tbl_y"),
            take_snapshots=True,
            table_options={
                "tbl_x": {"backend": backend},
                "tbl_y": {"backend": backend},
            },
        )
    
>       changes = replication_source(
            slot_name=slot_name,
            schema=src_pl.dataset_name,
            table_names=("tbl_x", "tbl_y"),
            repl_options={
                "tbl_x": {"backend": backend},
                "tbl_y": {"backend": backend},
            },
        )

backend    = 'pyarrow'
destination_name = 'duckdb'
slot_name  = 'test_slot_ae9b491f'
snapshots  = <dlt.extract.source.DltSource object at 0x7f96cb9de390>
src_config = (<dlt.pipeline.pipeline.Pipeline object at 0x7f96ceb20b10>, 'test_slot_ae9b491f')
src_pl     = <dlt.pipeline.pipeline.Pipeline object at 0x7f96ceb20b10>
tbl_x      = <dlt.extract.resource.DltResource object at 0x7f96ccd0c610>
tbl_y      = <dlt.extract.resource.DltResource object at 0x7f96ccd0e0d0>

tests/pg_legacy_replication/test_pg_replication.py:65: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:195: in __call__
    source = self._deco_f(*args, **kwargs)
        args       = ()
        kwargs     = {'repl_options': {'tbl_x': {'backend': 'pyarrow'}, 'tbl_y': {'backend': 'pyarrow'}}, 'schema': 'src_pl_dataset_202501221235515171', 'slot_name': 'test_slot_ae9b491f', 'table_names': ('tbl_x', 'tbl_y')}
        self       = <dlt.extract.decorators.DltSourceFactoryWrapper object at 0x7f96d7768d90>
.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:280: in _wrap
    return _eval_rv(rv, schema_copy)
        _eval_rv   = <function DltSourceFactoryWrapper._wrap.<locals>._eval_rv at 0x7f96d773fba0>
        _make_schema = <function DltSourceFactoryWrapper._wrap.<locals>._make_schema at 0x7f96cf55c540>
        args       = ()
        conf_f     = <function replication_source at 0x7f96cf55c4a0>
        kwargs     = {'repl_options': {'tbl_x': {'backend': 'pyarrow'}, 'tbl_y': {'backend': 'pyarrow'}}, 'schema': 'src_pl_dataset_202501221235515171', 'slot_name': 'test_slot_ae9b491f', 'table_names': ('tbl_x', 'tbl_y')}
        pipeline_name = 'src_pl'
        proxy      = <dlt.common.pipeline.PipelineContext object at 0x7f96ceb05d10>
        rv         = <generator object replication_source at 0x7f971204dd00>
        schema_copy = Schema replication_source at 140285658134864
        source_sections = ('sources', 'pg_legacy_replication', 'replication_source')
.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:240: in _eval_rv
    _rv = list(_rv)
        _rv        = <generator object replication_source at 0x7f971204dd00>
        schema_copy = Schema replication_source at 140285658134864
        self       = <dlt.extract.decorators.DltSourceFactoryWrapper object at 0x7f96d7768d90>
        source_section = 'pg_legacy_replication'
sources/pg_legacy_replication/__init__.py:118: in replication_source
    yield dlt.transformer(
        credentials = <dlt.common.configuration.specs.connection_string_credentials.ConnectionStringCredentials object at 0x7f96cb96a0d0>
        flush_slot = True
        repl_options = defaultdict(<function replication_source.<locals>.<lambda> at 0x7f96cc3dbba0>, {'tbl_x': {'backend': 'pyarrow'}, 'tbl_y': {'backend': 'pyarrow'}})
        replication_resource = <function replication_source.<locals>.replication_resource at 0x7f96cc3d85e0>
        schema     = 'src_pl_dataset_202501221235515171'
        slot_name  = 'test_slot_ae9b491f'
        table      = 'tbl_x'
        table_names = ('tbl_x', 'tbl_y')
        target_batch_size = 1000
        wal_reader = <dlt.extract.resource.DltResource object at 0x7f96caff30d0>
.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:958: in transformer
    return resource(  # type: ignore
        _impl_cls  = <class 'dlt.extract.resource.DltResource'>
        columns    = None
        data_from  = <dlt.extract.resource.DltResource object at 0x7f96caff30d0>
        f          = BackendHandler(table='tbl_x', repl_options={'backend': 'pyarrow'})
        file_format = None
        max_table_nesting = None
        merge_key  = None
        name       = 'tbl_x'
        parallelized = False
        primary_key = None
        schema_contract = None
        selected   = True
        spec       = None
        standalone = False
        table_format = None
        table_name = None
        write_disposition = None
.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:758: in resource
    return decorator(data)
        _impl_cls  = <class 'dlt.extract.resource.DltResource'>
        columns    = None
        data       = BackendHandler(table='tbl_x', repl_options={'backend': 'pyarrow'})
        data_from  = <dlt.extract.resource.DltResource object at 0x7f96caff30d0>
        decorator  = <function resource.<locals>.decorator at 0x7f96cafa7740>
        file_format = None
        make_resource = <function resource.<locals>.make_resource at 0x7f96cc3d9d00>
        max_table_nesting = None
        merge_key  = None
        name       = 'tbl_x'
        parallelized = False
        primary_key = None
        references = None
        schema_contract = None
        selected   = True
        spec       = None
        standalone = False
        table_format = None
        table_name = None
        wrap_standalone = <function resource.<locals>.wrap_standalone at 0x7f96cafa6840>
        write_disposition = None
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

f = BackendHandler(table='tbl_x', repl_options={'backend': 'pyarrow'})

    def decorator(
        f: Callable[TResourceFunParams, Any]
    ) -> Callable[TResourceFunParams, TDltResourceImpl]:
        if not callable(f):
            if data_from:
                # raise more descriptive exception if we construct transformer
                raise InvalidTransformerDataTypeGeneratorFunctionRequired(
                    name or "<no name>", f, type(f)
                )
            raise ResourceFunctionExpected(name or "<no name>", f, type(f))
        if not standalone and callable(name):
            raise DynamicNameNotStandaloneResource(get_callable_name(f))
    
        resource_name = name if name and not callable(name) else get_callable_name(f)
    
        func_module = inspect.getmodule(f)
        source_section = _get_source_section_name(func_module)
        is_inner_resource = is_inner_callable(f)
    
        if spec is None:
            # autodetect spec
            SPEC, resolvable_fields = spec_from_signature(
                f, inspect.signature(f), include_defaults=standalone
            )
            if is_inner_resource and not standalone:
                if len(resolvable_fields) > 0:
                    # prevent required arguments to inner functions that are not standalone
                    raise ResourceInnerCallableConfigWrapDisallowed(resource_name, source_section)
                else:
                    # empty spec for inner functions - they should not be injected
                    SPEC = BaseConfiguration
        else:
            SPEC = spec
        # assign spec to "f"
        set_fun_spec(f, SPEC)
    
        # register non inner resources as source with single resource in it
        if not is_inner_resource:
            # a source function for the source wrapper, args that go to source are forwarded
            # to a single resource within
            def _source(
                name_ovr: str, section_ovr: str, args: Tuple[Any, ...], kwargs: Dict[str, Any]
            ) -> TDltResourceImpl:
                return wrap_standalone(name_ovr or resource_name, section_ovr or source_section, f)(
                    *args, **kwargs
                )
    
            # make the source module same as original resource
>           _source.__qualname__ = f.__qualname__
E           AttributeError: 'BackendHandler' object has no attribute '__qualname__'

SPEC       = <class 'sources.pg_legacy_replication.helpers.BackendHandlerConfiguration'>
_source    = <function resource.<locals>.decorator.<locals>._source at 0x7f96cafa76a0>
data_from  = <dlt.extract.resource.DltResource object at 0x7f96caff30d0>
f          = BackendHandler(table='tbl_x', repl_options={'backend': 'pyarrow'})
func_module = <module 'sources.pg_legacy_replication.helpers' from '/home/drnick/repos/github/neuromantik33/verified-sources/sources/pg_legacy_replication/helpers.py'>
is_inner_resource = False
name       = 'tbl_x'
resolvable_fields = {}
resource_name = 'tbl_x'
source_section = 'helpers'
spec       = None
standalone = False
wrap_standalone = <function resource.<locals>.wrap_standalone at 0x7f96cafa6840>

.venv/lib/python3.11/site-packages/dlt/extract/decorators.py:731: AttributeError
----------------------------- Captured stderr call -----------------------------
2025-01-22 13:35:54,498|[WARNING]|38699|140286848428928|dlt|source.py|register:572|A source with ref dlt.helpers.tbl_x is already registered and will be overwritten
2025-01-22 13:35:54,513|[WARNING]|38699|140286848428928|dlt|source.py|register:572|A source with ref dlt.helpers.tbl_y is already registered and will be overwritten
--------------------------- Captured stdout teardown ---------------------------
schema "src_pl_dataset_202501221235515171" does not exist

schema "src_pl_dataset_202501221235515171_staging" does not exist

=============================== warnings summary ===============================
.venv/lib/python3.11/site-packages/dlt/helpers/dbt/__init__.py:3
  /home/drnick/repos/github/neuromantik33/verified-sources/.venv/lib/python3.11/site-packages/dlt/helpers/dbt/__init__.py:3: DeprecationWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html
    import pkg_resources

.venv/lib/python3.11/site-packages/dlt/common/configuration/container.py:95: 19 warnings
tests/pg_legacy_replication/test_pg_replication.py: 1958 warnings
  /home/drnick/repos/github/neuromantik33/verified-sources/.venv/lib/python3.11/site-packages/dlt/common/configuration/container.py:95: DeprecationWarning: currentThread() is deprecated, use current_thread() instead
    if m := re.match(r"dlt-pool-(\d+)-", threading.currentThread().getName()):

.venv/lib/python3.11/site-packages/dlt/common/configuration/container.py:95: 19 warnings
tests/pg_legacy_replication/test_pg_replication.py: 1958 warnings
  /home/drnick/repos/github/neuromantik33/verified-sources/.venv/lib/python3.11/site-packages/dlt/common/configuration/container.py:95: DeprecationWarning: getName() is deprecated, get the name attribute instead
    if m := re.match(r"dlt-pool-(\d+)-", threading.currentThread().getName()):

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
============================= slowest 10 durations =============================
2.99s call     tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality[pyarrow-duckdb]
2.89s call     tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality[sqlalchemy-duckdb]
0.17s teardown tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality[pyarrow-duckdb]
0.17s teardown tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality[sqlalchemy-duckdb]
0.07s setup    tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality[sqlalchemy-duckdb]
0.04s setup    tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality[pyarrow-duckdb]
=========================== short test summary info ============================
FAILED tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality[sqlalchemy-duckdb]
FAILED tests/pg_legacy_replication/test_pg_replication.py::test_core_functionality[pyarrow-duckdb]
======================= 2 failed, 3955 warnings in 6.67s =======================

return engine


def cleanup_snapshot_resources(snapshots: DltSource) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is used by tests only, right? or is user supposed to do it? if tests only I'd move it to tests.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well if you're running dlt in a long lived python executable, it's best to close the engine and drop the rep_conn connection. However since I use it in short lived k8s or celery jobs, it's not really necessary. I'm not sure though that it's optional for everyone.

@@ -0,0 +1,130 @@
# Postgres legacy replication
[Postgres](https://www.postgresql.org/) is one of the most popular relational database management systems. This verified source uses Postgres' replication functionality to efficiently process changes
in tables (a process often referred to as _Change Data Capture_ or CDC). It uses [logical decoding](https://www.postgresql.org/docs/current/logicaldecoding.html) and the optional `decoderbufs`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does it mean that decoderbufs is optional? if not present, we decode on the client?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The heavy lifting is done by the decoderbufs extension, which must be added if using a managed postgres like Cloud SQL, or compiled and installed on a self hosted postgres installation.

Detailed instructions can be found here : https://debezium.io/documentation/reference/stable/postgres-plugins.html#logical-decoding-output-plugin-installation

FYI decoderbufs is the default logical replication plugin used by Debezium.

@rudolfix rudolfix added ci from fork Allows to run tests from PR coming from fork and removed ci from fork Allows to run tests from PR coming from fork labels Jan 20, 2025
Copy link
Contributor

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

two more things:

  1. in the meantime Python 3.8 is EOL and newest poetry does not support it. could you change python 3.,9 in our lint/init CI workflow to 3.9?
  2. maybe we can rename this to pg_wal_replication? it does not look like legacy to me at all :)

type_adapter_callback: Optional[TTypeAdapter]


@dlt.sources.config.with_config(sections=("sources", "pg_legacy_replication"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will not work this way. you need to pass section="pg_legacy_replication" into dlt.source or just move this function to init.py (which I think is the best option). then both dlt.source share the same configuration

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Makes sense 👍

Copy link
Author

@neuromantik33 neuromantik33 Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about changing the name to pg_wal_replication. As you've mentioned, pgoutput is now the de-facto way of doing logical replication—being built-in to postgres— and despite its shortcomings most users might not want to add legacy LR plugins. But I really don't care about the name, it's really a means to end for me (getting a replication source to work in pg9.6.)

Nicolas ESTRADA added 6 commits January 29, 2025 00:23
- No longer inferring schemas if the first row_msg in a batch is a DELETE operation
- Instead uses sqlalchemy to reflect the schema
- In pg9.6, pg_current_xlog_location wasn't reliable which would cause the message consumer to hang until new data was flushed to WAL
- Doesn't fix but was the cause for dlt-hub/dlt#2229 (was able to reproduce in the added test case)
- Some minor refactoring
- refactor: changing closing semantics for db conns (using contextlib.closing)
- fix: it is necessary sometimes to use the same reflection level as with the initial snapshot for arrow schemas
- fix: timezone flag is now an acceptable seemless schema migration
- fix: aligned precision for fixed integer types to match the ones inferred from the sql_database source (I guess to account for signed values)
- chore: removed test case with changing the precision of a byte array with pyarrow (absurd one to begin with and no longer possible with the new rows_to_arrow implementation)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci from fork Allows to run tests from PR coming from fork

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants