Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 70 additions & 43 deletions src/train.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#!/usr/local/bin/python3

# -*- coding: utf-8 -*-
"""
Created on Fri May 24 14:43:25 2019
Expand Down Expand Up @@ -34,6 +36,11 @@
import keras
import tensorflow as tf

from apscheduler.schedulers.blocking import BlockingScheduler

API_KEY = os.environ.get('API_KEY')
# BASE_URL = "http://localhost:8080"
BASE_URL = "http://basecamp-demos.informatik.uni-hamburg.de:8080/AirDataBackendService"

# system specific values, as our batch size is fairly small cpu should be faster.
# change these to fit your system
Expand All @@ -46,8 +53,7 @@ def getSensorList():
'''
get a list with all sensors that can be used
'''
response = requests.get(
"http://basecamp-demos.informatik.uni-hamburg.de:8080/AirDataBackendService/api/measurements/sensors", timeout=10)
response = requests.get(BASE_URL + "/api/measurements/sensors", timeout=10)
data = response.json()

sensorList = []
Expand All @@ -64,7 +70,7 @@ def getDataFromSensor(sensorID, timestamp):
'''
data = []
fullData = []
fullUrl = "http://basecamp-demos.informatik.uni-hamburg.de:8080/AirDataBackendService/api/measurements/bySensorUntilNow?sensor=" + \
fullUrl = BASE_URL + "/api/measurements/bySensorUntilNow?sensor=" + \
sensorID + "&timestamp=" + str(timestamp)

try:
Expand All @@ -80,7 +86,7 @@ def getDataFromSensor(sensorID, timestamp):
measurement = singleResult['measurement']
weatherReport = singleResult['weatherReport']

if (weatherReport is None):
if (weatherReport is None):
# use the last available weatherReport instead
weatherReport = lastWeatherReport

Expand Down Expand Up @@ -174,6 +180,7 @@ def inAndOutput(training_set):
for i in range(40, training_set.shape[0]-5):
X.append(training_set[i-40:i, :])
y.append(training_set[i:i+5, :2])

X, y = np.array(X), np.array(y)
# Reshaping
y = np.reshape(y, (y.shape[0], y.shape[1]*2))
Expand Down Expand Up @@ -215,12 +222,11 @@ def loadModel(jsonpath, h5path):
'''
loading the model with a path
'''
json_file = open(jsonpath, 'r')
loaded_model_json = json_file.read()
json_file.close()
loaded_model = model_from_json(loaded_model_json)
loaded_model.load_weights(h5path)
return loaded_model
with open(jsonpath, 'r') as json_file:
loaded_model_json = json_file.read()
loaded_model = model_from_json(loaded_model_json)
loaded_model.load_weights(h5path)
return loaded_model


def furtherTraining(X_train, y_train, regressor):
Expand Down Expand Up @@ -277,8 +283,8 @@ def isContinuous(sensorID, timestamp):
check if a sensor is continuous
'''

fullUrl = "http://basecamp-demos.informatik.uni-hamburg.de:8080/AirDataBackendService/api/measurements/bySensor?sensor=" + \
str(sensorID) + "&timestamp="+str(latestTimestamp)
fullUrl = BASE_URL + "/api/measurements/bySensor?sensor=" + \
str(sensorID) + "&timestamp="+str(timestamp)

data = {}

Expand Down Expand Up @@ -343,37 +349,58 @@ def plot2(plot, plot2, name):
plt.show()


# giving the prediction to the backend
key = os.environ.get('API_KEY')
if (key == None):
if (API_KEY == None):
print("No key specified, printing predictions to console only.")

sensorList = getSensorList()
latestTimestamp = int(time.time())
model = loadModel("regressor.json", "model.h5")

for i, sensorId in enumerate(sensorList):
# check if process has been running for more than 50 minutes (= 3000 sec)
if (int(time.time()) - latestTimestamp > 3000):
print("Stopping execution because process took too long.")
break

print(str(i) + " / " + str(len(sensorList)))
result = predictionGiver(sensorId, latestTimestamp, model)
if (type(result) != type(None)):
if (key == None):
print(result)

def createPredictions():
'''
Fetches all available sensor and attempts
to create a prediction for each sensor
after checking whether the sensor is continuous.
'''
sensorList = getSensorList()
latestTimestamp = int(time.time())
model = loadModel("regressor.json", "model.h5")

for i, sensorId in enumerate(sensorList):
# check if process has been running for more than 50 minutes (= 3000 sec)
if (int(time.time()) - latestTimestamp > 3000):
print("Stopping execution because process took too long.")
break

print(str(i) + " / " + str(len(sensorList)))
result = predictionGiver(sensorId, latestTimestamp, model)
if (type(result) != type(None)):
if (API_KEY == None):
print(result)
else:
try:
requests.post(BASE_URL + "/api/measurements/updatePredictions",
timeout=5,
json={"startTime": latestTimestamp,
"sensor": sensorId,
"values": result.tolist(),
"apiKey": API_KEY})
except (requests.exceptions.ReadTimeout, socket.timeout, requests.exceptions.ConnectTimeout) as e:
print("Prediction upload took too long " +
sensorId + " " + str(latestTimestamp))
print(e)
else:
try:
requests.post('http://basecamp-demos.informatik.uni-hamburg.de:8080/AirDataBackendService/api/measurements/updatePredictions',
timeout=5,
json={"startTime": latestTimestamp,
"sensor": sensorId,
"values": result.tolist(),
"apiKey": key})
except (requests.exceptions.ReadTimeout, socket.timeout, requests.exceptions.ConnectTimeout) as e:
print("Prediction upload took too long " +
sensorId + " " + str(latestTimestamp))
print(e)
else:
print(sensorId + " is not continuous!")
print(sensorId + " is not continuous!")


if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_job(createPredictions,
trigger='cron',
minute='5',
hour='*/1',
max_instances=1)

print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass