23
23
# unique to your accounts. These are used when constructing connection strings
24
24
# for the Batch and Storage client objects.
25
25
26
+
26
27
def query_yes_no (question , default = "yes" ):
27
28
"""
28
29
Prompts the user for yes/no input, displaying the specified question text.
@@ -92,11 +93,10 @@ def upload_file_to_container(block_blob_client, container_name, file_path):
92
93
block_blob_client .create_blob_from_path (container_name ,
93
94
blob_name ,
94
95
file_path )
95
-
96
+
96
97
# Obtain the SAS token for the container.
97
98
sas_token = get_container_sas_token (block_blob_client ,
98
- container_name , azureblob .BlobPermissions .READ )
99
-
99
+ container_name , azureblob .BlobPermissions .READ )
100
100
101
101
sas_url = block_blob_client .make_blob_url (container_name ,
102
102
blob_name ,
@@ -105,6 +105,7 @@ def upload_file_to_container(block_blob_client, container_name, file_path):
105
105
return batchmodels .ResourceFile (file_path = blob_name ,
106
106
http_url = sas_url )
107
107
108
+
108
109
def get_container_sas_token (block_blob_client ,
109
110
container_name , blob_permissions ):
110
111
"""
@@ -130,9 +131,8 @@ def get_container_sas_token(block_blob_client,
130
131
return container_sas_token
131
132
132
133
133
-
134
134
def get_container_sas_url (block_blob_client ,
135
- container_name , blob_permissions ):
135
+ container_name , blob_permissions ):
136
136
"""
137
137
Obtains a shared access signature URL that provides write access to the
138
138
output container to which the tasks will upload their output.
@@ -146,10 +146,11 @@ def get_container_sas_url(block_blob_client,
146
146
"""
147
147
# Obtain the SAS token for the container.
148
148
sas_token = get_container_sas_token (block_blob_client ,
149
- container_name , azureblob .BlobPermissions .WRITE )
149
+ container_name , azureblob .BlobPermissions .WRITE )
150
150
151
151
# Construct SAS URL for the container
152
- container_sas_url = "https://{}.blob.core.windows.net/{}?{}" .format (config ._STORAGE_ACCOUNT_NAME , container_name , sas_token )
152
+ container_sas_url = "https://{}.blob.core.windows.net/{}?{}" .format (
153
+ config ._STORAGE_ACCOUNT_NAME , container_name , sas_token )
153
154
154
155
return container_sas_url
155
156
@@ -174,16 +175,16 @@ def create_pool(batch_service_client, pool_id):
174
175
175
176
# The start task installs ffmpeg on each node from an available repository, using
176
177
# an administrator user identity.
177
-
178
+
178
179
new_pool = batch .models .PoolAddParameter (
179
180
id = pool_id ,
180
181
virtual_machine_configuration = batchmodels .VirtualMachineConfiguration (
181
182
image_reference = batchmodels .ImageReference (
182
- publisher = "Canonical" ,
183
- offer = "UbuntuServer" ,
184
- sku = "18.04-LTS" ,
185
- version = "latest"
186
- ),
183
+ publisher = "Canonical" ,
184
+ offer = "UbuntuServer" ,
185
+ sku = "18.04-LTS" ,
186
+ version = "latest"
187
+ ),
187
188
node_agent_sku_id = "batch.node.ubuntu 18.04" ),
188
189
vm_size = config ._POOL_VM_SIZE ,
189
190
target_dedicated_nodes = config ._DEDICATED_POOL_NODE_COUNT ,
@@ -193,13 +194,14 @@ def create_pool(batch_service_client, pool_id):
193
194
wait_for_success = True ,
194
195
user_identity = batchmodels .UserIdentity (
195
196
auto_user = batchmodels .AutoUserSpecification (
196
- scope = batchmodels .AutoUserScope .pool ,
197
- elevation_level = batchmodels .ElevationLevel .admin )),
198
- )
197
+ scope = batchmodels .AutoUserScope .pool ,
198
+ elevation_level = batchmodels .ElevationLevel .admin )),
199
+ )
199
200
)
200
201
201
202
batch_service_client .pool .add (new_pool )
202
203
204
+
203
205
def create_job (batch_service_client , job_id , pool_id ):
204
206
"""
205
207
Creates a job with the specified ID, associated with the specified pool.
@@ -216,7 +218,8 @@ def create_job(batch_service_client, job_id, pool_id):
216
218
pool_info = batch .models .PoolInformation (pool_id = pool_id ))
217
219
218
220
batch_service_client .job .add (job )
219
-
221
+
222
+
220
223
def add_tasks (batch_service_client , job_id , input_files , output_container_sas_url ):
221
224
"""
222
225
Adds a task for each input file in the collection to the specified job.
@@ -234,26 +237,26 @@ def add_tasks(batch_service_client, job_id, input_files, output_container_sas_ur
234
237
235
238
tasks = list ()
236
239
237
- for idx , input_file in enumerate (input_files ):
238
- input_file_path = input_file .file_path
239
- output_file_path = "" .join ((input_file_path ).split ('.' )[:- 1 ]) + '.mp3'
240
- command = "/bin/bash -c \" ffmpeg -i {} {} \" " .format (input_file_path , output_file_path )
240
+ for idx , input_file in enumerate (input_files ):
241
+ input_file_path = input_file .file_path
242
+ output_file_path = "" .join ((input_file_path ).split ('.' )[:- 1 ]) + '.mp3'
243
+ command = "/bin/bash -c \" ffmpeg -i {} {} \" " .format (
244
+ input_file_path , output_file_path )
241
245
tasks .append (batch .models .TaskAddParameter (
242
246
id = 'Task{}' .format (idx ),
243
247
command_line = command ,
244
248
resource_files = [input_file ],
245
249
output_files = [batchmodels .OutputFile (
246
- file_pattern = output_file_path ,
247
- destination = batchmodels .OutputFileDestination (
248
- container = batchmodels .OutputFileBlobContainerDestination (
249
- container_url = output_container_sas_url )),
250
- upload_options = batchmodels .OutputFileUploadOptions (
251
- upload_condition = batchmodels .OutputFileUploadCondition .task_success ))]
252
- )
253
- )
250
+ file_pattern = output_file_path ,
251
+ destination = batchmodels .OutputFileDestination (
252
+ container = batchmodels .OutputFileBlobContainerDestination (
253
+ container_url = output_container_sas_url )),
254
+ upload_options = batchmodels .OutputFileUploadOptions (
255
+ upload_condition = batchmodels .OutputFileUploadCondition .task_success ))]
256
+ )
257
+ )
254
258
batch_service_client .task .add_collection (job_id , tasks )
255
259
256
-
257
260
258
261
def wait_for_tasks_to_complete (batch_service_client , job_id , timeout ):
259
262
"""
@@ -289,7 +292,6 @@ def wait_for_tasks_to_complete(batch_service_client, job_id, timeout):
289
292
"timeout period of " + str (timeout ))
290
293
291
294
292
-
293
295
if __name__ == '__main__' :
294
296
295
297
start_time = datetime .datetime .now ().replace (microsecond = 0 )
@@ -299,37 +301,37 @@ def wait_for_tasks_to_complete(batch_service_client, job_id, timeout):
299
301
# Create the blob client, for use in obtaining references to
300
302
# blob storage containers and uploading files to containers.
301
303
302
-
303
304
blob_client = azureblob .BlockBlobService (
304
305
account_name = config ._STORAGE_ACCOUNT_NAME ,
305
306
account_key = config ._STORAGE_ACCOUNT_KEY )
306
307
307
308
# Use the blob client to create the containers in Azure Storage if they
308
309
# don't yet exist.
309
-
310
+
310
311
input_container_name = 'input'
311
312
output_container_name = 'output'
312
313
blob_client .create_container (input_container_name , fail_on_exist = False )
313
314
blob_client .create_container (output_container_name , fail_on_exist = False )
314
315
print ('Container [{}] created.' .format (input_container_name ))
315
316
print ('Container [{}] created.' .format (output_container_name ))
316
317
317
- # Create a list of all MP4 files in the InputFiles directory.
318
+ # Create a list of all MP4 files in the InputFiles directory.
318
319
input_file_paths = []
319
-
320
- for folder , subs , files in os .walk (os .path .join (sys .path [0 ],'InputFiles' )):
320
+
321
+ for folder , subs , files in os .walk (os .path .join (sys .path [0 ], 'InputFiles' )):
321
322
for filename in files :
322
323
if filename .endswith (".mp4" ):
323
- input_file_paths .append (os .path .abspath (os .path .join (folder , filename )))
324
+ input_file_paths .append (os .path .abspath (
325
+ os .path .join (folder , filename )))
324
326
325
- # Upload the input files. This is the collection of files that are to be processed by the tasks.
327
+ # Upload the input files. This is the collection of files that are to be processed by the tasks.
326
328
input_files = [
327
329
upload_file_to_container (blob_client , input_container_name , file_path )
328
330
for file_path in input_file_paths ]
329
331
330
332
# Obtain a shared access signature URL that provides write access to the output
331
333
# container to which the tasks will upload their output.
332
-
334
+
333
335
output_container_sas_url = get_container_sas_url (
334
336
blob_client ,
335
337
output_container_name ,
@@ -348,30 +350,30 @@ def wait_for_tasks_to_complete(batch_service_client, job_id, timeout):
348
350
# Create the pool that will contain the compute nodes that will execute the
349
351
# tasks.
350
352
create_pool (batch_client , config ._POOL_ID )
351
-
353
+
352
354
# Create the job that will run the tasks.
353
355
create_job (batch_client , config ._JOB_ID , config ._POOL_ID )
354
356
355
- # Add the tasks to the job. Pass the input files and a SAS URL
357
+ # Add the tasks to the job. Pass the input files and a SAS URL
356
358
# to the storage container for output files.
357
- add_tasks (batch_client , config ._JOB_ID , input_files , output_container_sas_url )
359
+ add_tasks (batch_client , config ._JOB_ID ,
360
+ input_files , output_container_sas_url )
358
361
359
362
# Pause execution until tasks reach Completed state.
360
363
wait_for_tasks_to_complete (batch_client ,
361
364
config ._JOB_ID ,
362
365
datetime .timedelta (minutes = 30 ))
363
366
364
367
print (" Success! All tasks reached the 'Completed' state within the "
365
- "specified timeout period." )
368
+ "specified timeout period." )
366
369
367
370
except batchmodels .BatchErrorException as err :
368
- print_batch_exception (err )
369
- raise
371
+ print_batch_exception (err )
372
+ raise
370
373
371
374
# Delete input container in storage
372
375
print ('Deleting container [{}]...' .format (input_container_name ))
373
376
blob_client .delete_container (input_container_name )
374
-
375
377
376
378
# Print out some timing info
377
379
end_time = datetime .datetime .now ().replace (microsecond = 0 )
0 commit comments