Skip to content
This repository has been archived by the owner on Oct 28, 2022. It is now read-only.

Commit

Permalink
Renamed classes, added support for setting socket options (makes it m…
Browse files Browse the repository at this point in the history
…ore flexible), and added interfaces for all major components (easier testing)
  • Loading branch information
kzu committed Mar 13, 2013
1 parent c48903e commit 6e4eeae
Show file tree
Hide file tree
Showing 24 changed files with 812 additions and 200 deletions.
38 changes: 38 additions & 0 deletions ReactiveSockets-rc.nuspec
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="utf-8"?>
<!-- Need this additional manifest because having a dependency on an "rc" release means your
release also needs to be "rc". For .NET 4.5 we're release, not pre-release -->
<package xmlns="http://schemas.microsoft.com/packaging/2010/07/nuspec.xsd">
<metadata>
<id>ReactiveSockets</id>
<version>0.1.0-rc</version>
<title>Reactive Sockets</title>
<summary>The easiest way to do socket programming in .NET, leveraging simple Rx queries to implement your protocols.</summary>
<description>
Implementing socket-based prototols in .NET has never been easier. Example:
from header in socket.Receiver.Buffer(4)
let length = BitConverter.ToInt32(header.ToArray(), 0)
let body = socket.Receiver.Take(length)
select Encoding.UTF8.GetString(body.ToEnumerable().ToArray())
</description>
<authors>Daniel Cazzulino, kzu, Clarius</authors>
<language>en-US</language>
<projectUrl>https://github.com/clariuslabs/reactivesockets</projectUrl>
<licenseUrl>http://opensource.org/licenses/BSD-2-Clause</licenseUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<iconUrl>https://github.com/clariuslabs/adapter/raw/master/Common/ClariusLabsIcon.png</iconUrl>
<tags>reactive socket</tags>
<dependencies>
<group targetFramework="net40">
<dependency id="Microsoft.Bcl.Async" version="1.0.14-rc" />
<dependency id="Rx-Main" version="2.1.30214.0" />
</group>
<group targetFramework="net45">
<dependency id="Rx-Main" version="2.1.30214.0" />
</group>
</dependencies>
<frameworkAssemblies>
<frameworkAssembly assemblyName="System.Net" targetFramework="net40" />
<frameworkAssembly assemblyName="System.Net" targetFramework="net45" />
</frameworkAssemblies>
</metadata>
</package>
8 changes: 4 additions & 4 deletions ReactiveSockets.Tests/EndToEnd.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void when_connected_then_can_exchange_fixed_length_messages()
var serverReceives = new List<string>();
var clientReceives = new List<string>();

var server = new TcpServer(1055);
var server = new ReactiveListener(1055);
server.Start();

Func<IObservable<byte>, IObservable<string>> parse =
Expand Down Expand Up @@ -77,7 +77,7 @@ public void when_connected_then_can_exchange_fixed_length_messages()
socket.SendAsync(convert("Welcome!")).Wait();
});

var client = new TcpClientSocket("127.0.0.1", 1055);
var client = new ReactiveClient("127.0.0.1", 1055);
Console.WriteLine("Client socket created: {0}", client.GetHashCode());

client.ConnectAsync().Wait();
Expand Down Expand Up @@ -108,7 +108,7 @@ public void when_client_reconnects_then_can_exchange_fixed_length_messages()
var clientReceives = new List<string>();
var messageLength = 32;

var server = new TcpServer(1055);
var server = new ReactiveListener(1055);
server.Start();

Func<IObservable<byte>, IObservable<string>> parse =
Expand All @@ -132,7 +132,7 @@ public void when_client_reconnects_then_can_exchange_fixed_length_messages()
socket.SendAsync(convert("Welcome!")).Wait();
});

var client = new TcpClientSocket("127.0.0.1", 1055);
var client = new ReactiveClient("127.0.0.1", 1055);
Console.WriteLine("Client socket created: {0}", client.GetHashCode());

