Skip to content

Commit

Permalink
Adding some improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
ruwany committed Aug 12, 2015
1 parent e93547e commit 1dfd3a6
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public void run(){
try {
mqttReciever.subscribe();
} catch (Exception e) {
log.error("Error when invoking MQTT");
log.error("Error while subscribing to MQTT Queue");
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.wso2.devicemgt.raspberry.agent;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.databridge.agent.thrift.DataPublisher;
import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.databridge.commons.exception.AuthenticationException;
Expand All @@ -16,17 +18,23 @@
*/
public class PushBamData {

private static final Log log = LogFactory.getLog(PushBamData.class);

private static DataPublisher dataPublisher = null;
String streamId = null;

final AgentConstants constants = new AgentConstants();


/**
* Declare the stream
* @return
*/
public boolean initializeDataPublisher(){

try {

setTrustStoreParams();
log.info("Initializing BAM data publisher.");
dataPublisher = new DataPublisher(constants.prop.getProperty("bam.thrift.url"), constants.prop.getProperty("bam.username"), constants.prop.getProperty("bam.password"));

streamId = dataPublisher.defineStream("{" +
Expand All @@ -47,19 +55,19 @@ public boolean initializeDataPublisher(){
return true;

} catch (AgentException e) {
e.printStackTrace();
log.error("Error in agent : "+e.getMessage());
} catch (MalformedStreamDefinitionException e) {
e.printStackTrace();
log.error("Malformed stream definition : "+e.getMessage());
} catch (StreamDefinitionException e) {
e.printStackTrace();
log.error("Error in stream definition : "+e.getMessage());
} catch (DifferentStreamDefinitionAlreadyDefinedException e) {
e.printStackTrace();
log.error("Duplicate stream definition : "+e.getMessage());
} catch (MalformedURLException e) {
e.printStackTrace();
log.error("Malformed URL : "+e.getMessage());
} catch (AuthenticationException e) {
e.printStackTrace();
log.error("Error in authentication : "+e.getMessage());
} catch (TransportException e) {
e.printStackTrace();
log.error("Error in transport : "+e.getMessage());
}

return false;
Expand All @@ -76,10 +84,18 @@ public void setTrustStoreParams() {

}

/**
* Push data to BAM
* @param deviceId
* @param type
* @param owner
* @param event
* @return
*/
public boolean publishData(String deviceId, String type, String owner, String event){
try {

System.out.println("########### "+event);
log.debug("DeviceID : "+deviceId+", Type : "+type+", Owner: "+owner+", Event : "+event);
dataPublisher.publish(streamId, new Object[]{deviceId}, null, new Object[]{type, owner, event});
// dataPublisher.stop();
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,46 +41,69 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;


/**
* This class reads the sonar reading and injects values
* to the siddhiEngine for processing on a routine basis
* also if the siddhiquery is updated the class takes
* care of re-initializing same.
*/
public class SidhdhiQuery implements Runnable {
private static final Logger log = Logger.getLogger(SidhdhiQuery.class);
final AgentConstants constants = new AgentConstants();

boolean isBulbOn = false;
PushBamData pushBamData = null;
//Bam data push client
private static PushBamData pushBamData = new PushBamData();
private static SiddhiManager siddhiManager = new SiddhiManager();

public static PushBamData getPushBamData() {
return pushBamData;
}

public static SiddhiManager getSiddhiManager() {
return siddhiManager;
}

public static void setSiddhiManager(SiddhiManager siddhiManager) {
SidhdhiQuery.siddhiManager = siddhiManager;
}

//keeps track of bulb status. The start status is assumed as off
//TODO : pick up current bulb status from a API
boolean isBulbOn = false;

public void run() {
// Creating Siddhi Manager
SiddhiManager siddhiManager = new SiddhiManager();
pushBamData = new PushBamData();
pushBamData.initializeDataPublisher();

//Initialize Push data client
PushBamData pushdata = getPushBamData();
pushdata.initializeDataPublisher();

//Start the execution plan with pre-defined or previously persisted Siddhi query
StartExecutionPlan startExecutionPlan = new StartExecutionPlan(siddhiManager).invoke();
StartExecutionPlan startExecutionPlan = new StartExecutionPlan().invoke();

while (true) {

//Check if there is new policy update available
if(AgentInitializer.isUpdated()) {
if (AgentInitializer.isUpdated()) {
System.out.print("### Policy Update Detected!");
//Restart execution plan with new query
startExecutionPlan = new StartExecutionPlan(siddhiManager).invoke();
restartSiddhi();
startExecutionPlan = new StartExecutionPlan().invoke();
}
InputHandler inputHandler = startExecutionPlan.getInputHandler();

//Sending events to Siddhi
try {
//If sonar URL is present in the config file
//If sonar URL is present in the config file the program will read stats off the API
//If not it will look for a file which is also configurable via a property
String sonarUrl = constants.prop.getProperty("sonar.reading.url");
String sonarReading = null;
if(sonarUrl!=null) {
if (sonarUrl != null) {
sonarReading = readSonarData(sonarUrl);
}
if(sonarReading==null || sonarReading.equalsIgnoreCase("")) {
if (sonarReading == null || sonarReading.equalsIgnoreCase("")) {
sonarReading = readFile(constants.prop.getProperty("sonar.reading.file.path"), StandardCharsets.UTF_8);
}
System.out.println("Pushing data to CEP - Sonar : "+sonarReading.trim());
log.info("Pushing data to CEP - Sonar : " + sonarReading.trim());
inputHandler.send(new Object[]{"FIRE_1", Double.parseDouble(sonarReading)});
Thread.sleep(Integer.parseInt(constants.prop.getProperty("read.interval")));
// executionPlanRuntime.shutdown();
Expand All @@ -90,18 +113,31 @@ public void run() {
}
}

/**
* Re-Initialize SiddhiManager
*/
private void restartSiddhi() {
siddhiManager.shutdown();
siddhiManager = new SiddhiManager();
}

/**
* Make http call to specified endpoint with events
* @param inEvents
* @param bulbEP
* @param logText
*/
private void performHTTPCall(Event[] inEvents, String bulbEP, String logText) {
if (inEvents != null && inEvents.length > 0) {
EventPrinter.print(inEvents);
String url = constants.prop.getProperty(bulbEP);

CloseableHttpAsyncClient httpclient = null;
try {

httpclient = HttpAsyncClients.createDefault();
httpclient.start();
HttpGet request = new HttpGet(url);
System.out.println("Bulb Status : "+logText);
log.info("Bulb Status : " + logText);
final CountDownLatch latch = new CountDownLatch(1);
Future<HttpResponse> future = httpclient.execute(
request, new FutureCallback<HttpResponse>() {
Expand All @@ -119,19 +155,24 @@ public void failed(Exception e) {
public void cancelled() {
latch.countDown();
}
});
}
);

try {
latch.await();

} catch (InterruptedException e) {
if (log.isDebugEnabled()) {
log.debug("Sync Interrupted");
}
e.printStackTrace();
}
}
}

static String readFile(String path, Charset encoding){
/**
* Read content from a given file and return as a string
* @param path
* @param encoding
* @return
*/
static String readFile(String path, Charset encoding) {
byte[] encoded = new byte[0];
try {
encoded = Files.readAllBytes(Paths.get(path));
Expand All @@ -141,34 +182,38 @@ static String readFile(String path, Charset encoding){
return new String(encoded, encoding);
}

private String readSonarData(String sonarAPIUrl){

/**
* Read sonar data from API URL
* @param sonarAPIUrl
* @return
*/
private String readSonarData(String sonarAPIUrl) {
HttpClient client = new DefaultHttpClient();
HttpGet request = new HttpGet(sonarAPIUrl);
String responseStr = null;
try {
HttpResponse response = client.execute(request);
log.debug("Response Code : " + response);
InputStream input = response.getEntity().getContent();
BufferedReader br = new BufferedReader(new InputStreamReader(input));
BufferedReader br = new BufferedReader(new InputStreamReader(input, "UTF-8"));
responseStr = String.valueOf(br.readLine());
br.close();

} catch (IOException e) {
//log.error("Exception encountered while trying to make get request.");
System.out.print("ERROR!!!");
log.error("Error while reading sonar reading from file!");
return responseStr;
}
return responseStr;
}


/**
* Initialize SiddhiExecution plan
*/
private class StartExecutionPlan {
private SiddhiManager siddhiManager;
private InputHandler inputHandler;

public StartExecutionPlan(SiddhiManager siddhiManager) {
this.siddhiManager = siddhiManager;
}

public InputHandler getInputHandler() {
return inputHandler;
}
Expand All @@ -184,8 +229,8 @@ public StartExecutionPlan invoke() {
@Override
public void receive(Event[] events) {
System.out.println("Bulb on Event Fired!");
if(events.length > 0) {
if(!isBulbOn){
if (events.length > 0) {
if (!isBulbOn) {
performHTTPCall(events, "bulb.on.api.endpoint", "Bulb Switched on!");
System.out.println("#### Performed HTTP call! ON.");
pushBamData.publishData(constants.prop.getProperty("device.id"),
Expand All @@ -202,8 +247,8 @@ public void receive(Event[] events) {
@Override
public void receive(Event[] inEvents) {
System.out.println("Bulb off Event Fired");
if(isBulbOn){
performHTTPCall(inEvents,"bulb.off.api.endpoint","Bulb Switched off!");
if (isBulbOn) {
performHTTPCall(inEvents, "bulb.off.api.endpoint", "Bulb Switched off!");
System.out.println("#### Performed HTTP call! OFF.");
pushBamData.publishData(constants.prop.getProperty("device.id"),
constants.prop.getProperty("device.type"),
Expand Down
17 changes: 17 additions & 0 deletions RaspberryCEPAgent/FireAlarm/utils/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
log4j.rootLogger=debug, stdout, R

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

# Pattern to output the caller's file name and line number.
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n

log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=example.log

log4j.appender.R.MaxFileSize=100KB
# Keep one backup file
log4j.appender.R.MaxBackupIndex=1

log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%p %t %c - %m%n

0 comments on commit 1dfd3a6

Please sign in to comment.