Skip to content

Patch 1 #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 46 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
bf57a32
Renamed several attributes, removed LinkMode
Alebat Nov 24, 2016
f7e5641
Removed generics, used HashSet
Alebat Nov 25, 2016
160cd87
Defaulted muted as false instead of true
Alebat Dec 6, 2016
85dc742
Builder syntax for SensorFlow class
Alebat Dec 15, 2016
12caddc
Protected from nulls the method addLink
Alebat Dec 20, 2016
e83b637
New experimental branch: removed generics from queues for reducing GC
Alebat May 26, 2017
37e8fab
GC fix
Alebat May 29, 2017
80dd990
Refactor: improved class structure
Alebat May 29, 2017
a308bcb
Refactor: improved class structure
Alebat May 29, 2017
c64a432
[T] Hotfix + locality improvement
Alebat May 29, 2017
7292084
X WirelessDevice wip implementation
Alebat May 29, 2017
a3c118d
Docu update
Alebat May 30, 2017
22c9d24
X Processing wip implementation
Alebat Jun 5, 2017
76f8d19
WIP
Alebat Jun 12, 2017
a61545b
Implemented new interfaces till WirelessDevice, solved the problem wi…
Alebat Jun 14, 2017
77b8195
WIP Interfaces, begun implementations
Alebat Jun 20, 2017
4b4af30
WIP interfaces, improved logs
Alebat Jun 21, 2017
8bf60c8
Pettinando
Alebat Jul 18, 2017
e5731a8
Changed to hot scheme changes
Alebat Jul 20, 2017
ce2073d
Changed to hot scheme changes
Alebat Jul 20, 2017
291d5d2
Writing tests
Alebat Jul 20, 2017
d541f8e
Set up some tests and solved some bugs
Alebat Jul 25, 2017
cd4fd0b
Tests, concurrency fixes and refactoring
Alebat Jul 26, 2017
04feaa5
Implemented OutputBuffer
Alebat Jul 26, 2017
108208a
Code rearrangement
Alebat Jul 26, 2017
a6b1754
Fixes, implementation, refactoring, rearrangement of sense package
Alebat Jul 26, 2017
8f62644
Added Unit Tests for most SensorFlow classes
Alebat Jul 27, 2017
c05bfb0
Added KeyValue data support, triggerable and with dictionaries
Alebat Aug 1, 2017
7d3f2ae
Progresses, started converting plugins
Alebat Aug 1, 2017
1e90a62
Refactoring SFPlugins and preparing for tests. Removed some classes
Alebat Aug 2, 2017
0e3f034
[c] Finished refactoring, removed some obsolete classes, fixed CSVTest
Alebat Aug 3, 2017
e6a1328
Redesigned ProtobufferOutput, todo: review concurrency
Alebat Aug 3, 2017
d3665a5
Tests for ProtobufferOutput
Alebat Aug 8, 2017
b3c918f
Ported to Java 7
Alebat Sep 4, 2017
ef2107e
Added connection type
Alebat Sep 15, 2017
20bb290
Working on DeviceDetection
Alebat Sep 26, 2017
a5c1907
Added filtering utility to DeviceDetector
Alebat Sep 27, 2017
bd55456
Fixes, added ConnectionStatus enum
Alebat Sep 28, 2017
cb59ba6
Improvements towards automatic instantiation
Alebat Oct 2, 2017
d9f9040
Added reactivity of Streams, relaxed naming system (no more final)
Alebat Oct 6, 2017
6eda72f
Formalization, TODO update tests
Alebat Oct 12, 2017
c9eb75e
Added log type constants
Alebat Oct 18, 2017
56e91f6
Added support for InputGroups added/removed events
Alebat Oct 19, 2017
bb41661
Isolated status from log, improved log management, made public class …
Alebat Oct 23, 2017
7e0c56b
Typo
Alebat Oct 27, 2017
d6c8c7a
Update README.md
andbiz Nov 14, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 82 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,83 @@
# SensorFlow
A pure Java library for the unified sensor-data streams management.
A framework for the unified sensor-data streams management.
The framework is composed of two blocks:
1. Real-life sensing
2. Data management

**Issues**
1. Heterogeneity: of devices, protocols, cloud solutions and applications
2. Synchronization
3. Security/privacy

**Key paradigm**: Modularity and common interfaces

