Skip to content

Commit 57ed344

Browse files
feat: Interruption and termination signals in taps and targets
1 parent 0c079e2 commit 57ed344

File tree

4 files changed

+60
-2
lines changed

4 files changed

+60
-2
lines changed

samples/sample_target_csv/csv_target.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class SampleTargetCSV(Target):
1212

1313
name = "target-csv"
1414
config_jsonschema = th.PropertiesList(
15-
th.Property("target_folder", th.StringType, required=True),
15+
th.Property("target_folder", th.StringType, default="output"),
1616
th.Property("file_naming_scheme", th.StringType),
1717
).to_dict()
1818
default_sink_class = SampleCSVTargetSink

singer_sdk/plugin_base.py

+24-1
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@
55
import abc
66
import logging
77
import os
8+
import signal
89
import sys
910
import time
1011
import typing as t
1112
from importlib import metadata
1213
from pathlib import Path, PurePath
13-
from types import MappingProxyType
14+
from types import FrameType, MappingProxyType
1415

1516
import click
1617

@@ -176,6 +177,10 @@ def __init__(
176177
# Initialization timestamp
177178
self.__initialized_at = int(time.time() * 1000)
178179

180+
# Signal handling
181+
signal.signal(signal.SIGINT, self._handle_termination)
182+
signal.signal(signal.SIGTERM, self._handle_termination)
183+
179184
def setup_mapper(self) -> None:
180185
"""Initialize the plugin mapper for this tap."""
181186
self._mapper = PluginMapper(
@@ -402,6 +407,24 @@ def _validate_config(self, *, raise_errors: bool = True) -> list[str]:
402407

403408
return errors
404409

410+
def _handle_termination( # pragma: no cover
411+
self,
412+
signum: int, # noqa: ARG002
413+
frame: FrameType | None, # noqa: ARG002
414+
) -> None:
415+
"""Handle termination signal.
416+
417+
Args:
418+
signum: Signal number.
419+
frame: Frame.
420+
421+
Raises:
422+
click.Abort: If the termination signal is received.
423+
"""
424+
self.logger.info("Gracefully shutting down...")
425+
errmsg = "Received termination signal"
426+
raise click.Abort(errmsg)
427+
405428
@classmethod
406429
def print_version(
407430
cls: type[PluginBase],

singer_sdk/tap_base.py

+16
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
if t.TYPE_CHECKING:
3333
from pathlib import PurePath
34+
from types import FrameType
3435

3536
from singer_sdk.connectors import SQLConnector
3637
from singer_sdk.mapper import PluginMapper
@@ -473,6 +474,21 @@ def sync_all(self) -> None:
473474

474475
# Command Line Execution
475476

477+
def _handle_termination( # pragma: no cover
478+
self,
479+
signum: int,
480+
frame: FrameType | None,
481+
) -> None:
482+
"""Handle termination signal.
483+
484+
Args:
485+
signum: Signal number.
486+
frame: Frame.
487+
"""
488+
for stream in self.streams.values():
489+
stream.finalize_state_progress_markers()
490+
super()._handle_termination(signum, frame)
491+
476492
@classmethod
477493
def invoke( # type: ignore[override]
478494
cls: type[Tap],

singer_sdk/target_base.py

+19
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
if t.TYPE_CHECKING:
3434
from pathlib import PurePath
35+
from types import FrameType
3536

3637
from singer_sdk.connectors import SQLConnector
3738
from singer_sdk.mapper import PluginMapper
@@ -540,6 +541,24 @@ def _write_state_message(self, state: dict) -> None:
540541

541542
# CLI handler
542543

544+
def _handle_termination( # pragma: no cover
545+
self,
546+
signum: int,
547+
frame: FrameType | None,
548+
) -> None:
549+
"""Handle termination signals.
550+
551+
Args:
552+
signum: Signal number.
553+
frame: Frame object.
554+
"""
555+
self.logger.info(
556+
"Received termination signal %d, draining all sinks...",
557+
signum,
558+
)
559+
self.drain_all(is_endofpipe=True)
560+
super()._handle_termination(signum, frame)
561+
543562
@classmethod
544563
def invoke( # type: ignore[override]
545564
cls: type[Target],

0 commit comments

Comments
 (0)