Skip to content

Commit b4ee9c7

Browse files
authored
Merge pull request #90 from lsst/tickets/DM-50973
Use job name to map the quantum nodes
2 parents 3bba544 + e52e375 commit b4ee9c7

File tree

5 files changed

+166
-54
lines changed

5 files changed

+166
-54
lines changed

config/bps_panda_qnode_map.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# enable qnode map to use job name to map the long pseudo file name
2+
3+
enableQnodeMap: true

doc/changes/DM-50973.feature.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Use job name to map the quantum nodes

python/lsst/ctrl/bps/panda/edgenode/cmd_line_decoder.py

Lines changed: 80 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -304,57 +304,87 @@ def replace_event_file(params, files):
304304
return ret_status, with_events, with_order_id_map, params_map
305305

306306

307-
deliver_input_files(sys.argv[3], sys.argv[4], sys.argv[5])
308-
cmd_line = str(binascii.unhexlify(sys.argv[1]).decode())
309-
data_params = sys.argv[2]
310-
cmd_line = replace_environment_vars(cmd_line)
311-
312-
print(f"cmd_line: {cmd_line}")
313-
print(f"data_params: {data_params}")
314-
315-
# If EventService is enabled, data_params will only contain event information.
316-
# So we need to convert the event information to LSST pseudo file names.
317-
# If EventService is not enabled, this part will not change data_params.
318-
ret_rep = replace_event_file(data_params, sys.argv[4])
319-
ret_event_status, with_events, with_order_id_map, event_params_map = ret_rep
320-
print(
321-
f"ret_event_status: {ret_event_status}, with_events: {with_events} with_order_id_map: {with_order_id_map}"
322-
)
323-
if not ret_event_status:
324-
print("failed to map EventService/orderIdMap parameters to original LSST pseudo file names")
325-
exit_code = 1
326-
sys.exit(exit_code)
307+
def use_map_file(input_file):
308+
"""Check whether the input file needs to be replaced
309+
because enableQnodeMap is enabled.
327310
328-
for event_param in event_params_map:
329-
order_id = event_params_map[event_param]["order_id"]
330-
pseudo_file_name = event_params_map[event_param]["order_id_map"][order_id]
331-
print(f"replacing event {event_param} with order_id {order_id} to: {pseudo_file_name}")
332-
cmd_line = cmd_line.replace(event_param, pseudo_file_name)
333-
data_params = data_params.replace(event_param, pseudo_file_name)
311+
Parameters
312+
----------
313+
input_file : `str`
314+
Input file either a pseudo file or job name.
334315
335-
# If job name map is enabled, data_params will only contain order_id
336-
# information. Here we will convert order_id information to LSST pseudo
337-
# file names.
316+
Returns
317+
-------
318+
use_qnode_map: `bool`
319+
Whether qnode_map is used. There is a placeholder 'PH'
320+
when enableQnodeMap is true.
321+
"""
322+
parts = input_file.split(":")
323+
use_qnode_map = len(parts) == 2 and parts[0] == "PH"
324+
return use_qnode_map
325+
326+
327+
if __name__ == "__main__":
328+
deliver_input_files(sys.argv[3], sys.argv[4], sys.argv[5])
329+
cmd_line = str(binascii.unhexlify(sys.argv[1]).decode())
330+
data_params = sys.argv[2]
331+
cmd_line = replace_environment_vars(cmd_line)
332+
333+
print(f"cmd_line: {cmd_line}")
334+
print(f"data_params: {data_params}")
335+
336+
# If EventService is enabled, data_params will only contain
337+
# event information. So we need to convert the event information
338+
# to LSST pseudo file names. If EventService is not enabled,
339+
# this part will not change data_params.
340+
ret_rep = replace_event_file(data_params, sys.argv[4])
341+
ret_event_status, with_events, with_order_id_map, event_params_map = ret_rep
342+
print(
343+
f"ret_event_status: {ret_event_status}, "
344+
f"with_events: {with_events} "
345+
f"with_order_id_map: {with_order_id_map}"
346+
)
347+
if not ret_event_status:
348+
print("failed to map EventService/orderIdMap parameters to original LSST pseudo file names")
349+
exit_code = 1
350+
sys.exit(exit_code)
351+
352+
for event_param in event_params_map:
353+
order_id = event_params_map[event_param]["order_id"]
354+
pseudo_file_name = event_params_map[event_param]["order_id_map"][order_id]
355+
print(f"replacing event {event_param} with order_id {order_id} to: {pseudo_file_name}")
356+
cmd_line = cmd_line.replace(event_param, pseudo_file_name)
357+
data_params = data_params.replace(event_param, pseudo_file_name)
358+
359+
# If job name map is enabled, data_params will only contain order_id
360+
# information. Here we will convert order_id information to LSST pseudo
361+
# file names.
362+
363+
data_params = data_params.split("+")
364+
365+
"""Replace the pipetask command line placeholders
366+
with actual data provided in the script call
367+
in form placeholder1:file1+placeholder2:file2:...
368+
"""
369+
cmd_line = replace_files_placeholders(cmd_line, sys.argv[4])
338370