**Components**:
* BSN Manager (BSNM)
* Input Module
* Output Module
* Processing Module
* User Interface

**Data types**:
* Signals: Series of (timestamp,value) pairs conveying the information from the sensors
* Metadata: Information about the composition of the BSN and characteristics of the collected signals
* Messages: Series of (timestamp, {event}) pairs that describe the status of the network with possible errors. Used by the modules to communicate their current status to the BSNM.

## Components
### BSNM
Manages the BSN through the connected modules.
Main functions:
* Set the BSN and provide a time reference to the input plugins for the synchronization (together with latency estimation)
* Check the status of the BSN
* Start and stop the acquisition

### Input Module
Manages the input protocols and exposes a common interface to the BSNM.
Two distinct functions:
* Interfaces with the device
* Manages the data streams

### Output Module
Manages the output protocols and exposes a common interface to the BSNM according to the application
Two distinct functions:
* Interfaces with the destination
* Manages the security/privacy

### Processing Module
Provides real-time capabilities
Two types:
* (quasi) Real-Time
* Post-acquisition

### User Interface
Exploits the BSNM to allow the user to set the BSN

## Development plan
### API::DJango Rest Server
* DJango admin [Done]
* Authenticate users via token [Done]
* Provide profile info [Done]
* Provide tokens to crossbar for auth
* Browse uploads
* User MRUs
* Browse experiments and sessions
* Browse plugins

### Researcher::WebGUI
* Data management & download
* Session management: experiments and if they have real time feedback
* Plugin management
* Account management

### Experimenter::Mobile
* Authenticates [Done]
* Uploads data (September)
* Receives feedback

### Plugin::Remote Client
* Authenticates
* Receives data through WebSocket/MQTT

### Database::MongoDB Server
* Design schema [Done]
* Define data ("Collection")
15 changes: 6 additions & 9 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
apply plugin: 'java'
//apply plugin: 'maven'
//
//group = 'com.github.MPBA'
//
//
//dependencies {
// targetCompatibility = '1.7'
// sourceCompatibility = '1.7'
//}

