Skip to content

Commit

Permalink
Add log_level to elasticsearch_logs to prevent circuit_breaker_exception
Browse files Browse the repository at this point in the history
openeo-python-driver/#170

Adding a log-level filter to the ES search reduces the number of log entries that ES has to process,
so the search uses less memory; excessive memory use is what triggers the circuit breaker.
  • Loading branch information
JohanKJSchreurs committed Mar 30, 2023
1 parent b90ff19 commit e9fd027
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 8 deletions.
10 changes: 8 additions & 2 deletions openeogeotrellis/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -2185,13 +2185,19 @@ def get_results_metadata(self, job_id: str, user_id: str) -> dict:

return {}

# NOTE: this is a diff hunk whose +/- markers were lost. The next line is the
# old one-line signature that the commit removes.
def get_log_entries(self, job_id: str, user_id: str, offset: Optional[str] = None) -> Iterable[dict]:
# New signature added by the commit: the same parameters plus an optional
# `log_level`, which is forwarded to elasticsearch_logs() at the end.
def get_log_entries(
self,
job_id: str,
user_id: str,
offset: Optional[str] = None,
log_level: Optional[str] = None,
) -> Iterable[dict]:
# will throw if job doesn't match user
job_info = self.get_job_info(job_id=job_id, user_id=user_id)
# A job that is only created or queued gets an empty iterator.
if job_info.status in [JOB_STATUS.CREATED, JOB_STATUS.QUEUED]:
return iter(())

# Old call, removed by the commit: no log level is passed.
return elasticsearch_logs(job_id, job_info.created, offset)
# New call, added by the commit: also passes the requested log level.
return elasticsearch_logs(job_id, job_info.created, offset, log_level)

def cancel_job(self, job_id: str, user_id: str):
with self._double_job_registry as registry:
Expand Down
49 changes: 45 additions & 4 deletions openeogeotrellis/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from typing import Iterable, Optional


from elasticsearch import Elasticsearch, ConnectionTimeout
from elasticsearch import Elasticsearch, ConnectionTimeout, TransportError

from openeo.api.logs import normalize_log_level
from openeo.util import dict_no_none, rfc3339
from openeo_driver.errors import OpenEOApiException
from openeo_driver.util.logging import FlaskRequestCorrelationIdLogging
Expand All @@ -19,7 +21,10 @@


def elasticsearch_logs(
job_id: str, create_time: Optional[dt.datetime] = None, offset: Optional[str] = None
job_id: str,
create_time: Optional[dt.datetime] = None,
offset: Optional[str] = None,
log_level: Optional[str] = None,
) -> Iterable[dict]:
"""Retrieve a job's logs from Elasticsearch.
Expand All @@ -38,6 +43,9 @@ def elasticsearch_logs(
For example: "[1673351608383, 102790]"
:param log_level:
Return only logs with this log level or higher.
:raises OpenEOApiException:
- Either when the offset is not valid JSON
- or when Elasticsearch had a connection timeout
Expand All @@ -46,7 +54,7 @@ def elasticsearch_logs(
"""
try:
search_after = None if offset in [None, ""] else json.loads(offset)
return _elasticsearch_logs(job_id, create_time, search_after)
return _elasticsearch_logs(job_id, create_time, search_after, log_level)
except json.decoder.JSONDecodeError:
raise OpenEOApiException(status_code=400, code="OffsetInvalid",
message=f"The value passed for the query parameter 'offset' is invalid: {offset}")
Expand All @@ -56,6 +64,7 @@ def _elasticsearch_logs(
job_id: str,
create_time: Optional[dt.datetime] = None,
search_after: Optional[list] = None,
log_level: Optional[str] = None,
) -> Iterable[dict]:
"""Internal helper function to retrieve a job's logs from Elasticsearch.
Expand All @@ -72,6 +81,9 @@ def _elasticsearch_logs(
For example: [1673351608383, 102790]
:param log_level:
Return only logs with this log level or higher.
:raises OpenEOApiException:
- Either when the offset is not valid JSON
- or when Elasticsearch had a connection timeout
Expand All @@ -80,6 +92,17 @@ def _elasticsearch_logs(
"""

