-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrouting.py
More file actions
115 lines (91 loc) · 4.75 KB
/
routing.py
File metadata and controls
115 lines (91 loc) · 4.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import pulsectl
import json
from datetime import datetime
import os
import re
def fetch_node_state(string):
    """Return the bare value from a ``key=value>``-style representation.

    The argument is converted with ``str()``. Everything up to and including
    the last ``'='`` is discarded (the whole text is kept when there is no
    ``'='``), then any ``'>'`` characters at either end are removed.
    """
    _, _, value = str(string).rpartition('=')
    return value.strip('>')
def get_node_data(node, node_type):
    """Summarise one PulseAudio object as a plain dictionary.

    Args:
        node: Object returned by one of the pulsectl ``*_list()`` calls. Only
            the attributes ``state``, ``proplist``, ``description`` and
            ``name`` are consulted, and each of them is optional.
        node_type: Tag stored verbatim under ``"type"`` (the callers in this
            file pass "sink", "source", "sink_input" or "source_output").

    Returns:
        dict with the keys:
            "active": ``node.state == PulseStateEnum.running`` when the node
                has a ``state`` attribute, otherwise ``True``.
            "type": ``node_type``.
            "label": ``application.name`` from the proplist if set, else the
                description, else the name, else "Unnamed Node".
            "state": cleaned state text, or "unknown" when the node has no
                ``state`` attribute.
            "additional_info": a copy of the node's proplist ({} if absent).
    """
    # Tolerate both a missing proplist attribute and one that is None.
    proplist = getattr(node, 'proplist', None) or {}

    # Prefer the application name; the trailing fallback keeps the label a
    # non-empty string even when description and name are both empty/None.
    label = (
        proplist.get('application.name')
        or getattr(node, 'description', '')
        or getattr(node, 'name', 'Unnamed Node')
        or 'Unnamed Node'
    )

    if hasattr(node, 'state'):
        active = node.state == pulsectl.PulseStateEnum.running
        # Both branches now go through fetch_node_state. Previously the
        # fallback stored str(node.state) unchanged, which leaks the enum
        # wrapper (e.g. "<EnumValue ...=running>") into the result instead of
        # the bare state name.
        raw_state = node.state.name if hasattr(node.state, 'name') else node.state
        state = fetch_node_state(raw_state)
    else:
        # Nodes without a state attribute are reported as active.
        active = True
        state = "unknown"

    return {
        "active": active,
        "type": node_type,
        "label": label,
        "state": state,
        # Copy so later edits to the summary never touch the node's proplist.
        "additional_info": dict(proplist),
    }
def get_audio_routing(pulse):
    """Collect the four object lists exposed by ``pulse``.

    Calls ``sink_list()``, ``source_list()``, ``sink_input_list()`` and
    ``source_output_list()`` on ``pulse``, in that order.

    Returns:
        Tuple ``(sinks, sources, sink_inputs, source_outputs)``.
    """
    return (
        pulse.sink_list(),
        pulse.source_list(),
        pulse.sink_input_list(),
        pulse.source_output_list(),
    )
def normalize_state(state):
    """Return a comparison-friendly copy of an audio state snapshot.

    Only the four node sections and ``"connections"`` are kept; any other
    top-level keys (for example ``"has_changed"``) are dropped, and each
    connection is reduced to its two index fields.

    Node entries that are already summary dicts are copied as they are.
    Previously every entry was passed through ``get_node_data`` again; a dict
    has none of the attributes that function inspects, so each node collapsed
    to the same placeholder and differences between nodes were lost. Entries
    that are not dicts are still summarised with ``get_node_data``.

    Args:
        state: Mapping with the keys "sinks", "sources", "sink_inputs",
            "source_outputs" (each mapping index -> node) and "connections"
            (with the lists "sink_inputs" and "source_outputs").

    Returns:
        A new dict; ``state`` and its node dicts are not modified. Nested
        values inside a node (such as "additional_info") are shared, not
        deep-copied.

    Raises:
        KeyError: if one of the expected keys is missing.
    """
    def _canonical(node, node_type):
        if not isinstance(node, dict):
            return get_node_data(node, node_type)
        summary = dict(node)
        label = summary.get("label")
        if isinstance(label, str):
            # generate_audio_state_json decorates labels starting with
            # "Loopback" with an "L<n> " prefix. Strip it so a decorated
            # snapshot compares equal to an undecorated one.
            summary["label"] = re.sub(r"^L\d+ (?=Loopback)", "", label)
        return summary

    sections = (
        ("sinks", "sink"),
        ("sources", "source"),
        ("sink_inputs", "sink_input"),
        ("source_outputs", "source_output"),
    )
    normalized = {
        section: {
            idx: _canonical(node, node_type)
            for idx, node in state[section].items()
        }
        for section, node_type in sections
    }

    links = state["connections"]
    normalized["connections"] = {
        "sink_inputs": [
            {"input": link["input"], "sink": link["sink"]}
            for link in links["sink_inputs"]
        ],
        "source_outputs": [
            {"output": link["output"], "source": link["source"]}
            for link in links["source_outputs"]
        ],
    }
    return normalized
