Skip to content

Commit

Permalink
optimize network discovery and dynamic interface handling
Browse files Browse the repository at this point in the history
  • Loading branch information
itsknk committed Jul 20, 2024
1 parent 0222b2e commit d23ab79
Showing 1 changed file with 29 additions and 24 deletions.
53 changes: 29 additions & 24 deletions exo/networking/grpc/grpc_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ def __init__(self, node_id: str, node_port: int, listen_port: int, broadcast_por
self.broadcast_task = None
self.listen_task = None
self.cleanup_task = None
self.interfaces = self.get_network_interfaces()

def get_network_interfaces(self):
interfaces = netifaces.interfaces()
Expand Down Expand Up @@ -95,19 +94,26 @@ async def task_broadcast_presence(self):
}).encode('utf-8')

while True:
for interface in self.interfaces:
try:
if DEBUG_DISCOVERY >= 3: print(f"Broadcasting on interface: {interface}")
transport, _ = await asyncio.get_event_loop().create_datagram_endpoint(
lambda: asyncio.DatagramProtocol(),
local_addr=(netifaces.ifaddresses(interface)[netifaces.AF_INET][0]['addr'], 0),
family=socket.AF_INET)
sock = transport.get_extra_info('socket')
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
transport.sendto(message, ('<broadcast>', self.broadcast_port))
transport.close()
except Exception as e:
if DEBUG_DISCOVERY >= 2: print(f"Error broadcasting on interface {interface}: {e}")
try:
# Update interfaces periodically
interfaces = self.get_network_interfaces()

for interface in interfaces:
try:
if DEBUG_DISCOVERY >= 3: print(f"Broadcasting on interface: {interface}")
transport, _ = await asyncio.get_event_loop().create_datagram_endpoint(
lambda: asyncio.DatagramProtocol(),
local_addr=(netifaces.ifaddresses(interface)[netifaces.AF_INET][0]['addr'], 0),
family=socket.AF_INET)
sock = transport.get_extra_info('socket')
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
transport.sendto(message, ('<broadcast>', self.broadcast_port))
transport.close()
except Exception as e:
if DEBUG_DISCOVERY >= 2: print(f"Error broadcasting on interface {interface}: {e}")
except Exception as e:
if DEBUG_DISCOVERY >= 2: print(f"Error updating or accessing interfaces: {e}")

await asyncio.sleep(self.broadcast_interval)

async def on_listen_message(self, data, addr):
Expand All @@ -124,16 +130,15 @@ async def on_listen_message(self, data, addr):
self.known_peers[peer_id] = (self.known_peers[peer_id][0], self.known_peers[peer_id][1], time.time())

async def task_listen_for_peers(self):
for interface in self.interfaces:
try:
if DEBUG_DISCOVERY >= 2: print(f"Listening on interface: {interface}")
await asyncio.get_event_loop().create_datagram_endpoint(
lambda: ListenProtocol(self.on_listen_message),
local_addr=(netifaces.ifaddresses(interface)[netifaces.AF_INET][0]['addr'], self.listen_port)
)
except Exception as e:
if DEBUG_DISCOVERY >= 2: print(f"Error listening on interface {interface}: {e}")
if DEBUG_DISCOVERY >= 2: print("Started listen task")
try:
if DEBUG_DISCOVERY >= 2: print("Starting to listen on all interfaces")
await asyncio.get_event_loop().create_datagram_endpoint(
lambda: ListenProtocol(self.on_listen_message),
local_addr=('0.0.0.0', self.listen_port)
)
if DEBUG_DISCOVERY >= 2: print("Started listen task on all interfaces")
except Exception as e:
if DEBUG_DISCOVERY >= 2: print(f"Error setting up listening: {e}")

async def task_cleanup_peers(self):
while True:
Expand Down

0 comments on commit d23ab79

Please sign in to comment.