Skip to content

Commit f87387e

Browse files
feat: Interruption and termination signals in taps and targets
1 parent 0e2df68 commit f87387e

File tree

5 files changed

+68
-2
lines changed

5 files changed

+68
-2
lines changed

samples/sample_tap_countries/countries_streams.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import abc
1212
import sys
13+
from typing import Iterable # noqa: ICN003
1314

1415
from singer_sdk import typing as th
1516
from singer_sdk.streams.graphql import GraphQLStream
@@ -82,6 +83,14 @@ class CountriesStream(CountriesAPIStream):
8283
),
8384
).to_dict()
8485

86+
# FIXME: revert these changes before merging
87+
def request_records(self, context) -> Iterable[dict]:
88+
import time # noqa: PLC0415
89+
90+
time.sleep(60) # Simulate a slow stream
91+
92+
return super().request_records(context)
93+
8594

8695
class ContinentsStream(CountriesAPIStream):
8796
"""Continents stream from the Countries API."""

samples/sample_target_csv/csv_target.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class SampleTargetCSV(Target):
1212

1313
name = "target-csv"
1414
config_jsonschema = th.PropertiesList(
15-
th.Property("target_folder", th.StringType, required=True),
15+
th.Property("target_folder", th.StringType, default="output"),
1616
th.Property("file_naming_scheme", th.StringType),
1717
).to_dict()
1818
default_sink_class = SampleCSVTargetSink

singer_sdk/plugin_base.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@
55
import abc
66
import logging
77
import os
8+
import signal
89
import sys
910
import time
1011
import typing as t
1112
from importlib import metadata
1213
from pathlib import Path, PurePath
13-
from types import MappingProxyType
14+
from types import FrameType, MappingProxyType
1415

1516
import click
1617

@@ -176,6 +177,10 @@ def __init__(
176177
# Initialization timestamp
177178
self.__initialized_at = int(time.time() * 1000)
178179

180+
# Signal handling
181+
signal.signal(signal.SIGINT, self._handle_termination)
182+
signal.signal(signal.SIGTERM, self._handle_termination)
183+
179184
def setup_mapper(self) -> None:
180185
"""Initialize the plugin mapper for this tap."""
181186
self._mapper = PluginMapper(
@@ -402,6 +407,24 @@ def _validate_config(self, *, raise_errors: bool = True) -> list[str]:
402407

403408
return errors
404409

410+
def _handle_termination( # pragma: no cover
411+
self,
412+
signum: int, # noqa: ARG002
413+
frame: FrameType | None, # noqa: ARG002
414+
) -> None:
415+
"""Handle termination signal.
416+
417+
Args:
418+
signum: Signal number.
419+
frame: Frame.
420+
421+
Raises:
422+
click.Abort: If the termination signal is received.
423+
"""
424+
self.logger.info("Gracefully shutting down...")
425+
errmsg = "Received termination signal"
426+
raise click.Abort(errmsg)
427+
405428
@classmethod
406429
def print_version(
407430
cls: type[PluginBase],

singer_sdk/tap_base.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
if t.TYPE_CHECKING:
3333
from pathlib import PurePath
34+
from types import FrameType
3435

3536
from singer_sdk.connectors import SQLConnector
3637
from singer_sdk.mapper import PluginMapper
@@ -473,6 +474,20 @@ def sync_all(self) -> None:
473474

474475
# Command Line Execution
475476

477+
def _handle_termination( # pragma: no cover
478+
self,
479+
signum: int,
480+
frame: FrameType | None,
481+
) -> None:
482+
"""Handle termination signal.
483+
484+
Args:
485+
signum: Signal number.
486+
frame: Frame.
487+
"""
488+
self.write_message(StateMessage(value=self.state))
489+
super()._handle_termination(signum, frame)
490+
476491
@classmethod
477492
def invoke( # type: ignore[override]
478493
cls: type[Tap],

singer_sdk/target_base.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
if t.TYPE_CHECKING:
3434
from pathlib import PurePath
35+
from types import FrameType
3536

3637
from singer_sdk.connectors import SQLConnector
3738
from singer_sdk.mapper import PluginMapper
@@ -540,6 +541,24 @@ def _write_state_message(self, state: dict) -> None:
540541

541542
# CLI handler
542543

544+
def _handle_termination( # pragma: no cover
545+
self,
546+
signum: int,
547+
frame: FrameType | None,
548+
) -> None:
549+
"""Handle termination signals.
550+
551+
Args:
552+
signum: Signal number.
553+
frame: Frame object.
554+
"""
555+
self.logger.info(
556+
"Received termination signal %d, draining all sinks...",
557+
signum,
558+
)
559+
self.drain_all(is_endofpipe=True)
560+
super()._handle_termination(signum, frame)
561+
543562
@classmethod
544563
def invoke( # type: ignore[override]
545564
cls: type[Target],

0 commit comments

Comments
 (0)