Skip to content

Commit 08fb984

Browse files
authored
Merge pull request #719 from atlanhq/APP-8600
APP-8600: Added support for sending raw `OpenLineage` events
2 parents a89da85 + d34e5f7 commit 08fb984

File tree

12 files changed

+551
-39
lines changed

12 files changed

+551
-39
lines changed

pyatlan/client/aio/open_lineage.py

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# SPDX-License-Identifier: Apache-2.0
22
# Copyright 2025 Atlan Pte. Ltd.
33

4-
from typing import List, Optional
4+
from typing import Any, Dict, List, Optional, Union
55

66
from pydantic.v1 import validate_arguments
77

@@ -13,8 +13,9 @@
1313
)
1414
from pyatlan.errors import AtlanError, ErrorCode
1515
from pyatlan.model.enums import AtlanConnectorType
16-
from pyatlan.model.open_lineage.event import OpenLineageEvent
16+
from pyatlan.model.open_lineage.event import OpenLineageEvent, OpenLineageRawEvent
1717
from pyatlan.model.response import AssetMutationResponse
18+
from pyatlan.utils import validate_type
1819

1920

2021
class AsyncOpenLineageClient:
@@ -68,24 +69,50 @@ async def create_connection(
6869
# Save connection and return response directly
6970
return await self._client.asset.save(connection) # type: ignore[attr-defined]
7071

71-
@validate_arguments
7272
async def send(
73-
self, request: OpenLineageEvent, connector_type: AtlanConnectorType
73+
self,
74+
request: Union[
75+
OpenLineageEvent,
76+
OpenLineageRawEvent,
77+
List[Dict[str, Any]],
78+
Dict[str, Any],
79+
str,
80+
],
81+
connector_type: AtlanConnectorType,
7482
) -> None:
7583
"""
7684
Sends the OpenLineage event to Atlan to be consumed.
7785
78-
:param request: OpenLineage event to send
86+
:param request: OpenLineage event to send - can be an OpenLineageEvent, OpenLineageRawEvent, list of dicts, dict, or JSON string
7987
:param connector_type: of the connection that should receive the OpenLineage event
8088
:raises AtlanError: when OpenLineage is not configured OR on any issues with API communication
8189
"""
90+
validate_type(
91+
name="request",
92+
_type=(OpenLineageEvent, OpenLineageRawEvent, list, dict, str),
93+
value=request,
94+
)
95+
validate_type(
96+
name="connector_type",
97+
_type=(AtlanConnectorType),
98+
value=connector_type,
99+
)
82100
try:
101+
# Convert a raw dict, list of dicts, or JSON string to an OpenLineageRawEvent if needed
102+
if isinstance(request, (dict, str, list)):
103+
if isinstance(request, str):
104+
request = OpenLineageRawEvent.parse_raw(request)
105+
else:
106+
# For list or dict, use parse_obj
107+
request = OpenLineageRawEvent.parse_obj(request)
108+
109+
# If it's already an OpenLineageEvent or OpenLineageRawEvent, use as-is
83110
# Prepare request using shared logic
84111
api_endpoint, request_obj, api_options = OpenLineageSend.prepare_request(
85112
request, connector_type
86113
)
87114

88-
# Make async API call
115+
# Make async API call - _call_api handles JSON conversion automatically
89116
await self._client._call_api(
90117
request_obj=request_obj, api=api_endpoint, **api_options
91118
)

pyatlan/client/common/open_lineage.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from pyatlan.model.assets import Connection
1111
from pyatlan.model.credential import Credential
1212
from pyatlan.model.enums import AtlanConnectorType
13-
from pyatlan.model.open_lineage.event import OpenLineageEvent
1413

1514

1615
class OpenLineageCreateCredential:
@@ -91,8 +90,8 @@ class OpenLineageSend:
9190

9291
@staticmethod
9392
def prepare_request(
94-
request: OpenLineageEvent, connector_type: AtlanConnectorType
95-
) -> Tuple[str, OpenLineageEvent, Dict[str, Any]]:
93+
request: Any, connector_type: AtlanConnectorType
94+
) -> Tuple[str, Any, Dict[str, Any]]:
9695
"""Prepare the send event request."""
9796
api_endpoint = OPEN_LINEAGE_SEND_EVENT_API.format_path(
9897
{"connector_type": connector_type.value}

pyatlan/client/open_lineage.py

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import List, Optional
1+
from typing import Any, Dict, List, Optional, Union
22

33
from pydantic.v1 import validate_arguments
44

@@ -10,8 +10,9 @@
1010
)
1111
from pyatlan.errors import AtlanError, ErrorCode
1212
from pyatlan.model.enums import AtlanConnectorType
13-
from pyatlan.model.open_lineage.event import OpenLineageEvent
13+
from pyatlan.model.open_lineage.event import OpenLineageEvent, OpenLineageRawEvent
1414
from pyatlan.model.response import AssetMutationResponse
15+
from pyatlan.utils import validate_type
1516

