diff --git a/src/train.py b/src/train.py old mode 100644 new mode 100755 index 1c084d3..ae7dd5c --- a/src/train.py +++ b/src/train.py @@ -1,3 +1,5 @@ +#!/usr/local/bin/python3 + # -*- coding: utf-8 -*- """ Created on Fri May 24 14:43:25 2019 @@ -34,6 +36,11 @@ import keras import tensorflow as tf +from apscheduler.schedulers.blocking import BlockingScheduler + +API_KEY = os.environ.get('API_KEY') +# BASE_URL = "http://localhost:8080" +BASE_URL = "http://basecamp-demos.informatik.uni-hamburg.de:8080/AirDataBackendService" # system specific values, as our batch size is fairly small cpu should be faster. # change these to fit your system @@ -46,8 +53,7 @@ def getSensorList(): ''' get a list with all sensors that can be used ''' - response = requests.get( - "http://basecamp-demos.informatik.uni-hamburg.de:8080/AirDataBackendService/api/measurements/sensors", timeout=10) + response = requests.get(BASE_URL + "/api/measurements/sensors", timeout=10) data = response.json() sensorList = [] @@ -64,7 +70,7 @@ def getDataFromSensor(sensorID, timestamp): ''' data = [] fullData = [] - fullUrl = "http://basecamp-demos.informatik.uni-hamburg.de:8080/AirDataBackendService/api/measurements/bySensorUntilNow?sensor=" + \ + fullUrl = BASE_URL + "/api/measurements/bySensorUntilNow?sensor=" + \ sensorID + "&timestamp=" + str(timestamp) try: @@ -80,7 +86,7 @@ def getDataFromSensor(sensorID, timestamp): measurement = singleResult['measurement'] weatherReport = singleResult['weatherReport'] - if (weatherReport is None): + if (weatherReport is None): # use the last available weatherReport instead weatherReport = lastWeatherReport @@ -174,6 +180,7 @@ def inAndOutput(training_set): for i in range(40, training_set.shape[0]-5): X.append(training_set[i-40:i, :]) y.append(training_set[i:i+5, :2]) + X, y = np.array(X), np.array(y) # Reshaping y = np.reshape(y, (y.shape[0], y.shape[1]*2)) @@ -215,12 +222,11 @@ def loadModel(jsonpath, h5path): ''' loading the model with a path ''' - json_file = 
open(jsonpath, 'r') - loaded_model_json = json_file.read() - json_file.close() - loaded_model = model_from_json(loaded_model_json) - loaded_model.load_weights(h5path) - return loaded_model + with open(jsonpath, 'r') as json_file: + loaded_model_json = json_file.read() + loaded_model = model_from_json(loaded_model_json) + loaded_model.load_weights(h5path) + return loaded_model def furtherTraining(X_train, y_train, regressor): @@ -277,8 +283,8 @@ def isContinuous(sensorID, timestamp): check if a sensor is continuous ''' - fullUrl = "http://basecamp-demos.informatik.uni-hamburg.de:8080/AirDataBackendService/api/measurements/bySensor?sensor=" + \ - str(sensorID) + "&timestamp="+str(latestTimestamp) + fullUrl = BASE_URL + "/api/measurements/bySensor?sensor=" + \ + str(sensorID) + "&timestamp="+str(timestamp) data = {} @@ -343,37 +349,58 @@ def plot2(plot, plot2, name): plt.show() -# giving the prediction to the backend -key = os.environ.get('API_KEY') -if (key == None): +if (API_KEY == None): print("No key specified, printing predictions to console only.") -sensorList = getSensorList() -latestTimestamp = int(time.time()) -model = loadModel("regressor.json", "model.h5") - -for i, sensorId in enumerate(sensorList): - # check if process has been running for more than 50 minutes (= 3000 sec) - if (int(time.time()) - latestTimestamp > 3000): - print("Stopping execution because process took too long.") - break - - print(str(i) + " / " + str(len(sensorList))) - result = predictionGiver(sensorId, latestTimestamp, model) - if (type(result) != type(None)): - if (key == None): - print(result) + +def createPredictions(): + ''' + Fetches all available sensors and attempts + to create a prediction for each sensor + after checking whether the sensor is continuous. 
+ ''' + sensorList = getSensorList() + latestTimestamp = int(time.time()) + model = loadModel("regressor.json", "model.h5") + + for i, sensorId in enumerate(sensorList): + # check if process has been running for more than 50 minutes (= 3000 sec) + if (int(time.time()) - latestTimestamp > 3000): + print("Stopping execution because process took too long.") + break + + print(str(i) + " / " + str(len(sensorList))) + result = predictionGiver(sensorId, latestTimestamp, model) + if (type(result) != type(None)): + if (API_KEY == None): + print(result) + else: + try: + requests.post(BASE_URL + "/api/measurements/updatePredictions", + timeout=5, + json={"startTime": latestTimestamp, + "sensor": sensorId, + "values": result.tolist(), + "apiKey": API_KEY}) + except (requests.exceptions.ReadTimeout, socket.timeout, requests.exceptions.ConnectTimeout) as e: + print("Prediction upload took too long " + + sensorId + " " + str(latestTimestamp)) + print(e) else: - try: - requests.post('http://basecamp-demos.informatik.uni-hamburg.de:8080/AirDataBackendService/api/measurements/updatePredictions', - timeout=5, - json={"startTime": latestTimestamp, - "sensor": sensorId, - "values": result.tolist(), - "apiKey": key}) - except (requests.exceptions.ReadTimeout, socket.timeout, requests.exceptions.ConnectTimeout) as e: - print("Prediction upload took too long " + - sensorId + " " + str(latestTimestamp)) - print(e) - else: - print(sensorId + " is not continuous!") + print(sensorId + " is not continuous!") + + +if __name__ == '__main__': + scheduler = BlockingScheduler() + scheduler.add_job(createPredictions, + trigger='cron', + minute='5', + hour='*/1', + max_instances=1) + + print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) + + try: + scheduler.start() + except (KeyboardInterrupt, SystemExit): + pass