
Created new exception that provides details of each step of a job ru… #48466

Open
geraj1010 wants to merge 6 commits into main

Conversation

@geraj1010 (Contributor) commented Mar 27, 2025


closes: #46923

Created a new exception, DbtCloudJobRunDetailsException, which surfaces the job run logs from dbt Cloud whenever a job run fails or is cancelled.
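
For context, a minimal sketch of the shape of such an exception (an illustration, not the exact code added by this PR; the base class and message layout are assumptions, and the actual implementation also trims the run and step payloads down to a subset of keys, as shown in the review below):

from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudJobRunException


# Sketch only -- the base class and message formatting in the PR may differ.
class DbtCloudJobRunDetailsException(DbtCloudJobRunException):
    """Raise when a dbt Cloud job run fails or is cancelled, surfacing run and step details."""

    def __init__(self, run_id, run_details):
        # run_details is the "data" payload for the run, fetched with
        # include_related=["run_steps"] so each step's logs are available.
        run_steps = run_details.get("run_steps") or []
        super().__init__(
            f"Job run {run_id} has failed or has been cancelled. "
            f"Job run details: {run_details} "
            f"Run step details: {run_steps}"
        )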

Side note: I did notice that when the operator is deferred, it finishes prematurely (i.e., the trigger fires an event before the job is in a terminal status). This wasn't an issue with the operator code in branch v2-10-stable, but it is a current issue in main. I can try to sneak the change in here, or open a new issue to resolve it.

Current Task Log Output
...
  File "/opt/airflow/airflow/providers/dbt/cloud/operators/dbt.py", line 182, in execute
    raise DbtCloudJobRunException(f"Job run {self.run_id} has failed or has been cancelled.")
airflow.providers.dbt.cloud.hooks.dbt.DbtCloudJobRunException: Job run 70471831332476 has failed or has been cancelled.

New Task Log Output


Formatted traceback (cleaned up with ChatGPT):

File "/opt/airflow/airflow/providers/dbt/cloud/operators/dbt.py", line 187, in execute
    raise DbtCloudJobRunDetailsException(self.run_id, run_details)
airflow.providers.dbt.cloud.hooks.dbt.DbtCloudJobRunDetailsException: 
    Job run 70471831584850 has failed or has been cancelled. 
    Job run details: 
    {
        "id": 70471831584850, 
        "job_id": 70471823442042, 
        "is_error": true, 
        "dbt_version": "latest", 
        "finished_at": "2025-03-27 20:37:19.996618+00:00", 
        "finished_at_humanized": "30 seconds ago", 
        "run_duration": "00:00:28", 
        "run_duration_humanized": "28 seconds"
    }

Run step details: 
{
    "run_id": 70471831584850, 
    "id": 70471865832299, 
    "index": 1, 
    "name": "Clone git repository", 
    "status": 10, 
    "status_humanized": "Success", 
    "duration": "00:00:00", 
    "duration_humanized": "0 minutes", 
    "logs": "Cloning into '/tmp/jobs/70471831584850/target'...\nSuccessfully cloned repository.\nChecking out default branch\n", 
    "debug_logs": ""
}

{
    "run_id": 70471831584850, 
    "id": 70471865832302, 
    "index": 2, 
    "name": "Create profile from connection BigQuery", 
    "status": 10, 
    "status_humanized": "Success", 
    "duration": "00:00:00", 
    "duration_humanized": "0 minutes", 
    "logs": "Create profile from connection BigQuery", 
    "debug_logs": ""
}

{
    "run_id": 70471831584850, 
    "id": 70471865832303, 
    "index": 3, 
    "name": "Invoke dbt with ``dbt deps``", 
    "status": 10, 
    "status_humanized": "Success", 
    "duration": "00:00:02", 
    "duration_humanized": "2 seconds", 
    "logs": "20:36:59  Running dbt...\n20:36:59  Warning: No packages were found in packages.yml\n20:36:59  Warning: No packages were found in packages.yml\n", 
    "debug_logs": ""
}

{
    "run_id": 70471831584850, 
    "id": 70471865832306, 
    "index": 4, 
    "name": "Invoke dbt with dbt build", 
    "status": 20, 
    "status_humanized": "Error", 
    "duration": "00:00:14", 
    "duration_humanized": "14 seconds", 
    "logs": "dbt command failed\n20:37:01  Running dbt...\n20:37:03  Found 2 models, 4 data tests, 492 macros\n20:37:03  \n20:37:03\n20:37:03  Concurrency: 4 threads (target='default')\n20:37:03  \n20:37:03  1 of 6 START sql table model dbt.my_first_dbt_model ..................... [RUN]\n20:37:07  1 of 6 OK created sql table model dbt.my_first_dbt_model ................ [\u001b[32mCREATE TABLE (2.0 rows, 0 processed)\u001b[0m in 3.35s]\n20:37:07  2 of 6 START test not_null_my_first_dbt_model_id ............................... [RUN]\n20:37:07  3 of 6 START test unique_my_first_dbt_model_id ................................. [RUN]\n20:37:08  2 of 6 FAIL 1 not_null_my_first_dbt_model_id ................................... [\u001b[31mFAIL 1\u001b[0m in 1.17s]\n20:37:08  3 of 6 PASS unique_my_first_dbt_model_id ....................................... [\u001b[32mPASS\u001b[0m in 1.31s]\n20:37:08  4 of 6 SKIP relation dbt.my_second_dbt_model ............................ [\u001b[33mSKIP\u001b[0m]\n20:37:08  5 of 6 SKIP test not_null_my_second_dbt_model_id ............................... [\u001b[33mSKIP\u001b[0m]\n20:37:08  6 of 6 SKIP test unique_my_second_dbt_model_id ................................. [\u001b[33mSKIP\u001b[0m]\n20:37:08  \n20:37:08\n20:37:08  Finished running 1 table model, 4 data tests, 1 view model in 0 hours 0 minutes and 5.26 seconds (5.26s).\n20:37:08  \n20:37:08\n20:37:08  \u001b[31mCompleted with 1 error, 0 partial successes, and 0 warnings:\u001b[0m\n20:37:08  \n20:37:08\n20:37:08  \u001b[31mFailure in test not_null_my_first_dbt_model_id (models/example/schema.yml)\u001b[0m\n20:37:08\n20:37:08    Got 1 result, configured to fail if != 0\n20:37:08  \n20:37:08\n20:37:08    compiled code at target/compiled/airflow_dbt_providers_test/models/example/schema.yml/not_null_my_first_dbt_model_id.sql\n20:37:08  \n20:37:08\n20:37:08  Done. PASS=2 WARN=0 ERROR=1 SKIP=3 NO-OP=0 TOTAL=6\n", 
    "debug_logs": ""
}

@jason810496 (Member) left a comment


Looks good overall. I left some minor suggestions for modularization.

By the way, is the traceback linked in the PR copied from the local log files? Just want to confirm that the logging display will render correctly.

Comment on lines -216 to +221
-raise DbtCloudJobRunException(f"Job run {self.run_id} has failed or has been cancelled.")
+# Gather job run details (including run steps), so we can output the logs from each step
+run_details = self.hook.get_job_run(
+    run_id=self.run_id, account_id=self.account_id, include_related=["run_steps"]
+).json()["data"]
+
+raise DbtCloudJobRunDetailsException(self.run_id, run_details)
@jason810496 (Member):

We can modularize this part as a _raise_dbt_job_run_detail_exception method.
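
For illustration, the duplicated block from the diff above could collapse into a helper on the operator, roughly like this (a sketch; the method name follows the suggestion above and is not taken from the PR):

# Hypothetical helper method on the run-job operator.
def _raise_dbt_job_run_detail_exception(self):
    # Gather job run details (including run steps), so we can output the logs from each step
    run_details = self.hook.get_job_run(
        run_id=self.run_id, account_id=self.account_id, include_related=["run_steps"]
    ).json()["data"]
    raise DbtCloudJobRunDetailsException(self.run_id, run_details)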

Comment on lines -242 to +253
-raise DbtCloudJobRunException(f"Job run {self.run_id} has failed or has been cancelled.")
+# Gather job run details (including run steps), so we can output the logs from each step
+run_details = self.hook.get_job_run(
+    run_id=self.run_id, account_id=self.account_id, include_related=["run_steps"]
+).json()["data"]
+
+raise DbtCloudJobRunDetailsException(self.run_id, run_details)

@jason810496 (Member):

Then the method can be reused here and in several places below.
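
i.e. each failure/cancellation branch would shrink to a single call (illustrative):

self._raise_dbt_job_run_detail_exception()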

Comment on lines +150 to +175
# For extracting only a subset of the higher level job details
# id is the run_id
run_data_keys = [
    "id",
    "job_id",
    "is_error",
    "dbt_version",
    "finished_at",
    "finished_at_humanized",
    "run_duration",
    "run_duration_humanized",
]
# For extracting only a subset of the run step details
# id is the step id
run_steps_keys = [
    "run_id",
    "id",
    "index",
    "name",
    "status",
    "status_humanized",
    "duration",
    "duration_humanized",
    "logs",
    "debug_logs",
]
@jason810496 (Member):

Nit: How about making these key-filtering lists properties of the exception class?
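
For example, roughly like this (a sketch only; the attribute names are illustrative, and the key lists mirror the ones quoted above):

# Sketch: inside the provider's hooks/dbt.py, next to the existing DbtCloudJobRunException.
class DbtCloudJobRunDetailsException(DbtCloudJobRunException):
    # Subset of the top-level job run details to surface ("id" is the run_id).
    RUN_DATA_KEYS = (
        "id", "job_id", "is_error", "dbt_version",
        "finished_at", "finished_at_humanized",
        "run_duration", "run_duration_humanized",
    )
    # Subset of each run step's details to surface ("id" is the step id).
    RUN_STEPS_KEYS = (
        "run_id", "id", "index", "name", "status", "status_humanized",
        "duration", "duration_humanized", "logs", "debug_logs",
    )

    def __init__(self, run_id, run_details):
        run_data = {key: run_details.get(key) for key in self.RUN_DATA_KEYS}
        run_steps = [
            {key: step.get(key) for key in self.RUN_STEPS_KEYS}
            for step in run_details.get("run_steps") or []
        ]
        super().__init__(
            f"Job run {run_id} has failed or has been cancelled. "
            f"Job run details: {run_data} Run step details: {run_steps}"
        )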

@geraj1010 (Contributor, author):

@jason810496 Thank you for the review. I'm AFK until next Tuesday. I will address your comments then :)

Development

Successfully merging this pull request may close these issues.

Surfacing of DBT Cloud Logs to the logs of the DBTCloudRunJobOperator
2 participants