Skip to content

Commit

Permalink
Merge branch 'apache:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
ambika-garg authored Nov 22, 2024
2 parents 6dca59c + 91bd1ea commit a9fb949
Show file tree
Hide file tree
Showing 77 changed files with 1,502 additions and 209 deletions.
7 changes: 6 additions & 1 deletion .devcontainer/mysql/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,10 @@
"rogalmic.bash-debug"
],
"service": "airflow",
"forwardPorts": [8080,5555,5432,6379]
"forwardPorts": [8080,5555,5432,6379],
"workspaceFolder": "/opt/airflow",
// for users who use non-standard git config patterns
// https://github.com/microsoft/vscode-remote-release/issues/2084#issuecomment-989756268
"initializeCommand": "cd \"${localWorkspaceFolder}\" && git config --local user.email \"$(git config user.email)\" && git config --local user.name \"$(git config user.name)\"",
"overrideCommand": true
}
7 changes: 6 additions & 1 deletion .devcontainer/postgres/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,10 @@
"rogalmic.bash-debug"
],
"service": "airflow",
"forwardPorts": [8080,5555,5432,6379]
"forwardPorts": [8080,5555,5432,6379],
"workspaceFolder": "/opt/airflow",
// for users who use non-standard git config patterns
// https://github.com/microsoft/vscode-remote-release/issues/2084#issuecomment-989756268
"initializeCommand": "cd \"${localWorkspaceFolder}\" && git config --local user.email \"$(git config user.email)\" && git config --local user.name \"$(git config user.name)\"",
"overrideCommand": true
}
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ repos:
types_or: [python, pyi]
args: [--fix]
require_serial: true
additional_dependencies: ["ruff==0.7.3"]
additional_dependencies: ["ruff==0.8.0"]
exclude: ^.*/.*_vendor/|^tests/dags/test_imports.py|^performance/tests/test_.*.py
- id: ruff-format
name: Run 'ruff format'
Expand All @@ -370,7 +370,7 @@ repos:
types_or: [python, pyi]
args: []
require_serial: true
additional_dependencies: ["ruff==0.7.3"]
additional_dependencies: ["ruff==0.8.0"]
exclude: ^.*/.*_vendor/|^tests/dags/test_imports.py$
- id: replace-bad-characters
name: Replace bad characters
Expand Down
84 changes: 81 additions & 3 deletions airflow/api_fastapi/common/db/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@

from typing import TYPE_CHECKING, Literal, Sequence, overload

from airflow.utils.db import get_query_count
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from sqlalchemy.ext.asyncio import AsyncSession

from airflow.utils.db import get_query_count, get_query_count_async
from airflow.utils.session import NEW_SESSION, create_session, create_session_async, provide_session

if TYPE_CHECKING:
from sqlalchemy.orm import Session
Expand Down Expand Up @@ -53,7 +55,9 @@ def your_route(session: Annotated[Session, Depends(get_session)]):


