-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcelonis_integration.py
More file actions
129 lines (100 loc) · 4.42 KB
/
celonis_integration.py
File metadata and controls
129 lines (100 loc) · 4.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
"""VynFi + Celonis — push synthetic event logs into Celonis IBC.
Backs the blog post "Celonis + VynFi: Load Synthetic Process Mining Data via IBC".
Exports VynFi OCEL events into the Celonis IBC event log CSV format with the
required columns: Case_ID, Activity, Event_Timestamp, Event_ID, User, plus a
case table with attributes.
Requires: pandas. Optional: pycelonis (if pushing to a real Celonis IBC instance).
"""
import os
from pathlib import Path
import pandas as pd
import vynfi
client = vynfi.VynFi(api_key=os.environ["VYNFI_API_KEY"], timeout=120.0)
# Find a recent completed job with process_mining data
jobs = client.jobs.list(status="completed", limit=10)
archive = None
for job in jobs.data:
a = client.jobs.download_archive(job.id)
if a.find("process_mining/events.json"):
archive = a
break
a.close()
if archive is None:
print("No job has process_mining/ data. Generate one first.")
raise SystemExit(1)
print(f"Exporting events from archive: {archive}")
# ── Load events ──────────────────────────────────────────────────────────────
events = archive.json("process_mining/events.json")
objects = archive.json("process_mining/objects.json")
print(f" {len(events):,} events, {len(objects):,} objects")
# ── Build the Celonis IBC event log ──────────────────────────────────────────
#
# Celonis IBC expects columns:
# Case_ID, Activity, Event_Timestamp, Event_ID, User, Sort_Key (optional)
# Plus case-level attributes in a separate case table.
event_rows = []
for ev in events:
refs = ev.get("object_refs") or []
case_id = refs[0]["external_id"] if refs else ev.get("case_id") or ev["event_id"]
event_rows.append(
{
"Case_ID": case_id,
"Activity": ev["activity_name"],
"Event_Timestamp": ev["timestamp"],
"Event_ID": ev["event_id"],
"User": ev.get("resource_id", "system"),
"Company_Code": ev.get("company_code", ""),
"Is_Anomaly": ev.get("is_anomaly", False),
"Lifecycle": ev.get("lifecycle", "complete"),
}
)
event_df = pd.DataFrame(event_rows)
event_df["Event_Timestamp"] = pd.to_datetime(event_df["Event_Timestamp"], utc=True, errors="coerce")
event_df = event_df.dropna(subset=["Event_Timestamp"])
event_df = event_df.sort_values(["Case_ID", "Event_Timestamp"])
print(f" {len(event_df):,} event rows, {event_df['Case_ID'].nunique():,} cases")
# ── Case table with per-case attributes ──────────────────────────────────────
case_df = (
event_df.groupby("Case_ID")
.agg(
Start_Time=("Event_Timestamp", "min"),
End_Time=("Event_Timestamp", "max"),
Event_Count=("Activity", "size"),
Has_Anomaly=("Is_Anomaly", "any"),
Company_Code=("Company_Code", "first"),
)
.reset_index()
)
case_df["Cycle_Time_Hours"] = (
case_df["End_Time"] - case_df["Start_Time"]
).dt.total_seconds() / 3600
# ── Export to CSVs ───────────────────────────────────────────────────────────
out_dir = Path("celonis_export")
out_dir.mkdir(exist_ok=True)
events_csv = out_dir / "events.csv"
cases_csv = out_dir / "cases.csv"
event_df.to_csv(events_csv, index=False)
case_df.to_csv(cases_csv, index=False)
print(f"\nExported:")
print(f" {events_csv} ({events_csv.stat().st_size:,} bytes)")
print(f" {cases_csv} ({cases_csv.stat().st_size:,} bytes)")
# ── How to push to Celonis IBC ───────────────────────────────────────────────
print("""
To push into Celonis IBC:
pip install pycelonis
from pycelonis import get_celonis
celonis = get_celonis(api_token=os.environ["CELONIS_API_TOKEN"])
pool = celonis.data_integration.get_data_pool(id="your-pool-id")
pool.create_table(
df_or_path=events_csv,
table_name="VynFi_Events",
primary_keys=["Event_ID"],
)
pool.create_table(
df_or_path=cases_csv,
table_name="VynFi_Cases",
primary_keys=["Case_ID"],
)
Or via the IBC UI: Data Pool > Add Table > Upload CSV.
""")
archive.close()