|
| 1 | +#!/usr/bin/env python3 |
| 2 | + |
| 3 | +import sys |
| 4 | +import urllib.request as request |
| 5 | +import json |
| 6 | + |
| 7 | + |
| 8 | +def print_usage(): |
| 9 | + print("Usage:") |
| 10 | + print(" e.g. dtle-7to11.py 'http://127.0.0.1:4646'") |
| 11 | + |
| 12 | + |
| 13 | +if len(sys.argv) != 2: |
| 14 | + print_usage() |
| 15 | + exit(0) |
| 16 | + |
| 17 | +if not sys.argv[1].startswith("http"): |
| 18 | + print_usage() |
| 19 | + exit(0) |
| 20 | + |
| 21 | +nomad_url = sys.argv[1] |
| 22 | + |
| 23 | +resp = request.urlopen(nomad_url + "/v1/jobs") |
| 24 | +j_jobs = json.load(resp) |
| 25 | +resp.close() |
| 26 | + |
| 27 | +for job in j_jobs: |
| 28 | + job_id = job['ID'] |
| 29 | + resp = request.urlopen(nomad_url + "/v1/job/" + job_id) |
| 30 | + j_job = json.load(resp) |
| 31 | + resp.close() |
| 32 | + |
| 33 | + src_task = None |
| 34 | + dest_task = None |
| 35 | + |
| 36 | + # get src/dest tasks |
| 37 | + for task_group in j_job["TaskGroups"]: |
| 38 | + for task in task_group["Tasks"]: |
| 39 | + if task["Name"] == "src": |
| 40 | + src_task = task |
| 41 | + elif task["Name"] == "dest": |
| 42 | + dest_task = task |
| 43 | + |
| 44 | + if src_task is None or dest_task is None: |
| 45 | + print("failed to convert job: %s" % job_id) |
| 46 | + else: |
| 47 | + src_config = src_task["Config"] |
| 48 | + dest_config = dest_task["Config"] |
| 49 | + if "DestType" in dest_config: |
| 50 | + print("job already converted: %s" % job_id) |
| 51 | + else: |
| 52 | + # rename src ConnectionConfig |
| 53 | + if "ConnectionConfig" in src_config: |
| 54 | + src_config["SrcConnectionConfig"] = src_config["ConnectionConfig"] |
| 55 | + del src_config["ConnectionConfig"] |
| 56 | + # move dest config to src |
| 57 | + for k in dest_config: |
| 58 | + new_key = "DestConnectionConfig" if k == "ConnectionConfig" else k |
| 59 | + src_config[new_key] = dest_config[k] |
| 60 | + # dest special config |
| 61 | + is_kafka = "KafkaConfig" in src_config |
| 62 | + dest_task["Config"] = {"DestType": "kafka" if is_kafka else "mysql"} |
| 63 | + |
| 64 | + # POST job |
| 65 | + payload = json.dumps({"Job": j_job}).encode("utf-8") |
| 66 | + req = request.Request(nomad_url + "/v1/jobs", |
| 67 | + data=payload, method='POST') |
| 68 | + with request.urlopen(req) as resp: |
| 69 | + print("job converted: %s %s" % (job_id, resp.msg)) |
0 commit comments