dependencies {
targetCompatibility = '1.7'
sourceCompatibility = '1.7'
testCompile 'junit:junit:4.12'
}
243 changes: 243 additions & 0 deletions src/main/java/eu/fbk/mpba/sensorflow/Input.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
package eu.fbk.mpba.sensorflow;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public abstract class Input implements InputGroup {

// Static time - things

private static long bootTime = System.currentTimeMillis() * 1_000_000L - System.nanoTime();
private static TimeSource time = new TimeSource() {

@Override
public long getMonoUTCNanos() {
return System.nanoTime() + bootTime;
}

@Override
public long getMonoUTCNanos(long systemNanoTime) {
return systemNanoTime + bootTime;
}
};

private static AtomicLong sequential = new AtomicLong(1L);

// Fields

public final int intUid;
private final boolean reactive;
private volatile boolean listened = true;
private String name;

private InputGroup parent;
private List<String> header;
private Set<OutputManager> outputs = new HashSet<>();
private ReentrantReadWriteLock outputsAccess = new ReentrantReadWriteLock(false);
private Map<String, String> dictionary = new HashMap<>();
private ReentrantReadWriteLock dictionaryAccess = new ReentrantReadWriteLock(false);
private long holdTimestamp;
private double[] holdValue;
private long holdTimestampLog;
private String holdValueLog;

// Constructors

protected Input(Collection<String> header) {
this(null, Input.class.getSimpleName(), header, false);
}

protected Input(InputGroup parent, Collection<String> header) {
this(parent, Input.class.getSimpleName(), header,false);
}

protected Input(InputGroup parent, String name, Collection<String> header) {
this(parent, name, header, false);
}

protected Input(InputGroup parent, String name, Collection<String> header, boolean reactive) {
this.reactive = reactive;
long longUid = sequential.getAndIncrement();
this.intUid = (int) longUid / 2 * ((int) longUid % 2 * 2 - 1);
this.parent = parent;
this.name = name != null ? name : getClass().getSimpleName() + "-" + hashCode();
this.header = new ArrayList<>(header);
}

public void setName(String name) {
this.name = name;
}

// Outputs access

void addOutput(OutputManager output) {
outputsAccess.writeLock().lock();
outputs.add(output);
outputsAccess.writeLock().unlock();
pushDictionary(output);
if (listened && reactive) {
if (holdValue != null) // "&& reactive" is implicit
pushValueInner(holdTimestamp, holdValue);
if (holdValueLog != null) // "&& reactive" is implicit
pushLogInner(holdTimestampLog, holdValueLog);
}
}

void removeOutput(OutputManager output) {
outputsAccess.writeLock().lock();
outputs.remove(output);
outputsAccess.writeLock().unlock();
}

void pushDictionary(OutputManager output) {
dictionaryAccess.readLock().lock();
for (Map.Entry<String, String> entry : dictionary.entrySet()) {
output.pushLog(this, getTimeSource().getMonoUTCNanos(), formatKeyValue(entry.getKey(), entry.getValue()));
}
dictionaryAccess.readLock().unlock();
}

Collection<OutputManager> getOutputs() {
outputsAccess.readLock().lock();
ArrayList<OutputManager> outputManagers = new ArrayList<>(outputs);
outputsAccess.readLock().unlock();
return outputManagers;
}

// Outputs access - Notify

private String formatKeyValue(String key, String value) {
String separator = ",";
final String key2 = key.replace("\\", "\\\\").replace(separator, "\\s");
final String value2 = value.replace("\\", "\\\\").replace(separator, "\\s");
return key2 + separator + value2;
}

public void putKeyValue(String key, String value) {
dictionaryAccess.writeLock().lock();
String old = dictionary.put(key, value);
dictionaryAccess.writeLock().unlock();
if (listened && !value.equals(old)) {
pushLog(getTimeSource().getMonoUTCNanos(), formatKeyValue(key, value));
}
}

public void pushValue(long time, double value) {
pushValue(time, new double[] { value });
}

public void pushValue(long time, double[] value) {
// Shouldn't be called before onCreateAndAdded
if (listened) {
pushValueInner(time, value);
}
if (reactive) {
holdTimestamp = time;
holdValue = value;
}
}

public void pushLog(long time, String message) {
// Shouldn't be called before onCreateAndAdded
if (listened) {
pushLogInner(time, message);
}
if (reactive) {
holdTimestampLog = time;
holdValueLog = message;
}
}

private void pushValueInner(long time, double[] value) {
outputsAccess.readLock().lock();
for (OutputManager output : outputs)
if (output.isEnabled())
output.pushValue(this, time, value);
outputsAccess.readLock().unlock();
}

private void pushLogInner(long time, String message) {
outputsAccess.readLock().lock();
for (OutputManager output : outputs)
if (output.isEnabled())
output.pushLog(this, time, message);
outputsAccess.readLock().unlock();
}

// Muting

public boolean isMuted() {
return !listened;
}

public void mute() {
this.listened = false;
}

public void unmute() {
this.listened = true;
if (reactive) {
pushValueInner(holdTimestamp, holdValue);
}
}

// Gets

public boolean isReactive() {
return reactive;
}

public static TimeSource getTimeSource() {
return time;
}

public InputGroup getParent() {
return parent;
}

public final List<String> getHeader(){
return header;
}

// Gets - SFPlugin non-final Overrides

@Override
public final String getName() {
InputGroup parent = getParent();
return parent != null ? parent.getName() + "/" + getSimpleName() : getSimpleName();
}

// Gets - InputGroup final Overrides

@Override
public final String getSimpleName() {
return name;
}

@Override
public final Collection<Input> getChildren() {
return Collections.singletonList(this);
}

// Finalization

public void close() {
outputsAccess.writeLock().lock();
outputs.clear();
outputsAccess.writeLock().unlock();
}

@Override
protected void finalize() throws Throwable {
close();
super.finalize();
}
}
27 changes: 27 additions & 0 deletions src/main/java/eu/fbk/mpba/sensorflow/InputGroup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package eu.fbk.mpba.sensorflow;

import java.util.Collection;

public interface InputGroup extends SFPlugin {

/**
* Called when the input plugin is added to SensorFlow but not wired yet. This is the place
* where to finalize the setup of the Streams.
*/
void onCreate();

/**
* Called after routing ("wiring") the plugin to all the outputs.
*/
void onAdded();

/**
* Called when the plugin is removed from SensorFlow
*/
void onRemoved();

Collection<Input> getChildren();

String getSimpleName();

}
Loading