def apply_filters_to_select(
*, base_select: Select, filters: Sequence[BaseParam | None] | None = None
*,
base_select: Select,
filters: Sequence[BaseParam | None] | None = None,
) -> Select:
if filters is None:
return base_select
Expand All @@ -65,6 +69,80 @@ def apply_filters_to_select(
return base_select


async def get_async_session() -> AsyncSession:
"""
Dependency for providing a session.
Example usage:
.. code:: python
@router.get("/your_path")
def your_route(session: Annotated[AsyncSession, Depends(get_async_session)]):
pass
"""
async with create_session_async() as session:
yield session


@overload
async def paginated_select_async(
*,
query: Select,
filters: Sequence[BaseParam] | None = None,
order_by: BaseParam | None = None,
offset: BaseParam | None = None,
limit: BaseParam | None = None,
session: AsyncSession,
return_total_entries: Literal[True] = True,
) -> tuple[Select, int]: ...


@overload
async def paginated_select_async(
*,
query: Select,
filters: Sequence[BaseParam] | None = None,
order_by: BaseParam | None = None,
offset: BaseParam | None = None,
limit: BaseParam | None = None,
session: AsyncSession,
return_total_entries: Literal[False],
) -> tuple[Select, None]: ...


async def paginated_select_async(
*,
query: Select,
filters: Sequence[BaseParam | None] | None = None,
order_by: BaseParam | None = None,
offset: BaseParam | None = None,
limit: BaseParam | None = None,
session: AsyncSession,
return_total_entries: bool = True,
) -> tuple[Select, int | None]:
query = apply_filters_to_select(
base_select=query,
filters=filters,
)

total_entries = None
if return_total_entries:
total_entries = await get_query_count_async(query, session=session)

# TODO: Re-enable when permissions are handled. Readable / writable entities,
# for instance:
# readable_dags = get_auth_manager().get_permitted_dag_ids(user=g.user)
# dags_select = dags_select.where(DagModel.dag_id.in_(readable_dags))

query = apply_filters_to_select(
base_select=query,
filters=[order_by, offset, limit],
)

return query, total_entries


@overload
def paginated_select(
*,
Expand Down
29 changes: 29 additions & 0 deletions airflow/api_fastapi/core_api/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from pydantic import BaseModel as PydanticBaseModel, ConfigDict


class BaseModel(PydanticBaseModel):
"""
Base pydantic model for REST API.
:meta private:
"""

model_config = ConfigDict(from_attributes=True)
3 changes: 2 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

from datetime import datetime

from pydantic import BaseModel, Field, field_validator
from pydantic import Field, field_validator

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.utils.log.secrets_masker import redact


Expand Down
3 changes: 1 addition & 2 deletions airflow/api_fastapi/core_api/datamodels/backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@

from datetime import datetime

from pydantic import BaseModel

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.models.backfill import ReprocessBehavior


Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/datamodels/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations

from pydantic import BaseModel
from airflow.api_fastapi.core_api.base import BaseModel


class ConfigOption(BaseModel):
Expand Down
3 changes: 2 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

import json

from pydantic import BaseModel, Field, field_validator
from pydantic import Field, field_validator
from pydantic_core.core_schema import ValidationInfo

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.utils.log.secrets_masker import redact


Expand Down
3 changes: 2 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
from datetime import datetime
from enum import Enum

from pydantic import BaseModel, Field
from pydantic import Field

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/datamodels/dag_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations

from pydantic import BaseModel
from airflow.api_fastapi.core_api.base import BaseModel


class DAGSourceResponse(BaseModel):
Expand Down
3 changes: 1 addition & 2 deletions airflow/api_fastapi/core_api/datamodels/dag_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

from __future__ import annotations

from pydantic import BaseModel

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.utils.state import DagRunState


Expand Down
3 changes: 1 addition & 2 deletions airflow/api_fastapi/core_api/datamodels/dag_warning.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@

from datetime import datetime

from pydantic import BaseModel

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.models.dagwarning import DagWarningType


Expand Down
25 changes: 13 additions & 12 deletions airflow/api_fastapi/core_api/datamodels/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
from pendulum.tz.timezone import FixedTimezone, Timezone
from pydantic import (
AliasGenerator,
BaseModel,
ConfigDict,
computed_field,
field_validator,
)

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.configuration import conf
from airflow.serialization.pydantic.dag import DagTagPydantic

Expand Down Expand Up @@ -107,6 +107,17 @@ class DAGCollectionResponse(BaseModel):
class DAGDetailsResponse(DAGResponse):
"""Specific serializer for DAG Details responses."""

model_config = ConfigDict(
from_attributes=True,
alias_generator=AliasGenerator(
validation_alias=lambda field_name: {
"dag_run_timeout": "dagrun_timeout",
"last_parsed": "last_loaded",
"template_search_path": "template_searchpath",
}.get(field_name, field_name),
),
)

catchup: bool
dag_run_timeout: timedelta | None
asset_expression: dict | None
Expand All @@ -120,16 +131,6 @@ class DAGDetailsResponse(DAGResponse):
timezone: str | None
last_parsed: datetime | None

model_config = ConfigDict(
alias_generator=AliasGenerator(
validation_alias=lambda field_name: {
"dag_run_timeout": "dagrun_timeout",
"last_parsed": "last_loaded",
"template_search_path": "template_searchpath",
}.get(field_name, field_name),
)
)

@field_validator("timezone", mode="before")
@classmethod
def get_timezone(cls, tz: Timezone | FixedTimezone) -> str | None:
Expand All @@ -144,7 +145,7 @@ def get_params(cls, params: abc.MutableMapping | None) -> dict | None:
"""Convert params attribute to dict representation."""
if params is None:
return None
return {param_name: param_val.dump() for param_name, param_val in params.items()}
return {k: v.dump() for k, v in params.items()}

# Mypy issue https://github.com/python/mypy/issues/1362
@computed_field # type: ignore[misc]
Expand Down
8 changes: 5 additions & 3 deletions airflow/api_fastapi/core_api/datamodels/event_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@

from datetime import datetime

from pydantic import BaseModel, ConfigDict, Field
from pydantic import ConfigDict, Field

from airflow.api_fastapi.core_api.base import BaseModel


class EventLogResponse(BaseModel):
"""Event Log Response."""

model_config = ConfigDict(populate_by_name=True, from_attributes=True)

id: int = Field(alias="event_log_id")
dttm: datetime = Field(alias="when")
dag_id: str | None
Expand All @@ -37,8 +41,6 @@ class EventLogResponse(BaseModel):
owner: str | None
extra: str | None

model_config = ConfigDict(populate_by_name=True)


class EventLogCollectionResponse(BaseModel):
"""Event Log Collection Response."""
Expand Down
8 changes: 5 additions & 3 deletions airflow/api_fastapi/core_api/datamodels/import_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,21 @@

from datetime import datetime

from pydantic import BaseModel, ConfigDict, Field
from pydantic import ConfigDict, Field

from airflow.api_fastapi.core_api.base import BaseModel


class ImportErrorResponse(BaseModel):
"""Import Error Response."""

model_config = ConfigDict(populate_by_name=True, from_attributes=True)

id: int = Field(alias="import_error_id")
timestamp: datetime
filename: str
stacktrace: str = Field(alias="stack_trace")

model_config = ConfigDict(populate_by_name=True)


class ImportErrorCollectionResponse(BaseModel):
"""Import Error Collection Response."""
Expand Down
4 changes: 1 addition & 3 deletions airflow/api_fastapi/core_api/datamodels/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@

from datetime import datetime

from pydantic import BaseModel, ConfigDict
from airflow.api_fastapi.core_api.base import BaseModel


class JobResponse(BaseModel):
"""Job serializer for responses."""

model_config = ConfigDict(populate_by_name=True)

id: int
dag_id: str | None
state: str | None
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/datamodels/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations

from pydantic import BaseModel
from airflow.api_fastapi.core_api.base import BaseModel


class BaseInfoSchema(BaseModel):
Expand Down
Loading

0 comments on commit a9fb949

Please sign in to comment.