Livy batch plugin
add verification options: Spark, YARN. Share run configs through VCS
Vadim Panov committed Feb 18, 2020
1 parent 09cc911 commit 422d01f
Showing 15 changed files with 440 additions and 23 deletions.
7 changes: 5 additions & 2 deletions .gitignore
@@ -3,8 +3,7 @@ __pycache__/
 *.py[cod]
 *$py.class
 
-# PyCharm and VirtualEnv files
-.idea/
+# VirtualEnv files
 venv/
 
 # OS-specific rubbish
@@ -20,3 +19,7 @@ airflow_home/*
 # Those get created when you run Pyspark scripts locally
 metastore_db/
 derby.log
+
+# Ignore all Idea project files but Run profiles
+.idea/*
+!.idea/runConfigurations/

24 changes: 24 additions & 0 deletions .idea/runConfigurations/debug_03_batch_example_py.xml

23 changes: 23 additions & 0 deletions .idea/runConfigurations/run_batch_join_2_files.xml

7 changes: 4 additions & 3 deletions Makefile
@@ -1,5 +1,5 @@
 help:
-	@echo "clean - remove Python and Airflow debris"
+	@echo "clean - remove Python and Airflow debris TODO UPDATE"
 	@echo "venv - install venv with all requirements for Airflow"
 	@echo "up - bring up Airflow Docker compose infrastructure"
 	@echo "down - tear down Airflow Docker compose infrastructure"
@@ -43,6 +43,8 @@ init: venv
 	--conn_host localhost --conn_schema http --conn_port 8998 ; \
 	airflow connections -a --conn_id spark --conn_type HTTP \
 	--conn_host localhost --conn_schema http --conn_port 18080 ; \
+	airflow connections -a --conn_id yarn --conn_type HTTP \
+	--conn_host localhost --conn_schema http --conn_port 8088 ; \
 	deactivate ;
 
 up:
@@ -58,5 +60,4 @@ down:
 	pkill -f airflow | true
 
 copy-batches:
-	rsync -arv --exclude=".DS_Store" --exclude="desktop.ini" \
-	./batches ~/data/vpanov/bigdata-docker-compose/data/
+	rsync -arv ./batches/*.py ~/data/vpanov/bigdata-docker-compose/data/batches/
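
Note on the connections above: `make init` now registers three HTTP endpoints
(livy on 8998, spark on 18080, yarn on 8088) so that batches can be submitted
through Livy and then verified against the Spark history server and the YARN
ResourceManager. A minimal sketch of resolving such a connection with the
Airflow 1.10-era API (the helper name endpoint_url is ours, for illustration):

from airflow.hooks.base_hook import BaseHook

def endpoint_url(conn_id):
    # Illustrative helper: turn an Airflow connection into a base URL.
    # With the Makefile above, endpoint_url("yarn") -> "http://localhost:8088".
    conn = BaseHook.get_connection(conn_id)
    return "{}://{}:{}".format(conn.schema, conn.host, conn.port)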
5 changes: 4 additions & 1 deletion airflow_home/dags/01_session_example.py
@@ -6,7 +6,7 @@
     # Import statement for Airflow when it loads new operators into airflow.operators
     from airflow.operators import LivySessionOperator
 except ImportError:
-    # Import statement for IDE with the local folder structure
+    # Import statement for IDE with the local folder structur
     from airflow_home.plugins.livy_session_plugin import LivySessionOperator
 
 dag = DAG(
@@ -17,6 +17,9 @@
     catchup=False,
 )
 
+
+# See ready statements with parameter values substituted
+# in the "Rendered template" tab of a running task.
 scala_code = """
 spark.range(1000 * 1000 * {{ params.your_number }}).count()
 val df = Seq(
8 changes: 7 additions & 1 deletion airflow_home/dags/02_session_example_load_from_file.py
@@ -1,3 +1,9 @@
+"""
+Sessions are okay unless you have to write more than one-liners.
+That's where things get ugly - they can't be included as string literals due to
+their size, so we store them in a file. Jinja templates make it hard/impossible
+to run, debug and test it.
+"""
 import logging
 import os
 from datetime import datetime
@@ -29,7 +35,7 @@ def read_code_from_file(task_instance, **context):
 
 dag = DAG(
     "02_session_example_load_from_file",
-    description="Running Spark jobs via Livy Sessions, loading statenent from file",
+    description="Running Spark jobs via Livy Sessions, loading statement from file",
     schedule_interval=None,
     start_date=datetime(1970, 1, 1),
     catchup=False,
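
The body of read_code_from_file is collapsed in the hunk above. A minimal
sketch of the pattern the docstring describes (read the statement from a file
at runtime and hand it to the next task via XCom); the file name is assumed
here, not taken from the repository:

import os

def read_code_from_file(task_instance, **context):
    # Sketch only: load the Spark statement stored next to the DAG and
    # push it to XCom for a downstream session operator to render and run.
    path = os.path.join(os.path.dirname(__file__), "statement.scala")  # assumed name
    with open(path) as f:
        task_instance.xcom_push(key="code", value=f.read())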
48 changes: 48 additions & 0 deletions airflow_home/dags/04_batch_example_failing.py
@@ -0,0 +1,48 @@
"""
This is the DAG that will show you one interesting specific of how Livy works
in local mode vs in YARN mode (job running in cluster mode).
This DAG runs an intentionally failing Livy batch.
TODO doc
"""
from datetime import datetime

from airflow import DAG

try:
# Import statement for Airflow when it loads new operators into airflow.operators
from airflow.operators import LivyBatchOperator
except ImportError:
# Import statement for IDE with the local folder structure
from airflow_home.plugins.livy_batch_plugin import LivyBatchOperator

dag = DAG(
"04_batch_example_failing",
description="Running Spark jobs via Livy Batches, intentionally failing the job",
schedule_interval=None,
start_date=datetime(1970, 1, 1),
catchup=False,
)

t1 = LivyBatchOperator(
name="batch_example_failing_{{ run_id }}",
file="file:///data/batches/join_2_files.py",
py_files=["file:///data/batches/join_2_files.py"],
arguments=[
"file:///data/grades.csv",
"file:///data/ssn-address.tsv",
"-file1_sep=,",
"-file1_header=true",
"-file1_schema=`Last name` STRING, `First name` STRING, SSN STRING, "
"Test1 INT, Test2 INT, Test3 INT, Test4 INT, Final INT, Grade STRING",
"-file1_join_column=SSN",
"-file2_header=false",
"-file2_schema=`Last name` STRING, `First name` STRING, SSN STRING, "
"Address1 STRING, Address2 STRING",
"-file2_join_column=SSN",
"-output_header=true",
"-output_columns=file1.Inexistent",
],
conf={"spark.submit.deployMode": "cluster"},
task_id="livy_batch_example_failing",
dag=dag,
)
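
The quirk this DAG demonstrates, per its docstring and the verify_in options
introduced alongside it: the state Livy reports for a batch may not match the
Spark job's real outcome, especially in cluster deploy mode. A sketch of
polling Livy's documented batch-state endpoint (GET /batches/{id}/state):

import time
import requests

LIVY_URL = "http://localhost:8998"  # the `livy` connection from the Makefile

def wait_for_batch(batch_id):
    # Poll Livy until the batch leaves its active states and return the
    # terminal state. A "success" here still may not mean the job itself
    # succeeded - which is what DAGs 05 and 06 double-check.
    while True:
        state = requests.get(
            "{}/batches/{}/state".format(LIVY_URL, batch_id)
        ).json()["state"]
        if state not in ("starting", "running"):
            return state
        time.sleep(10)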
45 changes: 45 additions & 0 deletions airflow_home/dags/05_batch_example_verify_in_spark.py
@@ -0,0 +1,45 @@
from datetime import datetime

from airflow import DAG

try:
    # Import statement for Airflow when it loads new operators into airflow.operators
    from airflow.operators import LivyBatchOperator
except ImportError:
    # Import statement for IDE with the local folder structure
    from airflow_home.plugins.livy_batch_plugin import LivyBatchOperator

dag = DAG(
    "05_batch_example_verify_in_spark",
    description="Running Spark job via Livy Batches "
    "+ verifying status in Spark REST API",
    schedule_interval=None,
    start_date=datetime(1970, 1, 1),
    catchup=False,
)

t1 = LivyBatchOperator(
    name="batch_example_verify_in_spark_{{ run_id }}",
    file="file:///data/batches/join_2_files.py",
    py_files=["file:///data/batches/join_2_files.py"],
    arguments=[
        "file:///data/grades.csv",
        "file:///data/ssn-address.tsv",
        "-file1_sep=,",
        "-file1_header=true",
        "-file1_schema=`Last name` STRING, `First name` STRING, SSN STRING, "
        "Test1 INT, Test2 INT, Test3 INT, Test4 INT, Final INT, Grade STRING",
        "-file1_join_column=SSN",
        "-file2_header=false",
        "-file2_schema=`Last name` STRING, `First name` STRING, SSN STRING, "
        "Address1 STRING, Address2 STRING",
        "-file2_join_column=SSN",
        "-output_header=true",
        "-output_columns=file1.`Last name`, file1.`First name`, file1.SSN, "
        "file2.Address1, file2.Address2",
        ## TODO MAKE IT FAIL
    ],
    verify_in="spark",
    task_id="livy_batch_example_verify_in_spark",
    dag=dag,
)
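
What verify_in="spark" implies, going by the commit message: once the batch
finishes, confirm the outcome through the Spark REST API instead of trusting
the Livy state alone. A hedged sketch against the standard history-server
endpoint (how the plugin maps a Livy batch to an application id is not shown
in this diff):

import requests

SPARK_URL = "http://localhost:18080"  # the `spark` connection from the Makefile

def spark_job_statuses(app_id):
    # The monitoring API lists every job of an application with its status
    # (SUCCEEDED, FAILED, RUNNING, UNKNOWN); anything but SUCCEEDED would
    # fail the verification.
    jobs = requests.get(
        "{}/api/v1/applications/{}/jobs".format(SPARK_URL, app_id)
    ).json()
    return [job["status"] for job in jobs]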
45 changes: 45 additions & 0 deletions airflow_home/dags/06_batch_example_verify_in_yarn.py
@@ -0,0 +1,45 @@
from datetime import datetime

from airflow import DAG

try:
    # Import statement for Airflow when it loads new operators into airflow.operators
    from airflow.operators import LivyBatchOperator
except ImportError:
    # Import statement for IDE with the local folder structure
    from airflow_home.plugins.livy_batch_plugin import LivyBatchOperator

dag = DAG(
    "06_batch_example_verify_in_yarn",
    description="Running Spark job via Livy Batches + "
    "verifying job status in YARN Resource Manager REST API",
    schedule_interval=None,
    start_date=datetime(1970, 1, 1),
    catchup=False,
)

t1 = LivyBatchOperator(
    name="batch_example_verify_in_yarn_{{ run_id }}",
    file="file:///data/batches/join_2_files.py",
    py_files=["file:///data/batches/join_2_files.py"],
    arguments=[
        "file:///data/grades.csv",
        "file:///data/ssn-address.tsv",
        "-file1_sep=,",
        "-file1_header=true",
        "-file1_schema=`Last name` STRING, `First name` STRING, SSN STRING, "
        "Test1 INT, Test2 INT, Test3 INT, Test4 INT, Final INT, Grade STRING",
        "-file1_join_column=SSN",
        "-file2_header=false",
        "-file2_schema=`Last name` STRING, `First name` STRING, SSN STRING, "
        "Address1 STRING, Address2 STRING",
        "-file2_join_column=SSN",
        "-output_header=true",
        "-output_columns=file1.`Last name`, file1.`First name`, file1.SSN, "
        "file2.Address1, file2.Address2",
        ## TODO MAKE IT FAIL
    ],
    task_id="livy_batch_example_verify_in_yarn",
    verify_in="yarn",
    dag=dag,
)
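
Likewise, verify_in="yarn" suggests checking the application's final status in
the YARN ResourceManager REST API. A sketch against the standard endpoint
(again, obtaining the application id from the Livy batch is assumed):

import requests

YARN_URL = "http://localhost:8088"  # the `yarn` connection from the Makefile

def yarn_final_status(app_id):
    # GET /ws/v1/cluster/apps/{app_id} returns an "app" object whose
    # finalStatus is one of UNDEFINED, SUCCEEDED, FAILED or KILLED.
    app = requests.get(
        "{}/ws/v1/cluster/apps/{}".format(YARN_URL, app_id)
    ).json()["app"]
    return app["finalStatus"]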