Skip to content

Running spark jobs on AWS EMR with ML_DSL

Anna Safonova edited this page Jun 29, 2020 · 1 revision

There are two ways to run Spark jobs on an EMR cluster: using the API or using Jupyter magic functions.

Using API

from com.griddynamics.dsl.ml.executors.executors import EmrExecutor
from com.griddynamics.dsl.ml.settings.profiles import PySparkJobProfile
from com.griddynamics.dsl.ml.jobs.builder import JobBuilder
from com.griddynamics.dsl.ml.sessions import SessionFactory
from com.griddynamics.dsl.ml.settings.description import Platform

Define a Profile for the PySpark job:

# Profile describing the PySpark job to submit to EMR.
profile = PySparkJobProfile(
    bucket='test_bucket',
    cluster='test_cluster',
    region='global',
    job_prefix='test_job',
    root_path='scripts',
    project='test_project',
    ai_region='us-central1',
    job_async=False,
)

Name of the main Python script:

# Main entry-point script of the job (presumably resolved against
# profile.root_path when uploaded — confirm against JobBuilder docs).
script_name = 'test_job.py'

Additional files for the Spark job:

# Extra Python modules shipped alongside the main script.
profile.py_files = ['py_file1.py', ..., 'py_filen.py']
# Extra JAR dependencies. NOTE: the original example listed these with a
# '.py' extension ('jar1.py'), which is wrong for Java archives.
profile.jars = ['jar1.jar', ..., 'jarn.jar']

Job properties:

# Spark configuration properties applied to the submitted job.
spark_properties = {
    "spark.executor.cores": "1",
    "spark.executor.memory": "4G",
}
profile.properties = spark_properties

Job arguments:

# Command-line arguments forwarded to the job script.
profile.args = {'--data_path': 'gs://test_bucket/data'}
# Passed as action_on_failure to submit_job below; 'CONTINUE' is an EMR
# ActionOnFailure value (step failure does not terminate the cluster).
action = 'CONTINUE'

Create a JobBuilder instance and build the job:

# Assemble the job description for the AWS platform.
builder = JobBuilder(Platform.AWS)
builder = builder.job_id(profile.job_prefix)
builder = builder.job_file(script_name)
job = builder.build_job(profile, Platform.AWS)

Create an instance of the Session class for the EMR client:

# Session for the EMR client. NOTE: the original snippet continued the
# `.build_session(...)` call on a new line with no enclosing parentheses,
# which is a Python SyntaxError; the whole expression is wrapped in
# parentheses so the line continuation is legal.
session = (SessionFactory(Platform.AWS)
           .build_session(job_bucket=profile.bucket,
                          job_region=profile.region,
                          cluster=profile.cluster,
                          job_project_id=profile.project))

Create an Executor instance for submitting the job to the EMR cluster:

# Submit the job to the EMR cluster via the executor.
executor = EmrExecutor(job, session)
executor.submit_job(
    run_async=profile.job_async,
    action_on_failure=action,
)

Using Magic Functions

from com.griddynamics.dsl.ml.settings.profiles import PySparkJobProfile
from com.griddynamics.dsl.ml.settings.description import Platform

Define a Profile for the PySpark job:

# Profile describing the PySpark job to submit to EMR.
profile = PySparkJobProfile(
    bucket='test_bucket',
    cluster='test_cluster',
    region='global',
    job_prefix='test_job',
    root_path='scripts',
    project='test_project',
    ai_region='us-central1',
    job_async=False,
)

Additional files for the Spark job:

# Extra Python modules shipped alongside the main script.
profile.py_files = ['py_file1.py', ..., 'py_filen.py']
# Extra JAR dependencies. NOTE: the original example listed these with a
# '.py' extension ('jar1.py'), which is wrong for Java archives.
profile.jars = ['jar1.jar', ..., 'jarn.jar']

Job properties:

# Spark properties for the magic-function form: a list of key=value pairs
# under a single "--conf" key (note this differs from the plain-dict form
# used in the API example above).
conf_pairs = [
    "spark.executor.cores=1",
    "spark.executor.memory=4G",
]
profile.properties = {"--conf": conf_pairs}

Job arguments:

# Command-line arguments forwarded to the job script.
profile.args = {'--data_path': 'gs://test_bucket/data'}

Register the profile and set the platform:

# Register the profile under a name the %py_data magic can reference
# (see the -p argument of %py_data below).
PySparkJobProfile.set('DslTestJobProfile', profile)
# Target platform, passed to the magic via $platform.
platform = Platform.AWS

Open or load the task script using the magic functions %py_script, %py_script_open or %py_load:

%%py_script -e --name test_word_count.py --path scripts --path_to_book data/data.txt --path_to_save test/ -o output
#!/usr/bin/python
"""Word-count example job: read a text file, count words, save the counts."""
from operator import add
from pyspark import SparkContext
import argparse

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--path_to_book', help='s3 path to book')
    parser.add_argument('--path_to_save', help='s3 path to save')
    parser.add_argument('--output_path', help='s3 path to save')
    # parse_known_args() returns (namespace, unknown_args); unpack it
    # instead of indexing args[0] everywhere as the original did.
    args, _unknown = parser.parse_known_args()
    # Constructing SparkContext already creates and starts the context; the
    # original additionally called .getOrCreate() (a classmethod) on the new
    # instance, which was redundant. The unused SparkSession import was
    # removed as well.
    sc = SparkContext(appName="word_count")
    lines = sc.textFile(args.path_to_book)
    counts = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)
    words_path = '{}{}/'.format(args.path_to_save, args.output_path)
    counts.saveAsTextFile(words_path)
    print('Word counts to: "{}"'.format(words_path))
    sc.stop()

Start job using magic function %py_data:

%py_data -n test_word_count.py -p DslTestJobProfile -pm $platform
Clone this wiki locally