def _number_loopbacks(nodes):
    """Prefix every label starting with "Loopback" with L1, L2, ... in dict order."""
    count = 0
    for node in nodes.values():
        if node["label"].startswith("Loopback"):
            count += 1
            node["label"] = f"L{count} {node['label']}"


def generate_audio_state_json(pulse, previous_state=None, directory="./graphs"):
    """Build a snapshot of the current audio graph and diff it against a previous one.

    Despite the name, the result is a plain dict and nothing is written to
    disk.

    Args:
        pulse: Client object handed to ``get_audio_routing``.
        previous_state: Dict returned by an earlier call, or ``None`` for the
            first call.
        directory: Currently unused. Writing the snapshot to disk is disabled;
            the parameter is kept so existing callers remain valid.

    Returns:
        dict with the keys:
            "sinks", "sources", "sink_inputs", "source_outputs": mappings of
                object index -> summary from ``get_node_data``.
            "connections": {"sink_inputs": [{"input", "sink"}, ...],
                "source_outputs": [{"output", "source"}, ...]}.
            "has_changed": ``True`` when ``previous_state`` is ``None`` or any
                node section or the connections differ from it.
            "changed_items": per node section, the indices whose summary is
                new or differs from ``previous_state``.

    Raises:
        KeyError: if ``previous_state`` lacks one of the compared sections.
    """
    sinks, sources, sink_inputs, source_outputs = get_audio_routing(pulse)
    state = {
        "sinks": {sink.index: get_node_data(sink, "sink") for sink in sinks},
        "sources": {source.index: get_node_data(source, "source") for source in sources},
        "sink_inputs": {
            sink_input.index: get_node_data(sink_input, "sink_input")
            for sink_input in sink_inputs
        },
        "source_outputs": {
            source_output.index: get_node_data(source_output, "source_output")
            for source_output in source_outputs
        },
        "connections": {
            "sink_inputs": [
                {"input": sink_input.index, "sink": sink_input.sink}
                for sink_input in sink_inputs
            ],
            "source_outputs": [
                {"output": source_output.index, "source": source_output.source}
                for source_output in source_outputs
            ],
        },
    }

    # Number the loopbacks BEFORE diffing. A previous snapshot already carries
    # the "L<n> " prefixes, so diffing un-numbered labels against it would
    # report every loopback as changed on every call. Numbering restarts at 1
    # for each of the two sections.
    for section in ("sink_inputs", "source_outputs"):
        _number_loopbacks(state[section])

    node_sections = ("sinks", "sources", "sink_inputs", "source_outputs")
    if previous_state is None:
        has_changed = True
        changed_items = {section: list(state[section]) for section in node_sections}
    else:
        # Compare the summaries directly so that content changes, added or
        # removed nodes and re-routed connections all count as a change.
        has_changed = any(
            previous_state[section] != state[section]
            for section in (*node_sections, "connections")
        )
        changed_items = {
            section: [
                idx
                for idx, node in state[section].items()
                if previous_state[section].get(idx) != node
            ]
            for section in node_sections
        }

    state["has_changed"] = has_changed
    state["changed_items"] = changed_items
    return state
if __name__ == "__main__":
    # Use the client as a context manager so the connection to the sound
    # server is closed when the snapshot is done, even if building it raises.
    with pulsectl.Pulse('audio-routing-visualizer') as pulse:
        generate_audio_state_json(pulse)