Skip to content

Commit aa5831a

Browse files
feat: Interruption and termination signals in taps and targets
1 parent 344a218 commit aa5831a

File tree

5 files changed

+77
-2
lines changed

5 files changed

+77
-2
lines changed

samples/sample_tap_countries/countries_streams.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
from singer_sdk import typing as th
1818
from singer_sdk.streams.graphql import GraphQLStream
1919

20+
if t.TYPE_CHECKING:
21+
from collections.abc import Iterable
22+
2023
SCHEMAS_DIR = importlib.resources.files(__package__) / "schemas"
2124

2225

@@ -83,6 +86,18 @@ class CountriesStream(CountriesAPIStream):
8386
),
8487
).to_dict()
8588

89+
# FIXME: revert these changes before merging
90+
def request_records(self, context) -> Iterable[dict]:
91+
import time # noqa: PLC0415
92+
93+
records = super().request_records(context)
94+
95+
yield next(records) # Emit the first record
96+
97+
time.sleep(60) # Simulate a slow stream
98+
99+
yield from records # Emit the rest of the records
100+
86101

87102
class ContinentsStream(CountriesAPIStream):
88103
"""Continents stream from the Countries API."""

samples/sample_target_csv/csv_target.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ class SampleTargetCSV(Target):
1313
name = "target-csv"
1414
config_jsonschema = th.PropertiesList(
1515
th.Property(
16-
"target_folder", th.StringType, required=True, title="Target Folder"
16+
"target_folder",
17+
th.StringType,
18+
default="output",
19+
title="Target Folder",
1720
),
1821
th.Property("file_naming_scheme", th.StringType, title="File Naming Scheme"),
1922
).to_dict()

singer_sdk/plugin_base.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@
55
import abc
66
import logging
77
import os
8+
import signal
89
import sys
910
import time
1011
import typing as t
1112
import warnings
1213
from importlib import metadata
1314
from pathlib import Path, PurePath
14-
from types import MappingProxyType
15+
from types import FrameType, MappingProxyType
1516

1617
import click
1718

@@ -190,6 +191,10 @@ def __init__(
190191
# Initialization timestamp
191192
self.__initialized_at = int(time.time() * 1000)
192193

194+
# Signal handling
195+
signal.signal(signal.SIGINT, self._handle_termination)
196+
signal.signal(signal.SIGTERM, self._handle_termination)
197+
193198
def setup_mapper(self) -> None:
194199
"""Initialize the plugin mapper for this tap."""
195200
self._mapper = PluginMapper(
@@ -416,6 +421,24 @@ def _validate_config(self, *, raise_errors: bool = True) -> list[str]:
416421

417422
return errors
418423

424+
def _handle_termination( # pragma: no cover
425+
self,
426+
signum: int, # noqa: ARG002
427+
frame: FrameType | None, # noqa: ARG002
428+
) -> None:
429+
"""Handle termination signal.
430+
431+
Args:
432+
signum: Signal number.
433+
frame: Frame.
434+
435+
Raises:
436+
click.Abort: If the termination signal is received.
437+
"""
438+
self.logger.info("Gracefully shutting down...")
439+
errmsg = "Received termination signal"
440+
raise click.Abort(errmsg)
441+
419442
@classmethod
420443
def print_version(
421444
cls: type[PluginBase],

singer_sdk/tap_base.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
if t.TYPE_CHECKING:
3535
from pathlib import PurePath
36+
from types import FrameType
3637

3738
from singer_sdk.connectors import SQLConnector
3839
from singer_sdk.mapper import PluginMapper
@@ -487,6 +488,20 @@ def sync_all(self) -> None:
487488

488489
# Command Line Execution
489490

491+
def _handle_termination( # pragma: no cover
492+
self,
493+
signum: int,
494+
frame: FrameType | None,
495+
) -> None:
496+
"""Handle termination signal.
497+
498+
Args:
499+
signum: Signal number.
500+
frame: Frame.
501+
"""
502+
self.write_message(StateMessage(value=self.state))
503+
super()._handle_termination(signum, frame)
504+
490505
@classmethod
491506
def invoke( # type: ignore[override]
492507
cls: type[Tap],

singer_sdk/target_base.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
if t.TYPE_CHECKING:
3434
from pathlib import PurePath
35+
from types import FrameType
3536

3637
from singer_sdk.connectors import SQLConnector
3738
from singer_sdk.mapper import PluginMapper
@@ -542,6 +543,24 @@ def _write_state_message(self, state: dict) -> None:
542543

543544
# CLI handler
544545

546+
def _handle_termination( # pragma: no cover
547+
self,
548+
signum: int,
549+
frame: FrameType | None,
550+
) -> None:
551+
"""Handle termination signals.
552+
553+
Args:
554+
signum: Signal number.
555+
frame: Frame object.
556+
"""
557+
self.logger.info(
558+
"Received termination signal %d, draining all sinks...",
559+
signum,
560+
)
561+
self.drain_all(is_endofpipe=True)
562+
super()._handle_termination(signum, frame)
563+
545564
@classmethod
546565
def invoke( # type: ignore[override]
547566
cls: type[Target],

0 commit comments

Comments
 (0)