
Commit 0b8ce3e (1 parent: a9f9110)

Add stream orchestration notes

Signed-off-by: Danny Chiao <[email protected]>

2 files changed: +29 additions, −8 deletions

module_3/README.md

Lines changed: 20 additions & 1 deletion
@@ -33,6 +33,7 @@ This is a very similar module to module 1. The key difference is now we'll be us
- [Step 7: Retrieve features + test stream ingestion](#step-7-retrieve-features--test-stream-ingestion)
- [Overview](#overview)
- [Time to run code!](#time-to-run-code)
- [Step 8: Options for orchestrating streaming pipelines](#step-8-options-for-orchestrating-streaming-pipelines)
- [Conclusion](#conclusion)
- [Limitations](#limitations)
- [Why Feast?](#why-feast)
@@ -251,6 +252,24 @@ Feast will help enforce a consistent schema across batch + streaming features as
### Time to run code!
Now, run the [Jupyter notebook](feature_repo/module_3.ipynb)

## Step 8: Options for orchestrating streaming pipelines
We don't showcase how this works here, but broadly there are many approaches. In all of them, you'll likely want to generate operational metrics for monitoring (e.g. via StatsD or the Prometheus Pushgateway), as sketched below.
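For instance, here is a minimal sketch (an illustration, not workshop code) of emitting a per-batch metric from a Structured Streaming `foreachBatch` callback with the `prometheus_client` library. The gateway address, metric name, and job name are placeholders; `send_to_feast` is the notebook's existing callback:

```python
# Hypothetical monitoring sketch: wrap the notebook's send_to_feast callback
# and push a rows-ingested gauge to a Prometheus Pushgateway per micro-batch.
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

def send_to_feast_with_metrics(df, epoch_id):
    rows = df.count()
    send_to_feast(df, epoch_id)  # the workshop's existing push-to-Feast logic

    registry = CollectorRegistry()
    gauge = Gauge(
        "feast_stream_rows_ingested",  # placeholder metric name
        "Rows ingested into Feast in the latest micro-batch",
        registry=registry,
    )
    gauge.set(rows)
    # "localhost:9091" and the job name are placeholders for your setup.
    push_to_gateway("localhost:9091", job="feast_stream_ingestion", registry=registry)
```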
To outline a few approaches:
- **Option 1**: run stream ingestion frequently on a trigger, scheduled by the orchestration tool of your choice (Airflow, Databricks Jobs, etc.; see the Airflow sketch after this list). For example:
```python
(seven_day_avg
    .writeStream
    .outputMode("append")
    .option("checkpointLocation", "/tmp/feast-workshop/q3/")
    # trigger(once=True) processes all data available at start, then stops,
    # so each scheduled run behaves like a finite batch job
    .trigger(once=True)
    .foreachBatch(send_to_feast)
    .start())
```
- **Option 2**: with Databricks, use Databricks Jobs to monitor streaming queries and auto-retry on failure on a new cluster. See the [Databricks docs](https://docs.databricks.com/structured-streaming/query-recovery.html#configure-structured-streaming-jobs-to-restart-streaming-queries-on-failure) for details.
- **Option 3**: with Dataproc, configure [restartable jobs](https://cloud.google.com/dataproc/docs/concepts/jobs/restartable-jobs).
- **Option 4**: with Flink, consider configuring a [restart strategy](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/task_failure_recovery/).
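To make Option 1 concrete, here is a minimal Airflow sketch (an assumption, not workshop code) that launches the trigger-once ingestion job on a schedule. The DAG id, interval, and `spark_ingest.py` script are hypothetical:

```python
# Hypothetical Airflow DAG: submit the trigger(once=True) Spark job above
# every 30 minutes. spark_ingest.py is a placeholder for a script that
# defines seven_day_avg and starts the writeStream query.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="feast_stream_ingestion",  # placeholder DAG id
    start_date=datetime(2022, 1, 1),
    schedule_interval=timedelta(minutes=30),
    catchup=False,
) as dag:
    BashOperator(
        task_id="run_spark_ingestion",
        bash_command="spark-submit spark_ingest.py",
    )
```

Because the query sets a checkpoint location, each trigger-once run resumes from the last committed offsets, so the scheduled runs collectively approximate a continuous stream.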

# Conclusion
By the end of this module, you will have learned how to build a full feature platform, with orchestrated batch transformations (using dbt + Airflow) and orchestrated materialization (using Feast + Airflow).

@@ -282,4 +301,4 @@ Several things change:
- Production deployment of Airflow (e.g. syncing with a Git repository of DAGs, using k8s)
- Bundling dbt models with Airflow (e.g. via S3 like this [MWAA + dbt guide](https://docs.aws.amazon.com/mwaa/latest/userguide/samples-dbt.html))
- Airflow DAG parallelizes across feature views (instead of running a single `feature_store.materialize` across all feature views)
- Feast materialization is configured to be more scalable (e.g. using other Feast batch materialization engines: [Bytewax](https://docs.feast.dev/reference/batch-materialization/bytewax), [Snowflake](https://docs.feast.dev/reference/batch-materialization/snowflake), [Lambda](https://docs.feast.dev/reference/batch-materialization/lambda), [Spark](https://docs.feast.dev/reference/batch-materialization/spark))

module_3/feature_repo/module_3.ipynb

Lines changed: 9 additions & 7 deletions
@@ -533,13 +533,15 @@
 "    .select(col(\"nameOrig\").alias(\"USER_ID\"), col(\"window.end\").alias(\"TIMESTAMP\"), \"7D_AVG_AMT\")\n",
 ")\n",
 "\n",
-"query_1 = seven_day_avg \\\n",
-"    .writeStream \\\n",
-"    .outputMode(\"append\") \\\n",
-"    .option(\"checkpointLocation\", \"/tmp/feast-workshop/q3/\") \\\n",
-"    .trigger(processingTime=\"30 seconds\") \\\n",
-"    .foreachBatch(send_to_feast) \\\n",
-"    .start()\n",
+"query_1 = (\n",
+"    seven_day_avg\n",
+"    .writeStream\n",
+"    .outputMode(\"append\")\n",
+"    .option(\"checkpointLocation\", \"/tmp/feast-workshop/q3/\")\n",
+"    .trigger(processingTime=\"30 seconds\")\n",
+"    .foreachBatch(send_to_feast)\n",
+"    .start()\n",
+")\n",
 "\n",
 "query_1.awaitTermination(timeout=30)\n",
 "query_1.stop()"