client.ConnectAsync().Wait();
Expand Down
2 changes: 1 addition & 1 deletion ReactiveSockets.Tests/SampleProtocolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class SampleProtocolTests
public void when_parsing_bytes_then_raises_messages()
{
var bytes = new BlockingCollection<byte>();
var socket = Mock.Of<ISocket>(x => x.Receiver == bytes.GetConsumingEnumerable().ToObservable(TaskPoolScheduler.Default));
var socket = Mock.Of<IReactiveSocket>(x => x.Receiver == bytes.GetConsumingEnumerable().ToObservable(TaskPoolScheduler.Default));

var protocol = new ProtocolClient(socket);
var message = "";
Expand Down
14 changes: 7 additions & 7 deletions ReactiveSockets.Tests/TcpClientSocketTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,25 @@ public class TcpClientSocketTests
[Fact]
public void when_client_created_then_it_is_disconnected()
{
var client = new TcpClientSocket("127.0.0.1", 1055);
var client = new ReactiveClient("127.0.0.1", 1055);

Assert.False(client.IsConnected);
}

[Fact]
public void when_disconnecting_disconnected_then_throws()
{
var client = new TcpClientSocket("127.0.0.1", 1055);
var client = new ReactiveClient("127.0.0.1", 1055);

Assert.Throws<InvalidOperationException>(() => client.Disconnect());
}

[Fact]
public void when_connecting_then_raises_connected()
{
using (var server = new TcpServer(1055))
using (var server = new ReactiveListener(1055))
{
var client = new TcpClientSocket("127.0.0.1", 1055);
var client = new ReactiveClient("127.0.0.1", 1055);
var connected = false;
client.Connected += (sender, args) => connected = true;

Expand All @@ -42,9 +42,9 @@ public void when_connecting_then_raises_connected()
[Fact]
public void when_disconnecting_then_raises_disconnected()
{
using (var server = new TcpServer(1055))
using (var server = new ReactiveListener(1055))
{
var client = new TcpClientSocket("127.0.0.1", 1055);
var client = new ReactiveClient("127.0.0.1", 1055);
server.Start();
client.ConnectAsync().Wait();

Expand All @@ -67,7 +67,7 @@ public void when_reconnecting_then_raises_connected()
var server = Process.Start(@".\..\..\..\Sample\ReactiveServer\bin\Debug\ReactiveServer.exe");
try
{
var client = new TcpClientSocket("127.0.0.1", 1055);
var client = new ReactiveClient("127.0.0.1", 1055);
client.ConnectAsync().Wait();
Assert.True(client.IsConnected);

Expand Down
31 changes: 31 additions & 0 deletions ReactiveSockets.nuspec
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?xml version="1.0" encoding="utf-8"?>
<package xmlns="http://schemas.microsoft.com/packaging/2010/07/nuspec.xsd">
<metadata>
<id>ReactiveSockets</id>
<version>0.1.0</version>
<title>Reactive Sockets</title>
<summary>The easiest way to do socket programming in .NET, leveraging simple Rx queries to implement your protocols.</summary>
<description>
Implementing socket-based prototols in .NET has never been easier. Example:
from header in socket.Receiver.Buffer(4)
let length = BitConverter.ToInt32(header.ToArray(), 0)
let body = socket.Receiver.Take(length)
select Encoding.UTF8.GetString(body.ToEnumerable().ToArray())
</description>
<authors>Daniel Cazzulino, kzu, Clarius</authors>
<language>en-US</language>
<projectUrl>https://github.com/clariuslabs/reactivesockets</projectUrl>
<licenseUrl>http://opensource.org/licenses/BSD-2-Clause</licenseUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<iconUrl>https://github.com/clariuslabs/adapter/raw/master/Common/ClariusLabsIcon.png</iconUrl>
<tags>reactive socket</tags>
<dependencies>
<group targetFramework="net45">
<dependency id="Rx-Main" version="2.1.30214.0" />
</group>
</dependencies>
<frameworkAssemblies>
<frameworkAssembly assemblyName="System.Net" targetFramework="net45" />
</frameworkAssemblies>
</metadata>
</package>
8 changes: 8 additions & 0 deletions ReactiveSockets.sln
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".nuget", ".nuget", "{FA10C5
.nuget\NuGet.targets = .nuget\NuGet.targets
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{25C5F562-BB60-4B20-BAB3-972FFEDA4B26}"
ProjectSection(SolutionItems) = preProject
build.bat = build.bat
ReactiveSockets-rc.nuspec = ReactiveSockets-rc.nuspec
ReactiveSockets.nuspec = ReactiveSockets.nuspec
README.md = README.md
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down
22 changes: 22 additions & 0 deletions ReactiveSockets/IReactiveClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace ReactiveSockets
{
using System;
using System.Threading.Tasks;

/// <summary>
/// Interface implemented by the reactive client socket that can
/// connect, send data to and receive data from a server.
/// </summary>
interface IReactiveClient : IReactiveSocket
{
/// <summary>
/// Attempts to connect to a server.
/// </summary>
Task ConnectAsync();

/// <summary>
/// Disconnects the underlying connection to the server.
/// </summary>
void Disconnect();
}
}
27 changes: 27 additions & 0 deletions ReactiveSockets/IReactiveListener.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
namespace ReactiveSockets
{
using System;

/// <summary>
/// Interface implemented by the reactive listeners which can
/// accept incoming connections.
/// </summary>
public interface IReactiveListener
{
/// <summary>
/// Observable connections that are being accepted by the listener.
/// </summary>
IObservable<ReactiveSocket> Connections { get; }

/// <summary>
/// Disposes the listener, releasing all resources and closing
/// any active connections.
/// </summary>
void Dispose();

/// <summary>
/// Starts accepting connections.
/// </summary>
void Start();
}
}
55 changes: 55 additions & 0 deletions ReactiveSockets/IReactiveSocket.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
namespace ReactiveSockets
{
using System;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Low level channel between client and server.
/// </summary>
public interface IReactiveSocket : ISocket, IDisposable
{
/// <summary>
/// Raised when the socket is connected.
/// </summary>
event EventHandler Connected;

/// <summary>
/// Raised when the socket is disconnected.
/// </summary>
event EventHandler Disconnected;

/// <summary>
/// Raised when the socket is disposed.
/// </summary>
event EventHandler Disposed;

/// <summary>
/// Gets whether the socket is connected.
/// </summary>
bool IsConnected { get; }

/// <summary>
/// Observable bytes that are being received by this endpoint.
/// </summary>
IObservable<byte> Receiver { get; }

/// <summary>
/// Observable bytes that are being sent through this endpoint
/// by using the <see cref="SendAsync(byte[])"/> or
/// <see cref="SendAsync(byte[], CancellationToken)"/> methods.
/// </summary>
IObservable<byte> Sender { get; }

/// <summary>
/// Sends data asynchronously through this endpoint.
/// </summary>
Task SendAsync(byte[] data);

/// <summary>
/// Sends data asynchronously through this endpoint, with support
/// for cancellation.
/// </summary>
Task SendAsync(byte[] bytes, CancellationToken cancellation);
}
}
29 changes: 24 additions & 5 deletions ReactiveSockets/ISocket.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,31 @@
namespace ReactiveSockets
{
using System;
using System.Threading.Tasks;
using System.Net.Sockets;

/// <summary>
/// Exposes the core SetSocketOption method from .NET sockets.
/// </summary>
public interface ISocket
{
IObservable<byte> Receiver { get; }
IObservable<byte> Sender { get; }
Task SendAsync(byte[] data);
/// <summary>See <see cref="T:System.Net.Sockets.Socket.GetSocketOption(SocketOptionLevel, SocketOptionName)" />.</summary>
object GetSocketOption(SocketOptionLevel optionLevel, SocketOptionName optionName);

/// <summary>See <see cref="T:System.Net.Sockets.Socket.GetSocketOption(SocketOptionLevel, SocketOptionName, byte[])" />.</summary>
void GetSocketOption(SocketOptionLevel optionLevel, SocketOptionName optionName, byte[] optionValue);

/// <summary>See <see cref="T:System.Net.Sockets.Socket.GetSocketOption(SocketOptionLevel, SocketOptionName, int)" />.</summary>
byte[] GetSocketOption(SocketOptionLevel optionLevel, SocketOptionName optionName, int optionLength);

/// <summary>See <see cref="T:System.Net.Sockets.Socket.SetSocketOption(SocketOptionLevel, SocketOptionName, bool)" />.</summary>
void SetSocketOption(SocketOptionLevel optionLevel, SocketOptionName optionName, bool optionValue);

/// <summary>See <see cref="T:System.Net.Sockets.Socket.SetSocketOption(SocketOptionLevel, SocketOptionName, byte[])" />.</summary>
void SetSocketOption(SocketOptionLevel optionLevel, SocketOptionName optionName, byte[] optionValue);

/// <summary>See <see cref="T:System.Net.Sockets.Socket.SetSocketOption(SocketOptionLevel, SocketOptionName, int)" />.</summary>
void SetSocketOption(SocketOptionLevel optionLevel, SocketOptionName optionName, int optionValue);

/// <summary>See <see cref="T:System.Net.Sockets.Socket.SetSocketOption(SocketOptionLevel, SocketOptionName, object)" />.</summary>
void SetSocketOption(SocketOptionLevel optionLevel, SocketOptionName optionName, object optionValue);
}
}
Loading

0 comments on commit 6e4eeae

Please sign in to comment.