Skip to content

Commit

Permalink
Merge branch 'main' into restore-unique-constraint-logical-date
Browse files Browse the repository at this point in the history
  • Loading branch information
vatsrahul1001 authored Jan 29, 2025
2 parents 8f9e076 + 2c1a3cc commit 1c47c75
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 18 deletions.
2 changes: 1 addition & 1 deletion task_sdk/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ dependencies = [
"httpx>=0.27.0",
"jinja2>=3.1.4",
"methodtools>=0.4.7",
"msgspec>=0.18.6",
"msgspec>=0.19.0",
"psutil>=6.1.0",
"structlog>=24.4.0",
"retryhttp>=1.2.0",
Expand Down
6 changes: 2 additions & 4 deletions task_sdk/src/airflow/sdk/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,11 @@ def logging_processors(
else:
exc_group_processor = None

encoder = msgspec.json.Encoder()

def json_dumps(msg, default):
return encoder.encode(msg)
return msgspec.json.encode(msg, enc_hook=default)

def json_processor(logger: Any, method_name: Any, event_dict: EventDict) -> str:
return encoder.encode(event_dict).decode("utf-8")
return msgspec.json.encode(event_dict).decode("utf-8")

json = structlog.processors.JSONRenderer(serializer=json_dumps)

Expand Down
36 changes: 23 additions & 13 deletions task_sdk/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,19 @@ def pytest_runtest_setup(item):

class LogCapture:
# Like structlog.typing.LogCapture, but that doesn't add log_level in to the event dict
entries: list[EventDict]
entries: list[EventDict | bytes]

def __init__(self) -> None:
self.entries = []

def __call__(self, _: WrappedLogger, method_name: str, event_dict: EventDict) -> NoReturn:
def __call__(self, _: WrappedLogger, method_name: str, event: EventDict | bytes) -> NoReturn:
from structlog.exceptions import DropEvent

if "level" not in event_dict:
event_dict["_log_level"] = method_name
if isinstance(event, dict):
if "level" not in event:
event["_log_level"] = method_name

self.entries.append(event_dict)
self.entries.append(event)

raise DropEvent

Expand All @@ -93,20 +94,29 @@ def captured_logs(request):
reset_logging()
configure_logging(enable_pretty_log=False)

# Get log level from test parameter, defaulting to INFO if not provided
log_level = getattr(request, "param", logging.INFO)
# Get log level from test parameter, which can either be a single log level or a
# tuple of log level and desired output type, defaulting to INFO if not provided
log_level = logging.INFO
output = "dict"
param = getattr(request, "param", logging.INFO)
if isinstance(param, int):
log_level = param
elif isinstance(param, tuple):
log_level = param[0]
output = param[1]

# We want to capture all logs, but we don't want to see them in the test output
structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(log_level))

# But we need to replace remove the last processor (the one that turns JSON into text, as we want the
# event dict for tests)
cur_processors = structlog.get_config()["processors"]
processors = cur_processors.copy()
proc = processors.pop()
assert isinstance(
proc, (structlog.dev.ConsoleRenderer, structlog.processors.JSONRenderer)
), "Pre-condition"
if output == "dict":
# We need to replace remove the last processor (the one that turns JSON into text, as we want the
# event dict for tests)
proc = processors.pop()
assert isinstance(
proc, (structlog.dev.ConsoleRenderer, structlog.processors.JSONRenderer)
), "Pre-condition"
try:
cap = LogCapture()
processors.append(cap)
Expand Down
56 changes: 56 additions & 0 deletions task_sdk/tests/test_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import json
import logging
import unittest.mock

import pytest
import structlog
from uuid6 import UUID

from airflow.sdk.api.datamodels._generated import TaskInstance


@pytest.mark.parametrize(
"captured_logs", [(logging.INFO, "json")], indirect=True, ids=["log_level=info,formatter=json"]
)
def test_json_rendering(captured_logs):
"""
Test that the JSON formatter renders correctly.
"""
logger = structlog.get_logger()
logger.info(
"A test message with a Pydantic class",
pydantic_class=TaskInstance(
id=UUID("ffec3c8e-2898-46f8-b7d5-3cc571577368"),
dag_id="test_dag",
task_id="test_task",
run_id="test_run",
try_number=1,
),
)
assert captured_logs
assert isinstance(captured_logs[0], bytes)
assert json.loads(captured_logs[0]) == {
"event": "A test message with a Pydantic class",
"pydantic_class": "TaskInstance(id=UUID('ffec3c8e-2898-46f8-b7d5-3cc571577368'), task_id='test_task', dag_id='test_dag', run_id='test_run', try_number=1, map_index=-1, hostname=None)",
"timestamp": unittest.mock.ANY,
"level": "info",
}

0 comments on commit 1c47c75

Please sign in to comment.