1
1
# Copyright (c) 2019 Guilherme Borges <[email protected] >
2
2
# See the COPYRIGHT file for more information
3
3
4
+ from __future__ import annotations
5
+
4
6
import struct
5
7
6
8
from twisted .internet .protocol import Factory , Protocol
12
14
13
15
14
16
class PoolServer (Protocol ):
15
- def __init__ (self , factory ) :
16
- self .factory = factory
17
+ def __init__ (self , factory : PoolServerFactory ) -> None :
18
+ self .factory : PoolServerFactory = factory
17
19
self .local_pool : bool = (
18
20
CowrieConfig .get ("proxy" , "pool" , fallback = "local" ) == "local"
19
21
)
@@ -25,13 +27,13 @@ def __init__(self, factory):
25
27
)
26
28
27
29
if self .use_nat :
28
- self .nat_public_ip = CowrieConfig .get ("backend_pool" , "nat_public_ip" )
30
+ self .nat_public_ip : str = CowrieConfig .get ("backend_pool" , "nat_public_ip" )
29
31
30
- def dataReceived (self , data ) :
31
- res_op = struct .unpack ("!c" , bytes ([data [0 ]]))[
32
+ def dataReceived (self , data : bytes ) -> None :
33
+ res_op : bytes = struct .unpack ("!c" , bytes ([data [0 ]]))[
32
34
0
33
35
] # yes, this needs to be done to extract the op code correctly
34
- response = None
36
+ response : bytes = b""
35
37
36
38
if res_op == b"i" :
37
39
recv = struct .unpack ("!II?" , data [1 :])
@@ -76,10 +78,10 @@ def dataReceived(self, data):
76
78
guest_id = guest_id ,
77
79
)
78
80
79
- ssh_port = CowrieConfig .getint (
81
+ ssh_port : int = CowrieConfig .getint (
80
82
"backend_pool" , "guest_ssh_port" , fallback = 22
81
83
)
82
- telnet_port = CowrieConfig .getint (
84
+ telnet_port : int = CowrieConfig .getint (
83
85
"backend_pool" , "guest_telnet_port" , fallback = 23
84
86
)
85
87
@@ -162,31 +164,32 @@ def dataReceived(self, data):
162
164
# free this connection and allow VM to be re-used
163
165
self .factory .pool_service .reuse_vm (guest_id )
164
166
165
- if response :
167
+ if response and self . transport :
166
168
self .transport .write (response )
167
169
168
170
169
171
class PoolServerFactory (Factory ):
170
- def __init__ (self ):
171
- self .initialised = False
172
+ def __init__ (self ) -> None :
173
+ self .initialised : bool = False
172
174
173
175
# pool handling
174
- self .pool_service = None
176
+ self .pool_service : PoolService
175
177
176
178
self .tac = None
177
179
178
180
# NAT service
179
181
self .nat = NATService ()
180
182
181
- def startFactory (self ):
183
+ def startFactory (self ) -> None :
182
184
# start the pool thread with default configs
183
185
self .pool_service = PoolService (self .nat )
184
- self .pool_service .start_pool ()
186
+ if self .pool_service :
187
+ self .pool_service .start_pool ()
185
188
186
- def stopFactory (self ):
189
+ def stopFactory (self ) -> None :
187
190
log .msg (eventid = "cowrie.backend_pool.server" , format = "Stopping backend pool..." )
188
-
189
- self .pool_service .shutdown_pool ()
191
+ if self . pool_service :
192
+ self .pool_service .shutdown_pool ()
190
193
191
194
def buildProtocol (self , addr ):
192
195
log .msg (
0 commit comments