Skip to content

Commit aeb187c

Browse files
committed
style(fsm): format TheMachine and Discovery
Added type annotations to the fsm.py file and used ruff (vscode) to: - Black-compatible code formatting. - fix all auto-fixable violations, like unused imports. - isort-compatible import sorting. Signed-off-by: Paulo Vital <[email protected]>
1 parent bdc1fd3 commit aeb187c

File tree

1 file changed

+92
-66
lines changed

1 file changed

+92
-66
lines changed

src/instana/fsm.py

Lines changed: 92 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -8,73 +8,83 @@
88
import subprocess
99
import sys
1010
import threading
11+
from typing import TYPE_CHECKING, Any, Callable, Optional
1112

1213
from fysom import Fysom
1314

14-
from .log import logger
15-
from .util import get_default_gateway
16-
from .version import VERSION
15+
from instana.log import logger
16+
from instana.util import get_default_gateway
17+
from instana.version import VERSION
1718

19+
if TYPE_CHECKING:
20+
from instana.agent.host import HostAgent
1821

19-
class Discovery(object):
20-
pid = 0
21-
name = None
22-
args = None
23-
fd = -1
24-
inode = ""
2522

26-
def __init__(self, **kwds):
23+
class Discovery:
24+
pid: int = 0
25+
name: Optional[str] = None
26+
args: Optional[List[str]] = None
27+
fd: int = -1
28+
inode: str = ""
29+
30+
def __init__(self, **kwds: Any) -> None:
2731
self.__dict__.update(kwds)
2832

29-
def to_dict(self):
30-
kvs = dict()
31-
kvs['pid'] = self.pid
32-
kvs['name'] = self.name
33-
kvs['args'] = self.args
34-
kvs['fd'] = self.fd
35-
kvs['inode'] = self.inode
33+
def to_dict(self) -> Dict[str, Any]:
34+
kvs: Dict[str, Any] = dict()
35+
kvs["pid"] = self.pid
36+
kvs["name"] = self.name
37+
kvs["args"] = self.args
38+
kvs["fd"] = self.fd
39+
kvs["inode"] = self.inode
3640
return kvs
3741

3842

39-
class TheMachine(object):
43+
class TheMachine:
4044
RETRY_PERIOD = 30
4145
THREAD_NAME = "Instana Machine"
4246

43-
agent = None
47+
agent: Optional["HostAgent"] = None
4448
fsm = None
4549
timer = None
4650

4751
warnedPeriodic = False
4852

49-
def __init__(self, agent):
53+
def __init__(self, agent: "HostAgent") -> None:
5054
logger.debug("Initializing host agent state machine")
5155

5256
self.agent = agent
53-
self.fsm = Fysom({
54-
"events": [
55-
("lookup", "*", "found"),
56-
("announce", "found", "announced"),
57-
("pending", "announced", "wait4init"),
58-
("ready", "wait4init", "good2go")],
59-
"callbacks": {
60-
# Can add the following to debug
61-
# "onchangestate": self.print_state_change,
62-
"onlookup": self.lookup_agent_host,
63-
"onannounce": self.announce_sensor,
64-
"onpending": self.on_ready,
65-
"ongood2go": self.on_good2go}})
57+
self.fsm = Fysom(
58+
{
59+
"events": [
60+
("lookup", "*", "found"),
61+
("announce", "found", "announced"),
62+
("pending", "announced", "wait4init"),
63+
("ready", "wait4init", "good2go"),
64+
],
65+
"callbacks": {
66+
# Can add the following to debug
67+
# "onchangestate": self.print_state_change,
68+
"onlookup": self.lookup_agent_host,
69+
"onannounce": self.announce_sensor,
70+
"onpending": self.on_ready,
71+
"ongood2go": self.on_good2go,
72+
},
73+
}
74+
)
6675

6776
self.timer = threading.Timer(1, self.fsm.lookup)
6877
self.timer.daemon = True
6978
self.timer.name = self.THREAD_NAME
7079
self.timer.start()
7180

