-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathdhcp_thread.py
292 lines (261 loc) · 12.8 KB
/
dhcp_thread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# -*- coding: utf-8 -*-
##
# Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
# This file is part of openmano
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: [email protected]
##
'''
This is the thread that interacts with the DHCP server to get the IP addresses
'''
# Module metadata: authorship and creation timestamp of this file.
__author__ = "Pablo Montes, Alfonso Tierno"
__date__ = "$4-Jan-2016 12:07:15$"
import threading
import time
import Queue
import paramiko
import random
import subprocess
#TODO: insert a logging system
class dhcp_thread(threading.Thread):
    '''Thread that follows a set of MAC addresses and asks a DHCP server for their leased IP.

    Work is received through a task queue (see insert_task) carrying 'add', 'del' and
    'exit' tasks. When the queue is idle the thread polls the DHCP server (locally,
    through SSH, or faked in test mode) and stores the obtained address in the
    'ports' table of the database.
    '''

    def __init__(self, dhcp_params, db, db_lock, test, dhcp_nets, debug=None):
        '''Init a thread.
        Arguments:
            'dhcp_params': dhcp server parameters, a dictionary with the keys:
                read as mandatory by this class: user, host
                optional: password, key (private key file name), port (default 22)
                (the original documentation also lists 'ifaces', the interface names
                managed by the dhcp; it is not read by this class)
            'db', 'db_lock': database class and lock for accessing it
            'test': in test mode no access to a server is done, and ip is invented
            'dhcp_nets': collection of net uuids; only ports attached to them are
                loaded from the database
            'debug': stored in self.debug; not used by this class
        '''
        threading.Thread.__init__(self)
        self.name = "dhcp_thread"
        self.dhcp_params = dhcp_params
        self.debug = debug
        self.db = db
        self.db_lock = db_lock
        self.test = test
        self.dhcp_nets = dhcp_nets
        self.ssh_conn = None
        # dictionary of mac_address to retrieve information. Every value is a dictionary with:
        #   ip: None until an address is known ("0.0.0.0" when given up by timeout)
        #   retries: consecutive failed readings
        #   next_reading: time for the next trying to check ACTIVE status or IP
        #   created: time when it was added
        #   active: time when the VM becomes into ACTIVE status (key absent until then)
        self.mac_status = {}
        self.queueLock = threading.Lock()
        self.taskQueue = Queue.Queue(2000)

    def ssh_connect(self):
        '''Open the SSH session against the dhcp server described at self.dhcp_params.

        On success the connected client is stored at self.ssh_conn. When paramiko
        raises an SSHException the error is printed and self.ssh_conn is left as
        None, so that callers never see a half-initialised client. Other exceptions
        (e.g. socket errors) are propagated to the caller.
        '''
        try:
            client = paramiko.SSHClient()
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            client.load_system_host_keys()
            client.connect(self.dhcp_params["host"], port=self.dhcp_params.get("port", 22),
                           username=self.dhcp_params["user"],
                           password=self.dhcp_params.get("password"),
                           key_filename=self.dhcp_params.get("key"),
                           timeout=2)
            # publish the client only once it is really connected
            self.ssh_conn = client
        except paramiko.ssh_exception.SSHException as e:
            self.ssh_conn = None
            text = e.args[0] if e.args else str(e)
            print("%s : ssh_connect ssh Exception: %s" % (self.name, text))

    def load_mac_from_db(self):
        '''Rebuild self.mac_status with the ports stored at the database.

        Only ports whose net_id is contained in self.dhcp_nets are followed. The
        previous content of self.mac_status is always discarded, also when the
        database query fails.
        '''
        #TODO get macs to follow from the database
        print("%s  load macs from db" % (self.name,))
        self.db_lock.acquire()
        try:
            r, c = self.db.get_table(SELECT=('mac', 'ip_address', 'nets.uuid as net_id', ),
                                     FROM='ports join nets on ports.net_id=nets.uuid',
                                     WHERE_NOT={'ports.instance_id': None, 'nets.provider': None})
        finally:
            # release the lock even if the database layer raises
            self.db_lock.release()
        now = time.time()
        self.mac_status = {}
        if r < 0:
            print("%s : Error getting data from database: %s" % (self.name, c))
            return
        for port in c:
            if port["net_id"] in self.dhcp_nets:
                self.mac_status[port["mac"]] = {"ip": port["ip_address"], "next_reading": now,
                                                "created": now, "retries": 0}

    def insert_task(self, task, *aditional):
        '''Queue a task for this thread.

        Arguments:
            'task': task name; run() understands 'add', 'del' and 'exit'
            'aditional': extra items of the task; for 'add' and 'del' the first one is the mac
        Return (1, None) if inserted, or (-1, error_text) if the queue stays full for 5 seconds.
        '''
        try:
            # 'with' guarantees the lock is released also when Queue.Full is raised;
            # otherwise the consumer loop at run() would block forever on queueLock
            with self.queueLock:
                self.taskQueue.put((task,) + aditional, timeout=5)
        except Queue.Full:
            return -1, "timeout inserting a task over host " + self.name
        return 1, None

    def run(self):
        '''Thread main loop: attend queued tasks and, when idle, poll the dhcp server.

        Return 0 when an 'exit' task is processed.
        '''
        print("%s  starting, nets %s" % (self.name, self.dhcp_nets))
        next_iteration = time.time() + 10
        while True:
            self.load_mac_from_db()
            while True:
                with self.queueLock:
                    if not self.taskQueue.empty():
                        task = self.taskQueue.get()
                    else:
                        task = None

                if task is None:
                    # nothing queued: poll the dhcp server if it is time, otherwise wait
                    now = time.time()
                    if now >= next_iteration:
                        next_iteration = self.get_ip_from_dhcp()
                    else:
                        time.sleep(1)
                    continue

                if task[0] == 'add':
                    print("%s : processing task add mac %s" % (self.name, task[1]))
                    now = time.time()
                    self.mac_status[task[1]] = {"ip": None, "next_reading": now,
                                                "created": now, "retries": 0}
                    # force a reading at the next idle iteration
                    next_iteration = now
                elif task[0] == 'del':
                    print("%s : processing task del mac %s" % (self.name, task[1]))
                    self.mac_status.pop(task[1], None)
                elif task[0] == 'exit':
                    print("%s : processing task exit" % (self.name,))
                    self.terminate()
                    return 0
                else:
                    print("%s : unknown task %s" % (self.name, task))

    def terminate(self):
        '''Close the SSH connection, if any. Errors are printed, never raised.'''
        try:
            if self.ssh_conn:
                self.ssh_conn.close()
        except Exception as e:
            text = str(e)
            print("%s : terminate Exception: %s" % (self.name, text))
        # NOTE(review): the message says "host_thread"; looks copied from another thread class
        print("%s : exit from host_thread" % (self.name,))

    @staticmethod
    def _next_slot(now, period):
        '''Return the first multiple of 'period' (seconds) strictly after int(now).'''
        return (int(now) // period + 1) * period

    def _set_unreachable(self, mac_address):
        '''Store "0.0.0.0" as the IP of mac_address, at database and memory, unless it already has one.'''
        if self.mac_status[mac_address].get("ip"):
            return
        self.db_lock.acquire()
        try:
            self.db.update_rows("ports", {"ip_address": "0.0.0.0"}, {"mac": mac_address})
        finally:
            self.db_lock.release()
        self.mac_status[mac_address]["ip"] = "0.0.0.0"
        print("%s mac %s >> set to 0.0.0.0 because of timeout" % (self.name, mac_address))

    def _read_lease(self, mac_address):
        '''Obtain the lease of mac_address: faked in test mode, with a local command when
        the host is "localhost", through SSH otherwise. Return None (or empty) if not obtained.'''
        if self.test:
            # a fake IP is produced once 'retries' exceeds a random threshold between 10 and 100
            if self.mac_status[mac_address]["retries"] > random.randint(10, 100):
                return self.get_fake_ip()
            return None

        if self.dhcp_params["host"] == "localhost":
            try:
                command = ['get_dhcp_lease.sh', mac_address]
                return subprocess.check_output(command)
            except Exception as e:
                text = str(e)
                print("%s : get_ip_from_dhcp subprocess Exception %s" % (self.name, text))
                return None

        try:
            if not self.ssh_conn:
                self.ssh_connect()
            if not self.ssh_conn:
                # connection could not be established; try again at the next reading
                return None
            # NOTE(review): the command is built by string concatenation; mac_address is
            # presumably trusted (it comes from the database / internal tasks) - confirm
            command = 'get_dhcp_lease.sh ' + mac_address
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            return stdout.read()
        except paramiko.ssh_exception.SSHException as e:
            text = e.args[0] if e.args else str(e)
            print("%s : get_ip_from_dhcp: ssh_Exception: %s" % (self.name, text))
            self.ssh_conn = None  # force a reconnection at the next reading
            return None
        except Exception as e:
            text = str(e)
            print("%s : get_ip_from_dhcp: Exception: %s" % (self.name, text))
            self.ssh_conn = None  # force a reconnection at the next reading
            return None

    def get_ip_from_dhcp(self):
        '''Process every followed mac whose 'next_reading' time has been reached.

        For each one: wait until its VM is ACTIVE at the database, then read the
        lease and store the IP at the 'ports' table. Readings are rescheduled every
        2 seconds at the beginning, every 6 seconds after one minute and every
        minute after 5 minutes; an obtained IP is read again at the next 10-hour slot.
        Return the time at which this method must be called again.
        '''
        now = time.time()
        next_iteration = now + 40000  # > 10 hours
        for mac_address in self.mac_status:
            status = self.mac_status[mac_address]
            if now < status["next_reading"]:
                # not yet its turn; only take it into account for the next wake up
                next_iteration = min(next_iteration, status["next_reading"])
                continue

            if status.get("active") is None:
                # check from db if already active
                self.db_lock.acquire()
                try:
                    r, c = self.db.get_table(FROM="ports as p join instances as i on p.instance_id=i.uuid",
                                             WHERE={"p.mac": mac_address, "i.status": "ACTIVE"})
                finally:
                    self.db_lock.release()
                if r > 0:
                    status["active"] = now
                    status["next_reading"] = self._next_slot(now, 2)
                    print("%s mac %s VM ACTIVE" % (self.name, mac_address))
                    status["retries"] = 0
                    # falls through: the lease is read in this same iteration
                else:
                    if now - status["created"] > 300:
                        # modify Database to tell openmano that we can not get dhcp from the machine
                        self._set_unreachable(mac_address)
                        status["next_reading"] = self._next_slot(now, 60)
                    else:
                        status["next_reading"] = self._next_slot(now, 6)
                    next_iteration = min(next_iteration, status["next_reading"])
                    continue

            content = self._read_lease(mac_address)
            if content:
                status["ip"] = content
                # modify Database
                self.db_lock.acquire()
                try:
                    r, c = self.db.update_rows("ports", {"ip_address": content}, {"mac": mac_address})
                finally:
                    self.db_lock.release()
                if r < 0:
                    # handled below as a failed reading, so it is retried
                    print("%s : Database update error: %s" % (self.name, c))
                else:
                    status["retries"] = 0
                    # same period (36000 s = 10 hours) for the division and the product;
                    # mixing 3600 and 36000 pushed the next reading decades into the future
                    status["next_reading"] = self._next_slot(now, 36000)
                    next_iteration = min(next_iteration, status["next_reading"])
                    print("%s mac %s >> %s" % (self.name, mac_address, content))
                    continue

            # a fail has happen
            status["retries"] += 1
            elapsed = now - status["active"]
            if elapsed > 120:
                # modify Database to tell openmano that we can not get dhcp from the machine
                self._set_unreachable(mac_address)
            # longest threshold first, otherwise the one-minute period is never reached
            if elapsed > 300:
                status["next_reading"] = self._next_slot(now, 60)
            elif elapsed > 60:
                status["next_reading"] = self._next_slot(now, 6)
            else:
                status["next_reading"] = self._next_slot(now, 2)
            next_iteration = min(next_iteration, status["next_reading"])
        return next_iteration

    def get_fake_ip(self):
        '''Return a random "192.168.x.y" address not assigned to any followed mac (test mode).

        A new candidate is generated at every attempt; it only loops forever if
        all the addresses of the range are already in use.
        '''
        while True:
            fake_ip = "192.168.%d.%d" % (random.randint(1, 254), random.randint(1, 254))
            # check not already provided
            if not any(status["ip"] == fake_ip for status in self.mac_status.values()):
                return fake_ip
#EXAMPLE of bash script that must be available at the DHCP server for "isc-dhcp-server" type
# $ cat ./get_dhcp_lease.sh
# #!/bin/bash
# awk '
# ($1=="lease" && $3=="{"){ lease=$2; active="no"; found="no" }
# ($1=="binding" && $2=="state" && $3=="active;"){ active="yes" }
# ($1=="hardware" && $2=="ethernet" && $3==tolower("'$1';")){ found="yes" }
# ($1=="client-hostname"){ name=$2 }
# ($1=="}"){ if (active=="yes" && found=="yes"){ target_lease=lease; target_name=name}}
# END{printf("%s", target_lease)} #print target_name
# ' /var/lib/dhcp/dhcpd.leases