Skip to content

Commit 2daf3f5

Browse files
feat: Settings write-back
1 parent 86e249d commit 2daf3f5

File tree

9 files changed

+144
-20
lines changed

9 files changed

+144
-20
lines changed

docs/conf.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@
152152
# -- Options for intersphinx -----------------------------------------------------------
153153
# https://www.sphinx-doc.org/en/master/usage/extensions/intersphinx.html#configuration
154154
intersphinx_mapping = {
155+
"blinker": ("https://blinker.readthedocs.io/en/stable/", None),
155156
"requests": ("https://requests.readthedocs.io/en/latest/", None),
156157
"python": ("https://docs.python.org/3/", None),
157158
}

docs/guides/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ The following pages contain useful information for developers building on top of
88
porting
99
pagination-classes
1010
custom-clis
11+
signals
1112
```

docs/guides/signals.md

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# Signals
2+
3+
This guide will show you how to use the built-in [Blinker](inv:blinker:std:doc#index) signals in the Singer SDK.
4+
5+
## Settings write-back
6+
7+
The SDK provides a signal that allows you to write back settings to the configuration file. This is useful if you want to update the configuration file with new settings that were set during the run, like a `refresh_token`.
8+
9+
```python
10+
import requests
11+
from singer_sdk.authenticators import OAuthAuthenticator
12+
from singer_sdk.plugin_base import PluginBase
13+
14+
15+
class RefreshTokenAuthenticator(OAuthAuthenticator):
16+
def __init__(self, *args, **kwargs):
17+
super().__init__(*args, **kwargs)
18+
self.refresh_token = self.config["refresh_token"]
19+
20+
@property
21+
def oauth_request_body(self):
22+
return {
23+
"client_id": self.config["client_id"],
24+
"client_secret": self.config["client_secret"],
25+
"grant_type": "refresh_token",
26+
"refresh_token": self.refresh_token,
27+
"user_type": "Location",
28+
}
29+
30+
def update_access_token(self):
31+
token_response = requests.post(
32+
self.auth_endpoint,
33+
headers=self._oauth_headers,
34+
data=auth_request_payload,
35+
timeout=60,
36+
)
37+
token_response.raise_for_status()
38+
token_json = token_response.json()
39+
40+
self.access_token = token_json["access_token"]
41+
self.refresh_token = token_json["refresh_token"]
42+
PluginBase.config_updated.send(self, refresh_token=self.refresh_token)
43+
```
44+
45+
In the example above, the `RefreshTokenAuthenticator` class is a subclass of `OAuthAuthenticator` that calls `PluginBase.config_updated.send` to send a signal to update the `refresh_token` in tap's configuration.
46+
47+
```{note}
48+
Only when a single file is passed via the `--config` command line option, the SDK will write back the settings to the same file.
49+
```

poetry.lock

Lines changed: 12 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ license = "Apache-2.0"
4141
python = ">=3.8"
4242
backoff = { version = ">=2.0.0", python = "<4" }
4343
backports-datetime-fromisoformat = { version = ">=2.0.1", python = "<3.11" }
44+
blinker = ">=1.7.0"
4445
click = "~=8.0"
4546
cryptography = ">=3.4.6"
4647
fs = ">=2.4.16"

singer_sdk/authenticators.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import typing as t
88
import warnings
99
from datetime import timedelta
10-
from types import MappingProxyType
1110
from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit
1211

1312
import requests
@@ -16,6 +15,7 @@
1615

1716
if t.TYPE_CHECKING:
1817
import logging
18+
from types import MappingProxyType
1919

2020
from pendulum import DateTime
2121

@@ -90,19 +90,19 @@ def __init__(self, stream: RESTStream) -> None:
9090
stream: A stream for a RESTful endpoint.
9191
"""
9292
self.tap_name: str = stream.tap_name
93-
self._config: dict[str, t.Any] = dict(stream.config)
93+
self._config = stream.config
9494
self._auth_headers: dict[str, t.Any] = {}
9595
self._auth_params: dict[str, t.Any] = {}
9696
self.logger: logging.Logger = stream.logger
9797

9898
@property
99-
def config(self) -> t.Mapping[str, t.Any]:
99+
def config(self) -> MappingProxyType:
100100
"""Get stream or tap config.
101101
102102
Returns:
103103
A frozen (read-only) config dictionary map.
104104
"""
105-
return MappingProxyType(self._config)
105+
return self._config
106106

107107
@property
108108
def auth_headers(self) -> dict:

singer_sdk/plugin_base.py

Lines changed: 65 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import annotations
44

55
import abc
6+
import json
67
import logging
78
import os
89
import sys
@@ -13,6 +14,7 @@
1314
from types import MappingProxyType
1415

1516
import click
17+
from blinker import Signal
1618
from jsonschema import Draft7Validator
1719

1820
from singer_sdk import about, metrics
@@ -98,6 +100,9 @@ class PluginBase(metaclass=abc.ABCMeta): # noqa: PLR0904
98100

99101
_config: dict
100102

103+
# Signals
104+
config_updated = Signal()
105+
101106
@classproperty
102107
def logger(cls) -> logging.Logger: # noqa: N805
103108
"""Get logger.
@@ -134,10 +139,43 @@ def __init__(
134139
it can be a predetermined config dict.
135140
parse_env_config: True to parse settings from env vars.
136141
validate_config: True to require validation of config settings.
142+
"""
143+
self._config, self._config_path = self._process_config(
144+
config=config,
145+
parse_env_config=parse_env_config,
146+
)
147+
metrics._setup_logging(self.config) # noqa: SLF001
148+
self.metrics_logger = metrics.get_metrics_logger()
149+
150+
self._validate_config(raise_errors=validate_config)
151+
self._mapper: PluginMapper | None = None
152+
153+
# Initialization timestamp
154+
self.__initialized_at = int(time.time() * 1000)
155+
156+
self.config_updated.connect(self.update_config)
157+
158+
def _process_config(
159+
self,
160+
*,
161+
config: dict | PurePath | str | list[PurePath | str] | None = None,
162+
parse_env_config: bool = False,
163+
) -> tuple[dict[str, t.Any], PurePath | str | None]:
164+
"""Process the plugin configuration.
165+
166+
Args:
167+
config: May be one or more paths, either as str or PurePath objects, or
168+
it can be a predetermined config dict.
169+
parse_env_config: True to parse settings from env vars.
170+
171+
Returns:
172+
A tuple containing the config dictionary and the config write-back path.
137173
138174
Raises:
139175
ValueError: If config is not a dict or path string.
140176
"""
177+
config_path = None
178+
141179
if not config:
142180
config_dict = {}
143181
elif isinstance(config, (str, PurePath)):
@@ -148,28 +186,29 @@ def __init__(
148186
# Read each config file sequentially. Settings from files later in the
149187
# list will override those of earlier ones.
150188
config_dict.update(read_json_file(config_path))
189+
190+
if len(config) == 1 and not parse_env_config:
191+
config_path = config[0]
192+
151193
elif isinstance(config, dict):
152194
config_dict = config
153-
else:
195+
else: # pragma: no cover
154196
msg = f"Error parsing config of type '{type(config).__name__}'."
155197
raise ValueError(msg)
198+
199+
# Parse env var settings
156200
if parse_env_config:
157201
self.logger.info("Parsing env var for settings config...")
158202
config_dict.update(self._env_var_config)
159203
else:
160204
self.logger.info("Skipping parse of env var settings...")
205+
206+
# Handle sensitive settings
161207
for k, v in config_dict.items():
162208
if self._is_secret_config(k):
163209
config_dict[k] = SecretString(v)
164-
self._config = config_dict
165-
metrics._setup_logging(self.config) # noqa: SLF001
166-
self.metrics_logger = metrics.get_metrics_logger()
167210

168-
self._validate_config(raise_errors=validate_config)
169-
self._mapper: PluginMapper | None = None
170-
171-
# Initialization timestamp
172-
self.__initialized_at = int(time.time() * 1000)
211+
return config_dict, config_path
173212

174213
def setup_mapper(self) -> None:
175214
"""Initialize the plugin mapper for this tap."""
@@ -336,13 +375,28 @@ def state(self) -> dict:
336375
# Core plugin config:
337376

338377
@property
339-
def config(self) -> t.Mapping[str, t.Any]:
378+
def config(self) -> MappingProxyType:
340379
"""Get config.
341380
342381
Returns:
343382
A frozen (read-only) config dictionary map.
344383
"""
345-
return t.cast(dict, MappingProxyType(self._config))
384+
return MappingProxyType(self._config)
385+
386+
def update_config(self, sender: t.Any, **settings: t.Any) -> None: # noqa: ANN401, ARG002
387+
"""Update the config with new settings.
388+
389+
This is a :external+blinker:std:doc:`Blinker <index>` signal receiver.
390+
391+
Args:
392+
sender: The sender of the signal.
393+
**settings: New settings to update the config with.
394+
"""
395+
self._config.update(**settings)
396+
if self._config_path is not None: # pragma: no cover
397+
self.logger.info("Updating config file: %s", self._config_path)
398+
with Path(self._config_path).open("w") as f:
399+
json.dump(self._config, f)
346400

347401
@staticmethod
348402
def _is_secret_config(config_key: str) -> bool:

singer_sdk/streams/core.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import typing as t
1111
from os import PathLike
1212
from pathlib import Path
13-
from types import MappingProxyType
1413

1514
import pendulum
1615

@@ -58,6 +57,7 @@
5857

5958
if t.TYPE_CHECKING:
6059
import logging
60+
from types import MappingProxyType
6161

6262
from singer_sdk.helpers._compat import Traversable
6363
from singer_sdk.tap_base import Tap
@@ -135,7 +135,6 @@ def __init__(
135135
self.logger: logging.Logger = tap.logger.getChild(self.name)
136136
self.metrics_logger = tap.metrics_logger
137137
self.tap_name: str = tap.name
138-
self._config: dict = dict(tap.config)
139138
self._tap = tap
140139
self._tap_state = tap.state
141140
self._tap_input_catalog: singer.Catalog | None = None
@@ -602,13 +601,13 @@ def _singer_catalog(self) -> singer.Catalog:
602601
return singer.Catalog([(self.tap_stream_id, self._singer_catalog_entry)])
603602

604603
@property
605-
def config(self) -> t.Mapping[str, t.Any]:
604+
def config(self) -> MappingProxyType[str, t.Any]:
606605
"""Get stream configuration.
607606
608607
Returns:
609608
A frozen (read-only) config dictionary map.
610609
"""
611-
return MappingProxyType(self._config)
610+
return self._tap.config
612611

613612
@property
614613
def tap_stream_id(self) -> str:

tests/core/test_plugin_base.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,11 @@ def test_mapper_not_initialized():
5757
def test_supported_python_versions():
5858
"""Test that supported python versions are correctly parsed."""
5959
assert PluginBase._get_supported_python_versions(SDK_PACKAGE_NAME)
60+
61+
62+
def test_config_updated_signal():
63+
plugin = PluginTest(config={"prop1": "hello"})
64+
assert plugin.config == {"prop1": "hello"}
65+
66+
PluginBase.config_updated.send(prop2="abc")
67+
assert plugin.config == {"prop1": "hello", "prop2": "abc"}

0 commit comments

Comments
 (0)