diff --git a/quests/dataflow_python/1_Basic_ETL/lab/my_pipeline.py b/quests/dataflow_python/1_Basic_ETL/lab/my_pipeline.py index 586ab94935..aea333a1dc 100644 --- a/quests/dataflow_python/1_Basic_ETL/lab/my_pipeline.py +++ b/quests/dataflow_python/1_Basic_ETL/lab/my_pipeline.py @@ -15,6 +15,7 @@ def run(): parser.add_argument('--stagingLocation', required=True, help='Specify Cloud Storage bucket for staging') parser.add_argument('--tempLocation', required=True, help='Specify Cloud Storage bucket for temp') parser.add_argument('--runner', required=True, help='Specify Apache Beam Runner') + parser.add_argument('--machine_type', required=False, help='Specify machine type') opts = parser.parse_args() @@ -26,6 +27,7 @@ def run(): options.view_as(GoogleCloudOptions).temp_location = opts.tempLocation options.view_as(GoogleCloudOptions).job_name = '{0}{1}'.format('my-pipeline-',time.time_ns()) options.view_as(StandardOptions).runner = opts.runner + options.view_as(WorkerOptions).machine_type = opts.machine_type # TODO: Add static input and output strings diff --git a/quests/dataflow_python/1_Basic_ETL/solution/my_pipeline.py b/quests/dataflow_python/1_Basic_ETL/solution/my_pipeline.py index b810f89c4b..ca5f7282eb 100644 --- a/quests/dataflow_python/1_Basic_ETL/solution/my_pipeline.py +++ b/quests/dataflow_python/1_Basic_ETL/solution/my_pipeline.py @@ -7,6 +7,7 @@ from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.runners import DataflowRunner, DirectRunner +from apache_beam.options.pipeline_options import WorkerOptions # ### main @@ -18,6 +19,7 @@ def run(): parser.add_argument('--stagingLocation', required=True, help='Specify Cloud Storage bucket for staging') parser.add_argument('--tempLocation', required=True, help='Specify Cloud Storage bucket for temp') parser.add_argument('--runner', required=True, help='Specify Apache Beam Runner') + parser.add_argument('--machine_type', required=False, help='Specify machine type') opts = parser.parse_args() @@ -29,6 +31,7 @@ def run(): options.view_as(GoogleCloudOptions).temp_location = opts.tempLocation options.view_as(GoogleCloudOptions).job_name = '{0}{1}'.format('my-pipeline-',time.time_ns()) options.view_as(StandardOptions).runner = opts.runner + options.view_as(WorkerOptions).machine_type = opts.machine_type # Static input and output input = 'gs://{0}/events.json'.format(opts.project)