7281
@staticmethod
73-
def print_state_change(e):
74-
logger.debug('========= (%i#%s) FSM event: %s, src: %s, dst: %s ==========',
75-
os.getpid(), threading.current_thread().name, e.event, e.src, e.dst)
82+
def print_state_change(e: Any) -> None:
83+
logger.debug(
84+
f"========= ({os.getpid()}#{threading.current_thread().name}) FSM event: {e.event}, src: {e.src}, dst: {e.dst} =========="
85+
)
7686

77-
def reset(self):
87+
def reset(self) -> None:
7888
"""
7989
reset is called to start from scratch in a process. It may be called on first boot or
8090
after a detected fork.
@@ -87,7 +97,7 @@ def reset(self):
8797
logger.debug("State machine being reset. Will start a new announce cycle.")
8898
self.fsm.lookup()
8999

90-
def lookup_agent_host(self, e):
100+
def lookup_agent_host(self, e: Any) -> bool:
91101
host = self.agent.options.agent_host
92102
port = self.agent.options.agent_port
93103

@@ -105,39 +115,43 @@ def lookup_agent_host(self, e):
105115
return True
106116

107117
if self.warnedPeriodic is False:
108-
logger.info("Instana Host Agent couldn't be found. Will retry periodically...")
118+
logger.info(
119+
"Instana Host Agent couldn't be found. Will retry periodically..."
120+
)
109121
self.warnedPeriodic = True
110122

111-
self.schedule_retry(self.lookup_agent_host, e, self.THREAD_NAME + ": agent_lookup")
123+
self.schedule_retry(
124+
self.lookup_agent_host, e, f"{self.THREAD_NAME}: agent_lookup"
125+
)
112126
return False
113127

114-
def announce_sensor(self, e):
115-
logger.debug("Attempting to make an announcement to the agent on %s:%d",
116-
self.agent.options.agent_host, self.agent.options.agent_port)
128+
def announce_sensor(self, e: Any) -> bool:
129+
logger.debug(
130+
f"Attempting to make an announcement to the agent on {self.agent.options.agent_host}:{self.agent.options.agent_port}"
131+
)
117132
pid = os.getpid()
118133

119134
try:
120135
if os.path.isfile("/proc/self/cmdline"):
121136
with open("/proc/self/cmdline") as cmd:
122137
cmdinfo = cmd.read()
123-
cmdline = cmdinfo.split('\x00')
138+
cmdline = cmdinfo.split("\x00")
124139
else:
125140
# Python doesn't provide a reliable method to determine what
126141
# the OS process command line may be. Here we are forced to
127142
# rely on ps rather than adding a dependency on something like
128143
# psutil which requires dev packages, gcc etc...
129-
proc = subprocess.Popen(["ps", "-p", str(pid), "-o", "command"],
130-
stdout=subprocess.PIPE)
144+
proc = subprocess.Popen(
145+
["ps", "-p", str(pid), "-o", "command"], stdout=subprocess.PIPE
146+
)
131147
(out, _) = proc.communicate()
132-
parts = out.split(b'\n')
148+
parts = out.split(b"\n")
133149
cmdline = [parts[1].decode("utf-8")]
134150
except Exception:
135151
cmdline = sys.argv
136152
logger.debug("announce_sensor", exc_info=True)
137153

138-
d = Discovery(pid=self.__get_real_pid(),
139-
name=cmdline[0],
140-
args=cmdline[1:])
154+
d = Discovery(pid=self.__get_real_pid(), name=cmdline[0], args=cmdline[1:])
141155

142156
# If we're on a system with a procfs
143157
if os.path.exists("/proc/"):
@@ -146,47 +160,56 @@ def announce_sensor(self, e):
146160
# PermissionError: [Errno 13] Permission denied: '/proc/6/fd/8'
147161
# Use a try/except as a safety
148162
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
149-
sock.connect((self.agent.options.agent_host, self.agent.options.agent_port))
150-
path = "/proc/%d/fd/%d" % (pid, sock.fileno())
163+
sock.connect(
164+
(self.agent.options.agent_host, self.agent.options.agent_port)
165+
)
166+
path = f"/proc/{pid}/fd/{sock.fileno()}"
151167
d.fd = sock.fileno()
152168
d.inode = os.readlink(path)
153-
except:
169+
except: # noqa: E722
154170
logger.debug("Error generating file descriptor: ", exc_info=True)
155171

156172
payload = self.agent.announce(d)
157173

158174
if not payload:
159175
logger.debug("Cannot announce sensor. Scheduling retry.")
160-
self.schedule_retry(self.announce_sensor, e, self.THREAD_NAME + ": announce")
176+
self.schedule_retry(
177+
self.announce_sensor, e, f"{self.THREAD_NAME}: announce"
178+
)
161179
return False
162-
180+
163181
self.agent.set_from(payload)
164182
self.fsm.pending()
165-
logger.debug("Announced pid: %s (true pid: %s). Waiting for Agent Ready...",
166-
str(pid), str(self.agent.announce_data.pid))
183+
logger.debug(
184+
f"Announced PID: {pid} (true PID: {self.agent.announce_data.pid}). Waiting for Agent Ready..."
185+
)
167186
return True
168187

169-
def schedule_retry(self, fun, e, name):
188+
def schedule_retry(self, fun: Callable, e: Any, name: str) -> None:
170189
self.timer = threading.Timer(self.RETRY_PERIOD, fun, [e])
171190
self.timer.daemon = True
172191
self.timer.name = name
173192
self.timer.start()
174193

175-
def on_ready(self, _):
194+
def on_ready(self, _: Any) -> None:
176195
self.agent.start()
177196

178197
ns_pid = str(os.getpid())
179198
true_pid = str(self.agent.announce_data.pid)
180199

181-
logger.info("Instana host agent available. We're in business. Announced PID: %s (true pid: %s)", ns_pid, true_pid)
200+
logger.info(
201+
f"Instana host agent available. We're in business. Announced PID: {ns_pid} (true PID: {true_pid})"
202+
)
182203

183-
def on_good2go(self, _):
204+
def on_good2go(self, _: Any) -> None:
184205
ns_pid = str(os.getpid())
185206
true_pid = str(self.agent.announce_data.pid)
186207

187-
self.agent.log_message_to_host_agent("Instana Python Package %s: PID %s (true pid: %s) is now online and reporting" % (VERSION, ns_pid, true_pid))
208+
self.agent.log_message_to_host_agent(
209+
f"Instana Python Package {VERSION}: PID {ns_pid} (true PID: {true_pid}) is now online and reporting"
210+
)
188211

189-
def __get_real_pid(self):
212+
def __get_real_pid(self) -> int:
190213
"""
191214
Attempts to determine the true process ID by querying the
192215
/proc/<pid>/sched file. This works on systems with a proc filesystem.
@@ -195,14 +218,14 @@ def __get_real_pid(self):
195218
pid = None
196219

197220
if os.path.exists("/proc/"):
198-
sched_file = "/proc/%d/sched" % os.getpid()
221+
sched_file = f"/proc/{os.getpid()}/sched"
199222

200223
if os.path.isfile(sched_file):
201224
try:
202225
file = open(sched_file)
203226
line = file.readline()
204-
g = re.search(r'\((\d+),', line)
205-
if len(g.groups()) == 1:
227+
g = re.search(r"\((\d+),", line)
228+
if g and len(g.groups()) == 1:
206229
pid = int(g.groups()[0])
207230
except Exception:
208231
logger.debug("parsing sched file failed", exc_info=True)
@@ -211,3 +234,6 @@ def __get_real_pid(self):
211234
pid = os.getpid()
212235

213236
return pid
237+
238+
239+
# Made with Bob

0 commit comments

Comments
 (0)