Skip to content

Commit

Permalink
feat(ingest): datahub-rest - Make datahub-rest client more robust by …
Browse files Browse the repository at this point in the history
…configurable retries. (datahub-project#3826) (datahub-project#3860)
  • Loading branch information
Rickard Cardell authored Jan 12, 2022
1 parent 8b1b0fd commit 2f7e49b
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 4 deletions.
2 changes: 2 additions & 0 deletions metadata-ingestion/sink_docs/datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| -------- | -------- | ------- | ---------------------------- |
| `server` | ✅ | | URL of DataHub GMS endpoint. |
| `timeout_sec` | | 30 | Per-HTTP request timeout. |
| `retry_max_times` | | 1 | Maximum times to retry if HTTP request fails. The delay between retries is increased exponentially |
| `retry_status_codes`| | [429, 502, 503, 504] | Retry HTTP request also on these status codes |
| `token` | | | Bearer token used for authentication. |
| `extra_headers` | | | Extra headers which will be added to the request. |
| `max_threads` | | `1` | Experimental: Max parallelism for REST API calls |
Expand Down
34 changes: 30 additions & 4 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import Dict, List, Optional, Tuple, Union

import requests
import requests.adapters
from requests.adapters import HTTPAdapter, Retry
from requests.exceptions import HTTPError, RequestException

from datahub import __package_name__
Expand Down Expand Up @@ -45,19 +45,30 @@ class DatahubRestEmitter:
DEFAULT_READ_TIMEOUT_SEC = (
30 # Any ingest call taking longer than 30 seconds should be abandoned
)
DEFAULT_RETRY_STATUS_CODES = [ # Additional status codes to retry on
429,
502,
503,
504,
]
DEFAULT_RETRY_MAX_TIMES = 1

_gms_server: str
_token: Optional[str]
_session: requests.Session
_connect_timeout_sec: float = DEFAULT_CONNECT_TIMEOUT_SEC
_read_timeout_sec: float = DEFAULT_READ_TIMEOUT_SEC
_retry_status_codes: List[int] = DEFAULT_RETRY_STATUS_CODES
_retry_max_times: int = DEFAULT_RETRY_MAX_TIMES

def __init__(
self,
gms_server: str,
token: Optional[str] = None,
connect_timeout_sec: Optional[float] = None,
read_timeout_sec: Optional[float] = None,
retry_status_codes: Optional[List[int]] = None,
retry_max_times: Optional[int] = None,
extra_headers: Optional[Dict[str, str]] = None,
ca_certificate_path: Optional[str] = None,
):
Expand All @@ -69,9 +80,6 @@ def __init__(
self._token = token

self._session = requests.Session()
adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
self._session.mount("http://", adapter)
self._session.mount("https://", adapter)

self._session.headers.update(
{
Expand Down Expand Up @@ -99,6 +107,24 @@ def __init__(
f"Setting timeout values lower than 1 second is not recommended. Your configuration is connect_timeout:{self._connect_timeout_sec}s, read_timeout:{self._read_timeout_sec}s"
)

if retry_status_codes is not None: # Only if missing. Empty list is allowed
self._retry_status_codes = retry_status_codes

if retry_max_times:
self._retry_max_times = retry_max_times

retry_strategy = Retry(
total=self._retry_max_times,
status_forcelist=self._retry_status_codes,
backoff_factor=2,
)

adapter = HTTPAdapter(
pool_connections=100, pool_maxsize=100, max_retries=retry_strategy
)
self._session.mount("http://", adapter)
self._session.mount("https://", adapter)

def test_connection(self) -> None:
response = self._session.get(f"{self._gms_server}/config")
response.raise_for_status()
Expand Down
4 changes: 4 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class DatahubClientConfig(ConfigModel):
server: str = "http://localhost:8080"
token: Optional[str]
timeout_sec: Optional[int]
retry_status_codes: Optional[List[int]]
retry_max_times: Optional[int]
extra_headers: Optional[Dict[str, str]]
ca_certificate_path: Optional[str]
max_threads: int = 1
Expand All @@ -42,6 +44,8 @@ def __init__(self, config: DatahubClientConfig) -> None:
token=self.config.token,
connect_timeout_sec=self.config.timeout_sec, # reuse timeout_sec for connect timeout
read_timeout_sec=self.config.timeout_sec,
retry_status_codes=self.config.retry_status_codes,
retry_max_times=self.config.retry_max_times,
extra_headers=self.config.extra_headers,
ca_certificate_path=self.config.ca_certificate_path,
)
Expand Down
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ def __init__(self, ctx: PipelineContext, config: DatahubRestSinkConfig):
self.config.token,
connect_timeout_sec=self.config.timeout_sec, # reuse timeout_sec for connect timeout
read_timeout_sec=self.config.timeout_sec,
retry_status_codes=self.config.retry_status_codes,
retry_max_times=self.config.retry_max_times,
extra_headers=self.config.extra_headers,
ca_certificate_path=self.config.ca_certificate_path,
)
Expand Down
12 changes: 12 additions & 0 deletions metadata-ingestion/tests/unit/test_rest_emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ def test_datahub_rest_emitter_construction():
emitter = DatahubRestEmitter(MOCK_GMS_ENDPOINT)
assert emitter._connect_timeout_sec == emitter.DEFAULT_CONNECT_TIMEOUT_SEC
assert emitter._read_timeout_sec == emitter.DEFAULT_READ_TIMEOUT_SEC
assert emitter._retry_status_codes == emitter.DEFAULT_RETRY_STATUS_CODES
assert emitter._retry_max_times == emitter.DEFAULT_RETRY_MAX_TIMES


def test_datahub_rest_emitter_timeout_construction():
Expand All @@ -17,6 +19,16 @@ def test_datahub_rest_emitter_timeout_construction():
assert emitter._read_timeout_sec == 4


def test_datahub_rest_emitter_retry_construction():
emitter = DatahubRestEmitter(
MOCK_GMS_ENDPOINT,
retry_status_codes=[418],
retry_max_times=42,
)
assert emitter._retry_status_codes == [418]
assert emitter._retry_max_times == 42


def test_datahub_rest_emitter_extra_params():
emitter = DatahubRestEmitter(
MOCK_GMS_ENDPOINT, extra_headers={"key1": "value1", "key2": "value2"}
Expand Down

0 comments on commit 2f7e49b

Please sign in to comment.