339-
data_params = data_params.split("+")
371+
jobname = data_params[0]
372+
if use_map_file(jobname):
373+
with open("qnode_map.json", encoding="utf-8") as f:
374+
qnode_map = json.load(f)
375+
data_params = qnode_map[jobname].split("+")
340376

341-
"""Replace the pipetask command line placeholders
342-
with actual data provided in the script call
343-
in form placeholder1:file1+placeholder2:file2:...
344-
"""
345-
cmd_line = replace_files_placeholders(cmd_line, sys.argv[4])
346-
347-
for key_value_pair in data_params[1:]:
348-
(key, value) = key_value_pair.split(":")
349-
cmd_line = cmd_line.replace("{" + key + "}", value)
350-
351-
print("executable command line:")
352-
print(cmd_line)
353-
354-
exit_status = os.system(cmd_line)
355-
exit_code = 1
356-
if os.WIFSIGNALED(exit_status):
357-
exit_code = os.WTERMSIG(exit_status) + 128
358-
elif os.WIFEXITED(exit_status):
359-
exit_code = os.WEXITSTATUS(exit_status)
360-
sys.exit(exit_code)
377+
for key_value_pair in data_params[1:]:
378+
(key, value) = key_value_pair.split(":")
379+
cmd_line = cmd_line.replace("{" + key + "}", value)
380+
381+
print("executable command line:")
382+
print(cmd_line)
383+
384+
exit_status = os.system(cmd_line)
385+
exit_code = 1
386+
if os.WIFSIGNALED(exit_status):
387+
exit_code = os.WTERMSIG(exit_status) + 128
388+
elif os.WIFEXITED(exit_status):
389+
exit_code = os.WEXITSTATUS(exit_status)
390+
sys.exit(exit_code)

python/lsst/ctrl/bps/panda/utils.py

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737

3838
import binascii
3939
import concurrent.futures
40+
import json
4041
import logging
4142
import os
4243
import random
@@ -228,6 +229,7 @@ def _make_doma_work(
228229
max_payloads_per_panda_job=PANDA_DEFAULT_MAX_PAYLOADS_PER_PANDA_JOB,
229230
max_wms_job_wall_time=None,
230231
remote_filename=None,
232+
qnode_map_filename=None,
231233
):
232234
"""Make the DOMA Work object for a PanDA task.
233235
@@ -358,6 +360,9 @@ def _make_doma_work(
358360
if gwfile.job_access_remote:
359361
direct_io_files.add(gwfile.name)
360362

363+
if qnode_map_filename:
364+
local_pfns.update(qnode_map_filename)
365+
361366
submit_cmd = generic_workflow.run_attrs.get("bps_iscustom", False)
362367

363368
if not direct_io_files:
@@ -590,6 +595,7 @@ def add_idds_work(config, generic_workflow, idds_workflow):
590595
"""
591596
# event service
592597
_, enable_event_service = config.search("enableEventService", opt={"default": None})
598+
_, enable_qnode_map = config.search("enableQnodeMap", opt={"default": None})
593599
_, max_payloads_per_panda_job = config.search(
594600
"maxPayloadsPerPandaJob", opt={"default": PANDA_DEFAULT_MAX_PAYLOADS_PER_PANDA_JOB}
595601
)
@@ -617,10 +623,11 @@ def add_idds_work(config, generic_workflow, idds_workflow):
617623
task_count = 0 # Task number/ID in idds workflow used for unique name
618624
remote_archive_filename = None
619625

626+
submit_path = config["submitPath"]
627+
620628
submit_cmd = generic_workflow.run_attrs.get("bps_iscustom", False)
621629
if submit_cmd:
622630
files = generic_workflow.get_executables(data=False, transfer_only=True)
623-
submit_path = config["submitPath"]
624631
archive_filename = f"jobO.{uuid.uuid4()}.tar.gz"
625632
archive_filename = create_archive_file(submit_path, archive_filename, files)
626633
remote_archive_filename = copy_files_to_pandacache(archive_filename)
@@ -636,7 +643,6 @@ def add_idds_work(config, generic_workflow, idds_workflow):
636643
enable_event_service = [i.strip() for i in enable_event_service]
637644
if enable_job_name_map:
638645
doma_tree = DomaTree(name=generic_workflow.name)
639-
submit_path = config[".bps_defined.submitPath"]
640646
_, order_id_map_filename = config.search(
641647
"orderIdMapFilename", opt={"default": PANDA_DEFAULT_ORDER_ID_MAP_FILE}
642648
)
@@ -655,6 +661,14 @@ def add_idds_work(config, generic_workflow, idds_workflow):
655661
# catch dependency issues to loop through again later.
656662
jobs_with_dependency_issues = {}
657663

