
Commit

Add Documentation for Conditional Dataset Scheduling with dag-factory (#367)

Closes: #361

This PR adds a comprehensive guide on conditional dataset scheduling
using dag-factory (v0.22.0+) and Airflow 2.9+. Key updates include:

- Explanation of conditional dataset scheduling and its use cases.
- Requirements for using the feature.
- Examples demonstrating configurations with both string and YAML syntax.
- Visual diagrams illustrating dataset condition logic.
This documentation is intended to help users understand and implement
conditional dataset scheduling effectively in their workflows.

---------

Co-authored-by: ErickSeo <[email protected]>
ErickSeo and ErickSeo authored Jan 17, 2025
1 parent 98fffa2 commit de032dc
Showing 10 changed files with 117 additions and 41 deletions.
41 changes: 0 additions & 41 deletions README.md
@@ -24,7 +24,6 @@ For a gentle introduction, please take a look at our [Quickstart Guide](https://
- [Features](#features)
- [Dynamically Mapped Tasks](https://astronomer.github.io/dag-factory/latest/features/dynamic_tasks/)
- [Multiple Configuration Files](#multiple-configuration-files)
- [Datasets](#datasets)
- [Callbacks](#callbacks)
- [Custom Operators](#custom-operators)
- [Notes](#notes)
@@ -50,46 +49,6 @@ If you want to split your DAG configuration into multiple files, you can do so b
```python
from dagfactory import load_yaml_dags

load_yaml_dags(globals_dict=globals(), suffix=['dag.yaml'])
```

### Datasets

**dag-factory** supports scheduling DAGs via [Apache Airflow Datasets](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html).

To use datasets, specify the `Dataset` in the `outlets` key of the configuration file. The `outlets` key is a list of strings representing dataset URIs.
In the `schedule` key of the consumer DAG, set the datasets it should be scheduled against, again as a list of dataset URI strings.
The consumer DAG runs once all of those datasets have been updated.

```yaml
producer_dag:
  default_args:
    owner: "example_owner"
    retries: 1
    start_date: '2024-01-01'
  description: "Example DAG producer simple datasets"
  schedule_interval: "0 5 * * *"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 1"
      outlets: [ 's3://bucket_example/raw/dataset1.json' ]
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      dependencies: [ task_1 ]
      outlets: [ 's3://bucket_example/raw/dataset2.json' ]
consumer_dag:
  default_args:
    owner: "example_owner"
    retries: 1
    start_date: '2024-01-01'
  description: "Example DAG consumer simple datasets"
  schedule: [ 's3://bucket_example/raw/dataset1.json', 's3://bucket_example/raw/dataset2.json' ]
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 'consumer datasets'"
```
![datasets_example.png](img/datasets_example.png)

### Custom Operators

**dag-factory** supports using custom operators. To leverage, set the path to the custom operator within the `operator` key in the configuration file. You can add any additional parameters that the custom operator requires.
27 changes: 27 additions & 0 deletions dev/dags/datasets/example_dag_datasets_outlet.yml
@@ -0,0 +1,27 @@
```yaml
producer_dag:
  default_args:
    owner: "example_owner"
    retries: 1
    start_date: '2024-01-01'
  description: "Example DAG producer simple datasets"
  schedule_interval: "0 5 * * *"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 1"
      outlets: [ 's3://bucket_example/raw/dataset1.json' ]
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      dependencies: [ task_1 ]
      outlets: [ 's3://bucket_example/raw/dataset2.json' ]
consumer_dag:
  default_args:
    owner: "example_owner"
    retries: 1
    start_date: '2024-01-01'
  description: "Example DAG consumer simple datasets"
  schedule: [ 's3://bucket_example/raw/dataset1.json', 's3://bucket_example/raw/dataset2.json' ]
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 'consumer datasets'"
```
12 changes: 12 additions & 0 deletions dev/dags/datasets/example_dataset_condition_string.yml
@@ -0,0 +1,12 @@
```yaml
consumer_dag:
  default_args:
    owner: "example_owner"
    retries: 1
    start_date: '2024-01-01'
  description: "Example DAG consumer simple datasets"
  schedule:
    datasets: "((s3://bucket-cjmm/raw/dataset_custom_1 & s3://bucket-cjmm/raw/dataset_custom_2) | s3://bucket-cjmm/raw/dataset_custom_3)"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 'consumer datasets'"
```
17 changes: 17 additions & 0 deletions dev/dags/datasets/example_dataset_yaml_syntax.yml
@@ -0,0 +1,17 @@
```yaml
consumer_dag:
  default_args:
    owner: "example_owner"
    retries: 1
    start_date: '2024-01-01'
  description: "Example DAG consumer simple datasets"
  schedule:
    datasets:
      !or
        - !and
          - "s3://bucket-cjmm/raw/dataset_custom_1"
          - "s3://bucket-cjmm/raw/dataset_custom_2"
        - "s3://bucket-cjmm/raw/dataset_custom_3"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 'consumer datasets'"
```
59 changes: 59 additions & 0 deletions docs/features/datasets.md
@@ -0,0 +1,59 @@
# Datasets
DAG Factory supports Airflow’s [Datasets](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html).

## Datasets Outlets

To schedule DAGs with datasets, specify the `Dataset` in the `outlets` key of the configuration file. The `outlets` key is a list of strings representing dataset URIs.
In the `schedule` key of the consumer DAG, set the datasets it should be scheduled against, again as a list of dataset URI strings.
The consumer DAG runs once all of those datasets have been updated.

#### Example: Outlet

```title="example_dag_datasets_outlet.yml"
--8<-- "dev/dags/datasets/example_dag_datasets_outlet.yml"
```

![datasets_example.png](../static/images/datasets/outlets/datasets_example.png "Simple Dataset Producer")
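
The list form of `schedule` behaves like an implicit AND: the consumer DAG is triggered only after every listed dataset has been updated. A minimal sketch of that semantics (plain Python for illustration, not dag-factory internals; `should_trigger` is a hypothetical helper):

```python
# Sketch of implicit-AND semantics for a list-based dataset schedule.
# `should_trigger` is a hypothetical helper, not part of dag-factory's API.

schedule = [
    's3://bucket_example/raw/dataset1.json',
    's3://bucket_example/raw/dataset2.json',
]

def should_trigger(schedule, updated):
    """True once every scheduled dataset URI has been updated."""
    return all(uri in updated for uri in schedule)

print(should_trigger(schedule, {'s3://bucket_example/raw/dataset1.json'}))  # False
print(should_trigger(schedule, set(schedule)))                              # True
```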

## Conditional Dataset Scheduling

#### Minimum Requirements:
* dag-factory 0.22.0+
* [Apache Airflow® 2.9+](https://www.astronomer.io/docs/learn/airflow-datasets/#conditional-dataset-scheduling)


#### Logical operators for datasets
Airflow supports two logical operators for combining dataset conditions:

* AND (``&``): Specifies that the DAG should be triggered only after all of the specified datasets have been updated.
* OR (``|``): Specifies that the DAG should be triggered when any of the specified datasets is updated.

These operators enable you to configure your Airflow workflows to use more complex dataset update conditions, making them more dynamic and flexible.
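
A combined condition is effectively a small expression tree. The sketch below (plain Python for illustration, not dag-factory or Airflow internals) evaluates such a tree against the set of datasets updated so far, mirroring the AND/OR semantics above:

```python
# Hypothetical illustration of dataset-condition evaluation, not
# dag-factory internals: a condition is either a dataset URI string
# or a ('and' | 'or', [children]) tuple.

def evaluate(condition, updated):
    """Return True if `condition` is satisfied by the `updated` set of URIs."""
    if isinstance(condition, str):
        return condition in updated
    op, children = condition
    results = [evaluate(child, updated) for child in children]
    return all(results) if op == 'and' else any(results)

# (dataset_custom_1 & dataset_custom_2) | dataset_custom_3
cond = ('or', [
    ('and', ['s3://bucket-cjmm/raw/dataset_custom_1',
             's3://bucket-cjmm/raw/dataset_custom_2']),
    's3://bucket-cjmm/raw/dataset_custom_3',
])

print(evaluate(cond, {'s3://bucket-cjmm/raw/dataset_custom_1'}))  # False
print(evaluate(cond, {'s3://bucket-cjmm/raw/dataset_custom_3'}))  # True
```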

#### Examples of Conditional Dataset Scheduling

Below are examples demonstrating how to configure a consumer DAG using conditional dataset scheduling.

##### Example 1: String Condition

```title="example_dataset_condition_string.yml"
--8<-- "dev/dags/datasets/example_dataset_condition_string.yml"
```

##### Example 2: YAML Syntax

```title="example_dataset_yaml_syntax.yml"
--8<-- "dev/dags/datasets/example_dataset_yaml_syntax.yml"
```
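
Both forms describe the same condition tree. As an illustration of that equivalence (a hypothetical helper, not dag-factory code), rendering the nested form of Example 2 back into the string syntax of Example 1:

```python
# Hypothetical helper showing that the YAML !or/!and form and the
# string form encode the same condition tree; not dag-factory code.

def to_expression(node):
    """Render a nested ('and' | 'or', [children]) tree as a string condition."""
    if isinstance(node, str):
        return node
    op, children = node
    joiner = ' & ' if op == 'and' else ' | '
    return '(' + joiner.join(to_expression(child) for child in children) + ')'

tree = ('or', [
    ('and', ['s3://bucket-cjmm/raw/dataset_custom_1',
             's3://bucket-cjmm/raw/dataset_custom_2']),
    's3://bucket-cjmm/raw/dataset_custom_3',
])

print(to_expression(tree))
# ((s3://bucket-cjmm/raw/dataset_custom_1 & s3://bucket-cjmm/raw/dataset_custom_2) | s3://bucket-cjmm/raw/dataset_custom_3)
```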

---

#### Visualization

The following diagrams illustrate the dataset conditions described in the example configurations:

1. **`s3://bucket-cjmm/raw/dataset_custom_1`** and **`s3://bucket-cjmm/raw/dataset_custom_2`** must both be updated for the first condition to be satisfied.
2. Alternatively, **`s3://bucket-cjmm/raw/dataset_custom_3`** alone can satisfy the condition.

![Graph Conditional Dataset 1](../static/images/datasets/conditions/graph_conditional_dataset.png)
![Graph Conditional Dataset 2](../static/images/datasets/conditions/graph_conditional_dataset_2.png)
1 change: 1 addition & 0 deletions docs/index.md
@@ -22,6 +22,7 @@ Are you new to DAG Factory? This is the place to start!
## Features

* [Dynamic tasks](features/dynamic_tasks.md)
* [Datasets scheduling](features/datasets.md)

## Getting help

(2 binary image files: previews not displayed)
File renamed without changes
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -52,6 +52,7 @@ nav:
  - configuration/defaults.md
  - Features:
    - features/dynamic_tasks.md
    - features/datasets.md
  - Comparison:
    - comparison/index.md
    - Traditional Airflow Operators: comparison/traditional_operators.md