Skip to content

Commit a00c1bd

Browse files
committed
Initial commit
0 parents  commit a00c1bd

15 files changed

+856
-0
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
/vendor
2+
/composer.lock
3+
/test/ab/reports

README.md

+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
RxWebsocket is a PHP Websocket library.
2+
3+
## Usage
4+
5+
#### Client
6+
```php
7+
<?php
8+
9+
require_once __DIR__ . '/vendor/autoload.php';
10+
11+
$client = new \RxWebsocket\Client("ws://127.0.0.1:9191/");
12+
13+
$client->subscribe(new \Rx\Observer\CallbackObserver(
14+
function (\RxWebsocket\MessageSubject $ms) {
15+
$ms->subscribe(new \Rx\Observer\CallbackObserver(
16+
function ($message) {
17+
echo $message . "\n";
18+
}
19+
));
20+
21+
$sayHello = function () use ($ms) { $ms->onNext("Hello"); };
22+
23+
$sayHello();
24+
\EventLoop\addPeriodicTimer(5, $sayHello);
25+
},
26+
function ($error) {
27+
// connection errors here
28+
},
29+
function () {
30+
// stopped trying to connect here
31+
}
32+
));
33+
```
34+
35+
#### An Echo Server
36+
```php
37+
<?php
38+
39+
require_once __DIR__ . "/vendor/autoload.php";
40+
41+
$server = new \RxWebsocket\Server("127.0.0.1", 9191);
42+
43+
$server
44+
->subscribe(new \Rx\Observer\CallbackObserver(
45+
function (\RxWebsocket\MessageSubject $cs) {
46+
$cs->subscribe($cs);
47+
}
48+
));
49+
```
50+
51+
#### Server that dumps everything to the console
52+
```php
53+
<?php
54+
55+
require_once __DIR__ . "/vendor/autoload.php";
56+
57+
$server = new \RxWebsocket\Server("127.0.0.1", 9191);
58+
59+
$server
60+
->subscribe(new \Rx\Observer\CallbackObserver(
61+
function (\RxWebsocket\MessageSubject $cs) {
62+
$ms->subscribe(new CallbackObserver(
63+
function ($message) {
64+
echo $message;
65+
}
66+
));
67+
}
68+
));
69+
```
70+
71+
## Installation
72+
73+
Using [composer](https://getcomposer.org/):
74+
75+
Right now, this project uses components of the [Ratchet RFC6455 project](https://github.com/ratchetphp/RFC6455) that
76+
are not tagged with a release, so they must be installed manually before this project:
77+
```composer require ratchet/rfc6455:dev-psr```
78+
79+
Then install this project:
80+
```composer require voryx/rxwebsocket```

composer.json

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
{
2+
"name": "voryx/rxwebsocket",
3+
"type": "library",
4+
"description": "Async Websockets for PHP using Rx",
5+
"keywords": [
6+
"websocket",
7+
"websockets",
8+
"rfc6455",
9+
"rx",
10+
"rx.php",
11+
"rxphp",
12+
"react",
13+
"reactive"
14+
],
15+
"license": "MIT",
16+
"authors": [
17+
{
18+
"name": "Matt Bonneau", "email": "[email protected]", "role": "Developer"
19+
},
20+
21+
{
22+
"name": "David Dan", "email": "[email protected]", "role": "Developer"
23+
}
24+
],
25+
"require": {
26+
"react/http": "dev-master",
27+
"react/http-client": "dev-master",
28+
"ratchet/rfc6455": "dev-psr7",
29+
"asm89/rx.php": "dev-master",
30+
"voryx/event-loop": "^0.2.0"
31+
},
32+
"autoload": {
33+
"psr-4": {
34+
"Voryx\\RxWebsocket\\": "src/RxWebsocket/"
35+
}
36+
}
37+
}

src/RxWebsocket/Client.php

+133
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
<?php
2+
3+
namespace Voryx\RxWebsocket;
4+
5+
use Exception;
6+
use Ratchet\RFC6455\Handshake\ClientNegotiator;
7+
use React\Dns\Resolver\Factory;
8+
use React\HttpClient\Request;
9+
use React\HttpClient\Response;
10+
use Rx\Disposable\CallbackDisposable;
11+
use Rx\Observable\AnonymousObservable;
12+
use Rx\Observer\CallbackObserver;
13+
use Rx\ObserverInterface;
14+
use Rx\Subject\Subject;
15+
16+
class Client extends Subject
17+
{
18+
protected $url;
19+
20+
/**
21+
* RxWebsocket constructor.
22+
* @param $url
23+
*/
24+
public function __construct($url)
25+
{
26+
$this->url = $url;
27+
}
28+
29+
private function startConnection()
30+
{
31+
$loop = \EventLoop\getLoop();
32+
33+
$dnsResolverFactory = new Factory();
34+
$dnsResolver = $dnsResolverFactory->createCached('8.8.8.8', $loop);
35+
36+
$factory = new \React\HttpClient\Factory();
37+
$client = $factory->create($loop, $dnsResolver);
38+
39+
$cNegotiator = new ClientNegotiator($this->url);
40+
41+
$headers = $cNegotiator->getRequest()->getHeaders();
42+
43+
$flatHeaders = [];
44+
foreach ($headers as $k => $v) {
45+
$flatHeaders[$k] = $v[0];
46+
}
47+
48+
$request = $client->request("GET", $this->url, $flatHeaders, '1.1');
49+
50+
$request->on('response', function (Response $response, Request $request) use ($cNegotiator) {
51+
if ($response->getCode() !== 101) {
52+
throw new \Exception("Unexpected response code " . $response->getCode());
53+
}
54+
// TODO: Should validate response
55+
//$cNegotiator->validateResponse($response);
56+
57+
parent::onNext(new MessageSubject(
58+
new AnonymousObservable(function (ObserverInterface $observer) use ($response) {
59+
$response->on('data', function ($data) use ($observer) {
60+
$observer->onNext($data);
61+
});
62+
63+
$response->on('error', function ($e) use ($observer) {
64+
$observer->onError($e);
65+
});
66+
67+
$response->on('close', function () use ($observer) {
68+
$observer->onCompleted();
69+
});
70+
71+
$response->on('end', function () use ($observer) {
72+
$observer->onCompleted();
73+
74+
// complete the parent observer - we only do 1 connection
75+
parent::onCompleted();
76+
});
77+
78+
79+
return new CallbackDisposable(function () use ($response) {
80+
// commented this out because disposal was causing the other
81+
// end (the request) to close also - which causes the pending messages
82+
// to get tossed
83+
//$response->close();
84+
});
85+
}),
86+
new CallbackObserver(
87+
function ($x) use ($request) {
88+
$request->write($x);
89+
},
90+
function ($e) use ($request) {
91+
$request->close();
92+
},
93+
function () use ($request) {
94+
$request->end();
95+
}
96+
),
97+
true
98+
));
99+
});
100+
101+
$request->writeHead();
102+
}
103+
104+
public function subscribe(ObserverInterface $observer, $scheduler = null)
105+
{
106+
if (!$this->isStopped) {
107+
$this->startConnection();
108+
}
109+
110+
return parent::subscribe($observer, $scheduler);
111+
}
112+
113+
public function send($value)
114+
{
115+
$this->onNext($value);
116+
}
117+
118+
// Not sure we need this object to be a subject - just being an observer should be good enough I think
119+
public function onNext($value)
120+
{
121+
122+
}
123+
124+
public function onError(Exception $exception)
125+
{
126+
127+
}
128+
129+
public function onCompleted()
130+
{
131+
132+
}
133+
}

0 commit comments

Comments
 (0)