"""VynFi + pm4py — process mining on synthetic OCEL event logs.
Backs the blog post "pm4py + VynFi: Process Mining on Synthetic OCEL Event Logs".
DataSynth 3.0 writes a full OCEL 2.0 event log to `process_mining/events.json`.
This example loads it into pm4py, discovers a Petri net with the Inductive
Miner, and runs fitness/precision checks against the discovered model.
Requires: `pip install pm4py pandas`
"""
import os
import pandas as pd
import vynfi
client = vynfi.VynFi(api_key=os.environ["VYNFI_API_KEY"], timeout=120.0)
# Use the most recent completed job with process_mining/ data
jobs = client.jobs.list(status="completed", limit=10)
if not jobs.data:
print("No completed jobs. Generate some data first.")
raise SystemExit(1)
job_id = None
archive = None
for job in jobs.data:
    a = client.jobs.download_archive(job.id)
    if a.find("process_mining/events.json"):
        job_id = job.id
        archive = a
        break
    a.close()
if archive is None:
print("No job has process_mining/ data. Generate one with:")
print(' config = {"sector": "retail", "rows": 500, "processModels": ["o2c","p2p"], ...}')
raise SystemExit(1)
print(f"Loading events from job {job_id}")
# ── Load events into a pm4py-compatible DataFrame ────────────────────────────
events = archive.json("process_mining/events.json")
print(f" {len(events)} OCEL events")
# Flatten to a pm4py-friendly record per event
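# pm4py's simplified API expects the XES-standard column names used below:
# "case:concept:name" (case id), "concept:name" (activity), "time:timestamp" (event time).
# Extra columns such as company_code and is_anomaly are carried along as plain event attributes.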
rows = []
for ev in events:
    # Pick the first object_ref as the case (for classical process mining)
    refs = ev.get("object_refs") or []
    case_id = refs[0]["external_id"] if refs else ev.get("case_id") or ev["event_id"]
    rows.append(
        {
            "case:concept:name": case_id,
            "concept:name": ev["activity_name"],
            "time:timestamp": ev["timestamp"],
            "org:resource": ev.get("resource_id", "unknown"),
            "company_code": ev.get("company_code", ""),
            "is_anomaly": ev.get("is_anomaly", False),
        }
    )
df = pd.DataFrame(rows)
# DS 3.1+: timestamps are microsecond-precision — pd.to_datetime(utc=True) no
# longer drops rows. (Earlier DS versions used nanosecond precision which
# would silently drop 95 % of events.)
df["time:timestamp"] = pd.to_datetime(df["time:timestamp"], utc=True, errors="coerce")
kept = df["time:timestamp"].notna().sum()
print(f" Timestamp parse retention: {kept}/{len(df)} ({100 * kept / max(len(df), 1):.1f}%)")
df = df.dropna(subset=["time:timestamp"]).sort_values(["case:concept:name", "time:timestamp"])
print(f" {df['case:concept:name'].nunique()} cases, {len(df)} events")
print(f" {df['concept:name'].nunique()} distinct activities")
# ── Discover a Petri net with the Inductive Miner ────────────────────────────
try:
    import pm4py
except ImportError:
    print("\nInstall pm4py to continue: pip install pm4py")
    raise SystemExit(0)
event_log = pm4py.convert_to_event_log(df)
print(f"\nDiscovering Petri net (inductive miner)...")
net, im, fm = pm4py.discover_petri_net_inductive(event_log)
print(f" Places: {len(net.places)}, Transitions: {len(net.transitions)}, Arcs: {len(net.arcs)}")
# ── Conformance checking ─────────────────────────────────────────────────────
print(f"\nConformance:")
try:
    fitness = pm4py.fitness_token_based_replay(event_log, net, im, fm)
    print(f" Token-based fitness: {fitness['average_trace_fitness']:.3f}")
except Exception as e:
    print(f" Fitness skipped: {e}")
try:
    precision = pm4py.precision_token_based_replay(event_log, net, im, fm)
    print(f" Token-based precision: {precision:.3f}")
except Exception as e:
    print(f" Precision skipped: {e}")
# ── Variant analysis via pm4py ───────────────────────────────────────────────
variants = pm4py.get_variants(event_log)
print(f"\nVariants: {len(variants)}")
top = sorted(variants.items(), key=lambda x: -(x[1] if isinstance(x[1], int) else len(x[1])))[:5]
for v, cases in top:
    # pm4py returns trace lists (older releases) or plain counts (newer releases) as variant values
    count = cases if isinstance(cases, int) else len(cases)
    label = " -> ".join(v) if isinstance(v, tuple) else str(v)
    print(f" [{count:4d} cases] {label[:80]}")
# ── Export to XES for downstream tools ───────────────────────────────────────
xes_path = "vynfi_event_log.xes"
pm4py.write_xes(event_log, xes_path)
print(f"\nExported to {xes_path}")
# ── Cross-check with VynFi's pre-built variant analytics ─────────────────────
print(f"\nVynFi variant summary (cross-check):")
try:
    a = client.jobs.analytics(job_id)
    if a.process_variant_summary:
        v = a.process_variant_summary
        print(f" VynFi variant count: {v.variant_count}")
        print(f" Entropy: {v.variant_entropy:.3f}")
        print(f" Happy-path concentration: {v.happy_path_concentration:.1%}")
except vynfi.NotFoundError:
    pass
archive.close()