1617

1718
class OpenLineageClient:
@@ -65,24 +66,48 @@ def create_connection(
6566
# Save connection and return response directly
6667
return self._client.asset.save(connection) # type: ignore[attr-defined]
6768

68-
@validate_arguments
6969
def send(
70-
self, request: OpenLineageEvent, connector_type: AtlanConnectorType
70+
self,
71+
request: Union[
72+
OpenLineageEvent,
73+
OpenLineageRawEvent,
74+
List[Dict[str, Any]],
75+
Dict[str, Any],
76+
str,
77+
],
78+
connector_type: AtlanConnectorType,
7179
) -> None:
7280
"""
7381
Sends the OpenLineage event to Atlan to be consumed.
7482
75-
:param request: OpenLineage event to send
83+
:param request: OpenLineage event to send - can be an OpenLineageEvent, OpenLineageRawEvent, list of dicts, dict, or JSON string
7684
:param connector_type: of the connection that should receive the OpenLineage event
7785
:raises AtlanError: when OpenLineage is not configured OR on any issues with API communication
7886
"""
87+
validate_type(
88+
name="request",
89+
_type=(OpenLineageEvent, OpenLineageRawEvent, list, dict, str),
90+
value=request,
91+
)
92+
validate_type(
93+
name="connector_type",
94+
_type=(AtlanConnectorType),
95+
value=connector_type,
96+
)
7997
try:
98+
# Convert a raw dict, list of dicts, or JSON string to an OpenLineageRawEvent if needed
99+
if isinstance(request, (dict, str, list)):
100+
if isinstance(request, str):
101+
request = OpenLineageRawEvent.parse_raw(request)
102+
else:
103+
# For list or dict, use parse_obj
104+
request = OpenLineageRawEvent.parse_obj(request)
105+
80106
# Prepare request using shared logic
81107
api_endpoint, request_obj, api_options = OpenLineageSend.prepare_request(
82108
request, connector_type
83109
)
84-
85-
# Make API call
110+
# Make API call - _call_api handles JSON conversion automatically
86111
self._client._call_api(
87112
request_obj=request_obj, api=api_endpoint, **api_options
88113
)
Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1-
from .event import OpenLineageEvent
1+
from .event import OpenLineageEvent, OpenLineageRawEvent
22
from .job import OpenLineageJob
33
from .run import OpenLineageRun
44

5-
__all__ = ["OpenLineageEvent", "OpenLineageJob", "OpenLineageRun"]
5+
__all__ = [
6+
"OpenLineageEvent",
7+
"OpenLineageRawEvent",
8+
"OpenLineageJob",
9+
"OpenLineageRun",
10+
]

pyatlan/model/open_lineage/event.py

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
from __future__ import annotations
22

33
from datetime import datetime
4-
from typing import TYPE_CHECKING, List, Optional
4+
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
55

66
from pydantic.v1 import Field, root_validator
77
from pytz import utc # type:ignore[import-untyped]
88

9+
from pyatlan.model.core import AtlanObject
910
from pyatlan.model.enums import AtlanConnectorType, OpenLineageEventType
1011
from pyatlan.model.open_lineage.base import OpenLineageBaseEvent
1112
from pyatlan.model.open_lineage.input_dataset import OpenLineageInputDataset
@@ -14,9 +15,45 @@
1415
from pyatlan.model.open_lineage.run import OpenLineageRun
1516

1617
if TYPE_CHECKING:
18+
from pyatlan.client.aio.client import AsyncAtlanClient
1719
from pyatlan.client.atlan import AtlanClient
1820

1921

22+
class OpenLineageRawEvent(AtlanObject):
23+
"""
24+
Root model for handling raw OpenLineage events.
25+
26+
This model accepts any arbitrary data structure (dict, list of dicts, string, etc.) and allows
27+
it to be sent as raw OpenLineage event data to Atlan's API.
28+
29+
Use the built-in pydantic methods:
30+
- OpenLineageRawEvent.parse_raw(json_string)
31+
- OpenLineageRawEvent.parse_obj(any_data)
32+
"""
33+
34+
__root__: Union[List[Dict[str, Any]], Dict[str, Any], str, Any]
35+
36+
@classmethod
37+
def from_json(cls, json_str: str) -> OpenLineageRawEvent:
38+
"""
39+
Create an OpenLineageRawEvent from a JSON string.
40+
41+
:param json_str: JSON string containing raw OpenLineage event data
42+
:returns: New OpenLineageRawEvent instance
43+
"""
44+
return cls.parse_raw(json_str)
45+
46+
@classmethod
47+
def from_dict(cls, data: Dict[str, Any]) -> OpenLineageRawEvent:
48+
"""
49+
Create an OpenLineageRawEvent from a dictionary.
50+
51+
:param data: Dictionary containing raw OpenLineage event data
52+
:returns: New OpenLineageRawEvent instance
53+
"""
54+
return cls.parse_obj(data)
55+
56+
2057
class OpenLineageEvent(OpenLineageBaseEvent):
2158
"""
2259
Atlan wrapper for abstracting OpenLineage events.
@@ -81,3 +118,50 @@ def emit(self, client: AtlanClient) -> None:
81118
return client.open_lineage.send(
82119
request=self, connector_type=AtlanConnectorType.SPARK
83120
)
121+
122+
async def emit_async(self, client: AsyncAtlanClient) -> None:
123+
"""
124+
Send the OpenLineage event to Atlan to be processed (async version).
125+
126+
:param client: async connectivity to an Atlan tenant
127+
:raises AtlanError: on any API communication issues
128+
"""
129+
return await client.open_lineage.send(
130+
request=self, connector_type=AtlanConnectorType.SPARK
131+
)
132+
133+
@classmethod
134+
def emit_raw(
135+
cls,
136+
client: AtlanClient,
137+
event: Union[OpenLineageRawEvent, List[Dict[str, Any]], Dict[str, Any], str],
138+
connector_type: AtlanConnectorType = AtlanConnectorType.SPARK,
139+
) -> None:
140+
"""
141+
Send raw OpenLineage event data to Atlan to be processed.
142+
143+
:param client: connectivity to an Atlan tenant
144+
:param event: Raw event(s) as JSON string, dict, list of dicts, or OpenLineageRawEvent
145+
:param connector_type: connector type for the OpenLineage event
146+
:raises AtlanError: on any API communication issues
147+
"""
148+
return client.open_lineage.send(request=event, connector_type=connector_type)
149+
150+
@classmethod
151+
async def emit_raw_async(
152+
cls,
153+
client: AsyncAtlanClient,
154+
event: Union[OpenLineageRawEvent, List[Dict[str, Any]], Dict[str, Any], str],
155+
connector_type: AtlanConnectorType = AtlanConnectorType.SPARK,
156+
) -> None:
157+
"""
158+
Send raw OpenLineage event data to Atlan to be processed (async version).
159+
160+
:param client: async connectivity to an Atlan tenant
161+
:param event: Raw event(s) as JSON string, dict, list of dicts, or OpenLineageRawEvent
162+
:param connector_type: connector type for the OpenLineage event
163+
:raises AtlanError: on any API communication issues
164+
"""
165+
return await client.open_lineage.send(
166+
request=event, connector_type=connector_type
167+
)

tests/integration/aio/test_open_lineage.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ async def test_open_lineage_integration(
6666
od,
6767
job.create_output(namespace=namespace, asset_name="AN.OTHER.VIEW"),
6868
]
69-
await start.emit(client=client) # type: ignore[func-returns-value]
69+
await start.emit_async(client=client)
7070

7171
complete = OpenLineageEvent.creator(
7272
run=run, event_type=OpenLineageEventType.COMPLETE
@@ -80,4 +80,4 @@ async def test_open_lineage_integration(
8080
od,
8181
job.create_output(namespace=namespace, asset_name="AN.OTHER.VIEW"),
8282
]
83-
await complete.emit(client=client) # type: ignore[func-returns-value]
83+
await complete.emit_async(client=client)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
[]
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"eventTime": "2025-01-04T21:36:06.116Z",
3+
"eventType": "START",
4+
"run": {
5+
"runId": "minimal-run-id"
6+
},
7+
"job": {
8+
"namespace": "default",
9+
"name": "minimal_job"
10+
}
11+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
[
2+
{
3+
"eventTime": "2025-01-04T21:35:06.116Z",
4+
"producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0/integration/spark",
5+
"schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
6+
"eventType": "START",
7+
"run": {
8+
"runId": "event-1-run-id",
9+
"facets": {}
10+
},
11+
"job": {
12+
"namespace": "default",
13+
"name": "job_1"
14+
}
15+
},
16+
{
17+
"eventTime": "2025-01-04T21:36:06.116Z",
18+
"producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0/integration/spark",
19+
"schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
20+
"eventType": "COMPLETE",
21+
"run": {
22+
"runId": "event-2-run-id",
23+
"facets": {}
24+
},
25+
"job": {
26+
"namespace": "default",
27+
"name": "job_2"
28+
}
29+
},
30+
{
31+
"eventTime": "2025-01-04T21:37:06.116Z",
32+
"producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0/integration/spark",
33+
"schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
34+
"eventType": "FAIL",
35+
"run": {
36+
"runId": "event-3-run-id",
37+
"facets": {}
38+
},
39+
"job": {
40+
"namespace": "default",
41+
"name": "job_3"
42+
}
43+
}
44+
]

0 commit comments

Comments
 (0)