diff --git a/src/main/java/com/pmeade/websocket/example/EchoServerSync.java b/src/main/java/com/pmeade/websocket/example/EchoServerSync.java new file mode 100644 index 0000000..e1eae5f --- /dev/null +++ b/src/main/java/com/pmeade/websocket/example/EchoServerSync.java @@ -0,0 +1,54 @@ +package com.pmeade.websocket.example; + +import com.pmeade.websocket.io.WebSocketServerOutputStream; +import com.pmeade.websocket.net.WebSocket; +import com.pmeade.websocket.net.WebSocketServerSocket; +import com.pmeade.websocket.example.WebSocketConsumerThread; +import com.pmeade.websocket.example.StringMessageQueue; +import com.pmeade.websocket.example.WebSocketThread; +import java.io.IOException; +import java.io.InputStream; +import java.net.ServerSocket; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.ListIterator; +import java.util.Queue; +import java.util.Arrays; + +/** + * @author pmeade + */ +public class EchoServerSync { + public static final int PORT = 8080; + + public static void main(String[] args) { + EchoServerSync echoServer = new EchoServerSync(); + try { + echoServer.doIt(); + } catch(Exception e) { + System.err.println(e.getLocalizedMessage()); + e.printStackTrace(System.err); + } + } + + public void doIt() throws Exception + { + ServerSocket serverSocket = new ServerSocket(PORT); + WebSocketServerSocket webSocketServerSocket + = new WebSocketServerSocket(serverSocket); + StringMessageQueue messageQueue = new StringMessageQueue(); + LinkedList connections = new LinkedList(); + new WebSocketConsumerThread(messageQueue, connections).start(); + while(finished == false) { + WebSocket socket = webSocketServerSocket.accept(); + connections.add(socket); + new WebSocketThread(socket, messageQueue).start(); + } + } + + public void finish() { + finished = true; + } + + private boolean finished = false; +} diff --git a/src/main/java/com/pmeade/websocket/example/StringMessageQueue.java b/src/main/java/com/pmeade/websocket/example/StringMessageQueue.java new file mode 100644 index 0000000..373eabb --- /dev/null +++ b/src/main/java/com/pmeade/websocket/example/StringMessageQueue.java @@ -0,0 +1,24 @@ +package com.pmeade.websocket.example; + +import java.util.LinkedList; +import java.util.Queue; + +/** + * message queue + * keeps received messages + */ +public class StringMessageQueue { + private Queue q = new LinkedList(); + synchronized String pop() throws InterruptedException { + while(q.isEmpty()) { + wait(); + } + String value = q.remove(); + notifyAll(); + return value; + } + synchronized void push(String message) throws InterruptedException { + q.add(message); + notifyAll(); + } +} diff --git a/src/main/java/com/pmeade/websocket/example/WebSocketConsumerThread.java b/src/main/java/com/pmeade/websocket/example/WebSocketConsumerThread.java new file mode 100644 index 0000000..49bbbd7 --- /dev/null +++ b/src/main/java/com/pmeade/websocket/example/WebSocketConsumerThread.java @@ -0,0 +1,51 @@ +package com.pmeade.websocket.example; + +import com.pmeade.websocket.io.WebSocketServerOutputStream; +import com.pmeade.websocket.net.WebSocket; +import com.pmeade.websocket.net.WebSocketServerSocket; +import java.io.IOException; +import java.io.InputStream; +import java.net.ServerSocket; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.ListIterator; +import java.util.Queue; +import java.util.Arrays; +/** + * web socket consumer thread + * takes messages from message queue and sends them to clients + */ +public class WebSocketConsumerThread extends Thread { + public WebSocketConsumerThread(StringMessageQueue messageQueue, LinkedList connections) { + this.messageQueue = messageQueue; + this.connections = connections; + } + public void run() { + String message = ""; + WebSocket webSocket = null; + WebSocketServerOutputStream wsos = null; + while(!finished) { + try { + message = messageQueue.pop(); + ListIterator listIterator = connections.listIterator(); + while(listIterator.hasNext()) { + webSocket = listIterator.next(); + wsos = webSocket.getOutputStream(); + wsos.writeString(message); + } + } catch(IOException e) { + finished = true; + System.err.println(e.getLocalizedMessage()); + e.printStackTrace(System.err); + } + catch(InterruptedException e) { + finished = true; + System.err.println(e.getLocalizedMessage()); + e.printStackTrace(System.err); + } + } + } + private StringMessageQueue messageQueue; + private LinkedList connections; + private boolean finished = false; +} diff --git a/src/main/java/com/pmeade/websocket/example/WebSocketThread.java b/src/main/java/com/pmeade/websocket/example/WebSocketThread.java new file mode 100644 index 0000000..5ee05e1 --- /dev/null +++ b/src/main/java/com/pmeade/websocket/example/WebSocketThread.java @@ -0,0 +1,65 @@ +package com.pmeade.websocket.example; + +import com.pmeade.websocket.io.WebSocketServerOutputStream; +import com.pmeade.websocket.net.WebSocket; +import com.pmeade.websocket.net.WebSocketServerSocket; +import com.pmeade.websocket.example.WebSocketConsumerThread; +import com.pmeade.websocket.example.StringMessageQueue; +import com.pmeade.websocket.example.ByteAccumulator; +import com.pmeade.websocket.io.LineInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.ServerSocket; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.ListIterator; +import java.util.Queue; +import java.util.Arrays; + +/** + * web socket producer thread + * accepts content from web sockets and places them in the queue + */ +public class WebSocketThread extends Thread { + public WebSocketThread(WebSocket socket, StringMessageQueue messageQueue) { + this.webSocket = socket; + this.messageQueue = messageQueue; + } + + @Override + public void run() { + try { + InputStream wsis = webSocket.getInputStream(); + LineInputStream line = new LineInputStream(wsis); + String lineStr = ""; + while (finished == false) { + lineStr = line.readLine(); + messageQueue.push(lineStr); + } + } catch (IOException e) { + finished = true; + System.err.println(e.getLocalizedMessage()); + e.printStackTrace(System.err); + } catch(InterruptedException e) { + finished = true; + System.err.println(e.getLocalizedMessage()); + e.printStackTrace(System.err); + } + try { + webSocket.close(); + } catch (IOException e) { + finished = true; + System.err.println(e.getLocalizedMessage()); + e.printStackTrace(System.err); + } + } + + public void finish() { + finished = true; + } + + private boolean finished = false; + + private final WebSocket webSocket; + private StringMessageQueue messageQueue; +} diff --git a/start-echo-server-sync b/start-echo-server-sync new file mode 100755 index 0000000..5878cdd --- /dev/null +++ b/start-echo-server-sync @@ -0,0 +1,21 @@ +#!/bin/bash +# start-echo-server-sync +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +#---------------------------------------------------------------------------- + +java -cp "$HOME/java_apps/websocket/target/classes:$HOME/java_apps/websocket/target/dependency/*" com.pmeade.websocket.example.EchoServerSync + +#---------------------------------------------------------------------------- +# end of start-echo-server