req_id = FlaskRequestCorrelationIdLogging.get_request_id()
# log_level = log_level.upper() if log_level else None
log_level_int = normalize_log_level(log_level)
level_filter = None
if log_level_int:
levels_to_include = {
logging.ERROR: ["ERROR"],
logging.WARNING: ["ERROR", "WARNING"],
logging.INFO: ["ERROR", "WARNING", "INFO"],
logging.DEBUG: ["ERROR", "WARNING", "INFO", "DEBUG"],
}
level_filter = {"terms": {"levelname": levels_to_include[log_level_int]}}

page_size = 100
query = {
Expand All @@ -101,6 +124,11 @@ def _elasticsearch_logs(
}
}
)
if level_filter:
query["bool"]["filter"].append(level_filter)
print(f"{level_filter=}")
print(f"{query=}")

with Elasticsearch(ES_HOSTS) as es:
while True:
try:
Expand All @@ -121,11 +149,24 @@ def _elasticsearch_logs(
except ConnectionTimeout as exc:
# TODO: add a test that verifies: doesn't leak sensitive info + it does log the ConnectionTimeout
message = (
f"Temporary failure while retrieving logs: ConnectionTimeout. "
"Temporary failure while retrieving logs: ConnectionTimeout. "
+ f"Please try again and report this error if it persists. (ref: {req_id})"
)
raise OpenEOApiException(status_code=504, message=message) from exc

except TransportError as exc:
# TODO: Retry when ES raises circuit breaker exception + better error if still fails.
# https://github.com/Open-EO/openeo-python-driver/issues/170
if exc.status_code == 429:
message = (
"Temporary failure while retrieving logs: Search for logs interrupted "
+ "because it used too memory. "
+ f"Please try again and report this error if it persists. (ref: {req_id})"
)
raise OpenEOApiException(status_code=429, message=message) from exc
else:
raise

else:
hits = search_result["hits"]["hits"]

Expand Down
21 changes: 19 additions & 2 deletions tests/test_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from openeogeotrellis.logs import elasticsearch_logs
from openeo_driver.errors import OpenEOApiException

from elasticsearch.exceptions import ConnectionTimeout
from elasticsearch.exceptions import ConnectionTimeout, TransportError


@mock.patch("openeogeotrellis.logs.Elasticsearch.search")
Expand All @@ -25,7 +25,7 @@ def test_elasticsearch_logs_skips_entry_with_empty_loglevel_simple_case(mock_sea


@mock.patch("openeogeotrellis.logs.Elasticsearch.search")
def test_elasticsearch_logs_keeps_entry_when_loglevel_filled_in(mock_search):
def test_elasticsearch_logs_keeps_entry_with_value_for_loglevel(mock_search):
search_hit = {
"_source": {
"levelname": "ERROR",
Expand Down Expand Up @@ -115,3 +115,20 @@ def test_connection_timeout_raises_openeoapiexception(mock_search):
+ "Please try again and report this error if it persists. (ref: no-request)"
)
assert raise_context.value.message == expected_message


@mock.patch("openeogeotrellis.logs.Elasticsearch.search")
def test_circuit_breaker_raises_openeoapiexception(mock_search):
    """Check that a 429 ``TransportError`` from the search surfaces as an ``OpenEOApiException``.

    ``Elasticsearch.search`` is patched to raise the error; the test then
    verifies the exact message carried by the resulting API exception.
    """
    simulated_error = TransportError(429, "Simulating circuit breaker exception")
    mock_search.side_effect = simulated_error

    wanted_message = (
        "Temporary failure while retrieving logs: Search for logs interrupted "
        "because it used too memory. "
        "Please try again and report this error if it persists. (ref: no-request)"
    )

    with pytest.raises(OpenEOApiException) as raise_context:
        # list() consumes the returned iterable; presumably the log entries are
        # produced lazily, so the patched search only runs once it is iterated.
        list(elasticsearch_logs("job-foo", create_time=None, offset=None))

    assert raise_context.value.message == wanted_message

0 comments on commit e9fd027

Please sign in to comment.