feat: implementation of BaseOperatorLink for cloud run jobs to expose the GCP cloud logging url #46911

Open
wants to merge 32 commits into
base: main

Conversation


@ramonvermeulen ramonvermeulen commented Feb 19, 2025

New implementation (with extra_links) example:
image


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@boring-cyborg boring-cyborg bot added the provider:google Google (including GCP) related issues label Feb 19, 2025

boring-cyborg bot commented Feb 19, 2025

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@ramonvermeulen ramonvermeulen force-pushed the feature/ramon/36963-cloud-run-jobs-operator-log-url branch from b2d6790 to 1295e5b Compare February 19, 2025 20:22
@ramonvermeulen ramonvermeulen force-pushed the feature/ramon/36963-cloud-run-jobs-operator-log-url branch from 626d1dd to 9685f0b Compare February 24, 2025 07:46
@ramonvermeulen ramonvermeulen force-pushed the feature/ramon/36963-cloud-run-jobs-operator-log-url branch from 9685f0b to 5c07ae6 Compare February 24, 2025 07:46
@ramonvermeulen ramonvermeulen marked this pull request as ready for review February 24, 2025 07:51
@ramonvermeulen ramonvermeulen changed the title from "feat: draft implementation of exposing the GCP cloud logging url" to "feat: implementation of exposing the GCP cloud logging url" Feb 24, 2025
@ramonvermeulen
Author

ramonvermeulen commented Feb 24, 2025

@potiuk @amoghrajesh @sunank200 @eladkal

Since I see that most of you are involved in decision making and recent contributions to the google-provider, and this is my first time contributing to airflow, I would love to hear your view/opinion on this feature.

… github.com:ramonvermeulen/airflow into feature/ramon/36963-cloud-run-jobs-operator-log-url
@ramonvermeulen ramonvermeulen force-pushed the feature/ramon/36963-cloud-run-jobs-operator-log-url branch from 5d17b3d to fffcc52 Compare February 25, 2025 07:04
Member

@ashb ashb left a comment


Rather than just putting this URL in the log message, please look at the "OperatorExtraLinks" concept -- you can make this show up as a button in the Airflow UI.

(The way I would suggest wiring that up is to have the operator push the URL to XCom, and the OperatorLink can then pull the URL out of XCom)
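
A minimal sketch of that wiring, using plain-Python stand-ins rather than Airflow's real classes. `FAKE_XCOM`, `LOG_LINK_KEY`, `FakeExecuteJobOperator` and `FakeLoggingLink` are all hypothetical names made up for illustration; a real implementation would presumably subclass the provider's operator and BaseOperatorLink and use Airflow's own XCom API instead of a dict.

```python
from typing import Dict, Tuple

# Stand-in for XCom storage, keyed by (task instance key, XCom key).
# Assumption: a plain dict replaces Airflow's real XCom backend here.
FAKE_XCOM: Dict[Tuple[str, str], str] = {}

LOG_LINK_KEY = "cloud_run_job_logging_link"  # hypothetical XCom key


class FakeExecuteJobOperator:
    """Stand-in operator: after running the job, push the log URL to XCom."""

    def execute(self, ti_key: str, log_uri: str) -> None:
        # A real operator would take the URL from the service's API response.
        FAKE_XCOM[(ti_key, LOG_LINK_KEY)] = log_uri


class FakeLoggingLink:
    """Stand-in operator link: resolve the button URL by reading XCom."""

    name = "Cloud Run Job Logging"  # label the UI button would show

    def get_link(self, ti_key: str) -> str:
        # Fall back to an empty string when the task pushed nothing.
        return FAKE_XCOM.get((ti_key, LOG_LINK_KEY), "")
```

The point of the split is that the link object never talks to the external service itself: the operator records the URL once at execution time, and the UI only reads what was stored.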

@ramonvermeulen
Author

Rather than just putting this URL in the log message, please look at the "OperatorExtraLinks" concept -- you can make this show up as a button in the Airflow UI.

(The way I would suggest wiring that up is to have the operator push the URL to XCom, and the OperatorLink can then pull the URL out of XCom)

Thanks, wasn't aware of that functionality. Will see if I can change the implementation after the weekend to make use of OperatorExtraLinks 👍

@ramonvermeulen
Author

ramonvermeulen commented Mar 17, 2025

@ashb

Since I am back from holidays, I tried to do the implementation via OperatorExtraLinks. However, I have a hard time testing the set-up. I recently pulled in the latest changes from the upstream airflow repository, and when I run breeze start-airflow it uses Airflow 3 now.

With Airflow 3, I get the following error on the base class I am using:

Direct database access via the ORM is not allowed in Airflow 3.0

Line causing the error:

conf = XCom.get_value(key=self.key, ti_key=ti_key)

I am using the BaseGoogleLink and tried to follow the same pattern as the other extra link implementations in the google provider.

If this is something that needs to be solved in Airflow 3 for all the Google provider links anyway, it would be nice if I could add that to this PR. However, since this is my first PR, I do not have a great understanding of the Airflow codebase (yet). I am not sure what should be used instead of the ORM in Airflow 3+ to access XCom from within a BaseOperatorLink.

Full error output:

error_detail=[{"exc_type":"RuntimeError","exc_value":"Direct database access via the ORM is not allowed in Airflow 3.0","exc_notes":[],"syntax_error":null,"is_cause":false,"frames":[{"filename":"/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":831,"name":"main"},{"filename":"/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":800,"name":"finalize"},{"filename":"/opt/airflow/providers/google/src/airflow/providers/google/cloud/links/base.py","lineno":54,"name":"get_link"},{"filename":"/opt/airflow/airflow/utils/session.py","lineno":100,"name":"wrapper"},{"filename":"/usr/local/lib/python3.9/contextlib.py","lineno":119,"name":"__enter__"},{"filename":"/opt/airflow/airflow/utils/session.py","lineno":40,"name":"create_session"},{"filename":"/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py","lineno":208,"name":"__init__"}]}]

@ramonvermeulen ramonvermeulen changed the title from "feat: implementation of exposing the GCP cloud logging url" to "feat: implementation of BaseOperatorLink for cloud run jobs to expose the GCP cloud logging url" Mar 17, 2025
@ramonvermeulen
Author

ramonvermeulen commented Mar 18, 2025

Managed to get rid of the above error, and again pulled in the latest upstream.

However, I noticed an error when the web app tries to fetch the extra link, and I have the feeling it might be related to recent changes.

Line causing the error (at commit fe5a2ea):

return json.loads(result.value, cls=XComDecoder)

Stacktrace
Traceback (most recent call last):                                                                                                                                                                                                 
  File "/usr/local/lib/python3.9/site-packages/uvicorn/protocols/http/httptools_impl.py", line 409, in run_asgi                                                                                                                    
    result = await app(  # type: ignore[func-returns-value]                                                                                                                                                                        
  File "/usr/local/lib/python3.9/site-packages/uvicorn/middleware/proxy_headers.py", line 60, in __call__                                                                                                                          
    return await self.app(scope, receive, send)                                                                                                                                                                                    
  File "/usr/local/lib/python3.9/site-packages/fastapi/applications.py", line 1054, in __call__                                                                                                                                    
    await super().__call__(scope, receive, send)                                                                                                                                                                                   
  File "/usr/local/lib/python3.9/site-packages/starlette/applications.py", line 112, in __call__                                                                                                                                   
    await self.middleware_stack(scope, receive, send)                                                                                                                                                                              
  File "/usr/local/lib/python3.9/site-packages/starlette/middleware/errors.py", line 187, in __call__                                                                                                                              
    raise exc                                                                                                                                                                                                                      
  File "/usr/local/lib/python3.9/site-packages/starlette/middleware/errors.py", line 165, in __call__                                                                                                                              
    await self.app(scope, receive, _send)                                                                                                                                                                                          
  File "/usr/local/lib/python3.9/site-packages/starlette/middleware/gzip.py", line 29, in __call__                                                                                                                                 
    await responder(scope, receive, send)                                                                                                                                                                                          
  File "/usr/local/lib/python3.9/site-packages/starlette/middleware/gzip.py", line 126, in __call__                                                                                                                                
    await super().__call__(scope, receive, send)                                                                                                                                                                                   
  File "/usr/local/lib/python3.9/site-packages/starlette/middleware/gzip.py", line 46, in __call__                                                                                                                                 
    await self.app(scope, receive, self.send_with_compression)                                                                                                                                                                     
  File "/usr/local/lib/python3.9/site-packages/starlette/middleware/cors.py", line 85, in __call__                                                                                                                                 
    await self.app(scope, receive, send)                                                                                                                                                                                           
  File "/usr/local/lib/python3.9/site-packages/starlette/middleware/base.py", line 178, in __call__                                                                                                                                
    recv_stream.close()                                                                                                                                                                                                            
  File "/usr/local/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)                                                                                                                                                                                          
  File "/usr/local/lib/python3.9/site-packages/starlette/_utils.py", line 82, in collapse_excgroups                                                                                                                                
    raise exc                                                                                                                                                                                                                      
  File "/usr/local/lib/python3.9/site-packages/starlette/middleware/base.py", line 175, in __call__                                                                                                                                
    response = await self.dispatch_func(request, call_next)                                                                                                                                                                        
  File "/opt/airflow/airflow/api_fastapi/core_api/middleware.py", line 28, in dispatch                                                                                                                                             
    response = await call_next(request)                                                                                                                                                                                            
  File "/usr/local/lib/python3.9/site-packages/starlette/middleware/base.py", line 153, in call_next                                                                                                                               
    raise app_exc                                                                                                                                                                                                                  
  File "/usr/local/lib/python3.9/site-packages/starlette/middleware/base.py", line 140, in coro                                                                                                                                    
    await self.app(scope, receive_or_disconnect, send_no_error)                                                                                                                                                                    
  File "/usr/local/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 62, in __call__                                                                                                                           
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)                                                                                                                                                       
  File "/usr/local/lib/python3.9/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app                                                                                                                           
    raise exc                                                                                                                                                                                                                      
  File "/usr/local/lib/python3.9/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app                                                                                                                           
    await app(scope, receive, sender)                                                                                                                                                                                              
  File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 714, in __call__                                                                                                                                        
    await self.middleware_stack(scope, receive, send)                                                                                                                                                                              
  File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 734, in app                                                                                                                                             
    await route.handle(scope, receive, send)                                                                                                                                                                                       
  File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 288, in handle                                                                                                                                          
    await self.app(scope, receive, send)                                                                                                                                                                                           
  File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 76, in app                                                                                                                                              
    await wrap_app_handling_exceptions(app, request)(scope, receive, send)
  File "/usr/local/lib/python3.9/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    raise exc
  File "/usr/local/lib/python3.9/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    await app(scope, receive, sender)
  File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 73, in app
    response = await f(request)
  File "/usr/local/lib/python3.9/site-packages/fastapi/routing.py", line 301, in app
    raw_response = await run_endpoint_function(
  File "/usr/local/lib/python3.9/site-packages/fastapi/routing.py", line 214, in run_endpoint_function
    return await run_in_threadpool(dependant.call, **values)
  File "/usr/local/lib/python3.9/site-packages/starlette/concurrency.py", line 37, in run_in_threadpool
    return await anyio.to_thread.run_sync(func)
  File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 2470, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 967, in run
    result = context.run(func, *args)
  File "/opt/airflow/airflow/api_fastapi/core_api/routes/public/extra_links.py", line 85, in get_extra_links
    all_extra_links = {link_name: link_url or None for link_name, link_url in sorted(all_extra_link_pairs)}
  File "/opt/airflow/airflow/api_fastapi/core_api/routes/public/extra_links.py", line 83, in <genexpr>
    (link_name, task.get_extra_links(ti, link_name)) for link_name in task.extra_links
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1220, in get_extra_links
    return link.get_link(self.unmap(None), ti_key=ti.key)
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 2036, in get_link
    return XComModel.deserialize_value(value)
  File "/opt/airflow/airflow/models/xcom.py", line 338, in deserialize_value
    return json.loads(result.value, cls=XComDecoder)
  File "/usr/local/lib/python3.9/json/__init__.py", line 359, in loads
    return cls(**kw).decode(s)
  File "/usr/local/lib/python3.9/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.9/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

The error has to do with the f"_link_{self.__class__.__name__}" value retrieved from XCom, which is not valid JSON (it is just a plain string containing the URI). I tested existing Google operators on the latest main, such as BigQueryCreateEmptyTableOperator with BigQueryTableLink, and got exactly the same error, so I have the feeling it is not related to my specific link implementation.

To give an example, this is what the XCom looks like for the CloudRunExecuteJobOperator (the new implementation in this PR):
[image: 1]

And for the BigQueryCreateTableOperator that I tested:
[image: 2]

Both seem to contain a valid link; however, the API server returns a 500 with the JSONDecodeError (a plain URL in XCom is not valid JSON). This happens when you open the details page of a task instance (i.e. when the React front end calls the API route that lists the extra links for that TaskInstance).
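
To illustrate why a bare URL fails here, a small standard-library sketch (no Airflow involved; the URL is a placeholder and `try_decode` is a hypothetical helper). It only shows the general `json.loads` behaviour, not the actual XCom deserialization code path:

```python
import json

url = "https://console.cloud.google.com/logs/viewer"  # placeholder URL


def try_decode(raw: str):
    """Return the decoded value, or a description of the decode error."""
    try:
        return json.loads(raw)
    except json.JSONDecodeError as exc:
        return f"JSONDecodeError: {exc}"


# A bare URL is not a JSON document, so decoding fails at the first character.
raw_result = try_decode(url)

# The same URL encoded as a JSON string (wrapped in quotes) round-trips fine.
encoded_result = try_decode(json.dumps(url))

print(raw_result)      # JSONDecodeError: Expecting value: line 1 column 1 (char 0)
print(encoded_result)  # https://console.cloud.google.com/logs/viewer
```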

@ramonvermeulen
Author

#47961 should resolve my issue. I will pull in upstream and test this evening :)

@ramonvermeulen
Author

Just tested, and I can confirm the fix solved my issue :)

image

Contributor

@amoghrajesh amoghrajesh left a comment


Code looks good and I am happy to approve once you have added the test I requested

@ramonvermeulen
Author

ramonvermeulen commented Mar 21, 2025

Code looks good and I am happy to approve once you have added the test I requested

Just updated the PR with a test for get_link. It works slightly differently from the example you provided, because this link comes directly from the API (logUri key) and is not constructed with a format/template string like most of the extra link implementations.
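
A rough sketch of the difference, and of what a test for each style might assert. Everything here is hypothetical (the template URL, the function names, and the dict standing in for the API response); it is not the test added in this PR:

```python
from typing import Any, Dict

# Hypothetical URL template, for illustration only.
EXAMPLE_TEMPLATE = "https://example.com/projects/{project_id}/jobs/{job_name}/logs"


def templated_link(project_id: str, job_name: str) -> str:
    """Build a link by filling in a format string."""
    return EXAMPLE_TEMPLATE.format(project_id=project_id, job_name=job_name)


def passthrough_link(execution: Dict[str, Any]) -> str:
    """Return the URL the API response already carries under the logUri key."""
    return execution.get("logUri", "")


def test_templated_link() -> None:
    # The test has to know the template to predict the final URL.
    assert templated_link("my-project", "my-job") == (
        "https://example.com/projects/my-project/jobs/my-job/logs"
    )


def test_passthrough_link() -> None:
    # The test only checks that the API-provided value comes back unchanged.
    fake_execution = {"logUri": "https://example.com/logs/abc"}
    assert passthrough_link(fake_execution) == "https://example.com/logs/abc"
    assert passthrough_link({}) == ""
```

With the pass-through style there is no template to verify, so the test reduces to checking that whatever the (mocked) API returned is surfaced as the link.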

Labels
provider:google Google (including GCP) related issues

4 participants