664+
# Initialize quantum node map
665+
qnode_map = {}
666+
qnode_map_filename = None
667+
if enable_qnode_map:
668+
qnode_map_file = os.path.join(submit_path, "qnode_map.json")
669+
qnode_map_filename = {"qnodemap": qnode_map_file}
670+
files_to_pre_stage.update(qnode_map_filename)
671+
658672
# Assume jobs with same label share config values
659673
for job_label in generic_workflow.labels:
660674
_LOG.debug("job_label = %s", job_label)
@@ -709,6 +723,7 @@ def add_idds_work(config, generic_workflow, idds_workflow):
709723
max_payloads_per_panda_job=max_payloads_per_panda_job,
710724
max_wms_job_wall_time=max_wms_job_wall_time,
711725
remote_filename=remote_archive_filename,
726+
qnode_map_filename=qnode_map_filename,
712727
)
713728
name_works[work.task_name] = work
714729
files_to_pre_stage.update(files)
@@ -718,6 +733,12 @@ def add_idds_work(config, generic_workflow, idds_workflow):
718733

719734
pseudo_filename = _make_pseudo_filename(config, gwjob)
720735
job_to_pseudo_filename[gwjob.name] = pseudo_filename
736+
737+
if enable_qnode_map:
738+
job_name_PH = "PH:" + gwjob.name
739+
job_to_pseudo_filename[gwjob.name] = job_name_PH
740+
qnode_map[job_name_PH] = pseudo_filename
741+
721742
job_to_task[gwjob.name] = work.get_work_name()
722743
deps = []
723744
missing_deps = False
@@ -742,7 +763,8 @@ def add_idds_work(config, generic_workflow, idds_workflow):
742763
}
743764
)
744765
if not missing_deps:
745-
f_name = f"{job_label}:orderIdMap_{order_id}" if enable_job_name_map else pseudo_filename
766+
j_name = job_to_pseudo_filename[gwjob.name]
767+
f_name = f"{job_label}:orderIdMap_{order_id}" if enable_job_name_map else j_name
746768
work.dependency_map.append(
747769
{
748770
"name": f_name,
@@ -757,6 +779,10 @@ def add_idds_work(config, generic_workflow, idds_workflow):
757779
"label": job_label,
758780
}
759781

782+
if enable_qnode_map:
783+
with open(qnode_map_file, "w", encoding="utf-8") as f:
784+
json.dump(qnode_map, f, indent=2)
785+
760786
# If there were any issues figuring out dependencies through earlier loop
761787
if jobs_with_dependency_issues:
762788
_LOG.warning("Could not prepare workflow in single pass. Please notify developers.")
@@ -790,7 +816,7 @@ def add_idds_work(config, generic_workflow, idds_workflow):
790816

791817
work.dependency_map.append(
792818
{
793-
"name": f"{job_label}:orderIdMap_{order_id}" if enable_job_name_map else pseudo_filename,
819+
"name": f"{job_label}:orderIdMap_{order_id}" if enable_job_name_map else job_name,
794820
"order_id": order_id,
795821
"dependencies": deps,
796822
}

tests/test_cmd_line_decoder.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# This file is part of ctrl_bps_panda.
2+
#
3+
# Developed for the LSST Data Management System.
4+
# This product includes software developed by the LSST Project
5+
# (https://www.lsst.org).
6+
# See the COPYRIGHT file at the top-level directory of this distribution
7+
# for details of code ownership.
8+
#
9+
# This software is dual licensed under the GNU General Public License and also
10+
# under a 3-clause BSD license. Recipients may choose which of these licenses
11+
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12+
# respectively. If you choose the GPL option then the following text applies
13+
# (but note that there is still no warranty even if you opt for BSD instead):
14+
#
15+
# This program is free software: you can redistribute it and/or modify
16+
# it under the terms of the GNU General Public License as published by
17+
# the Free Software Foundation, either version 3 of the License, or
18+
# (at your option) any later version.
19+
#
20+
# This program is distributed in the hope that it will be useful,
21+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
22+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23+
# GNU General Public License for more details.
24+
#
25+
# You should have received a copy of the GNU General Public License
26+
# along with this program. If not, see <https://www.gnu.org/licenses/>.
27+
28+
"""Unit tests for edgenode/cmd_line_decoder.py."""
29+
30+
import unittest
31+
32+
from lsst.ctrl.bps.panda.edgenode.cmd_line_decoder import use_map_file
33+
34+
35+
class TestCmdLineDecoder(unittest.TestCase):
36+
"""Test command line decoder functions."""
37+
38+
def test_valid_input(self):
39+
self.assertTrue(use_map_file("PH:file12345"))
40+
41+
def test_invalid_prefix(self):
42+
self.assertFalse(use_map_file("XX:file12345"))
43+
44+
def test_missing_colon(self):
45+
self.assertFalse(use_map_file("PH12345"))
46+
47+
def test_too_many_parts(self):
48+
self.assertFalse(use_map_file("PH:file123:456"))
49+
50+
51+
if __name__ == "__main__":
52+
unittest.main()

0 commit comments

Comments
 (0)