Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified lib/log4Net/log4net.dll
Binary file not shown.
Binary file added lib/log4Net/log4net.pdb
Binary file not shown.
12,626 changes: 8,071 additions & 4,555 deletions lib/log4Net/log4net.xml

Large diffs are not rendered by default.

Binary file modified lib/zookeeper/ZooKeeperNet.dll
Binary file not shown.
Binary file modified lib/zookeeper/ZooKeeperNet.pdb
Binary file not shown.
3 changes: 2 additions & 1 deletion src/Kafka/Kafka.Client/Api/TopicMetadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ public static PartitionMetadata ReadFrom(ByteBuffer buffer, Dictionary<int, Brok
buffer, "error code", Tuple.Create<short, short>(-1, short.MaxValue));
var partitionId = ApiUtils.ReadIntInRange(buffer, "partition id", Tuple.Create(0, int.MaxValue)); // partition id
var leaderId = buffer.GetInt();
var leader = brokers[leaderId];
Broker leader;
brokers.TryGetValue(leaderId, out leader);

// list of all replicas
var numReplicas = ApiUtils.ReadIntInRange(buffer, "number of all replicas", Tuple.Create(0, int.MaxValue));
Expand Down
21 changes: 12 additions & 9 deletions src/Kafka/Kafka.Client/Common/Imported/ByteBuffer.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
namespace Kafka.Client.Common.Imported
{
using System;
using System.IO;
using System.Net;
using System.Text;

public class ByteBuffer : Stream
using log4net;
using System;
using System.IO;
using System.Net;
using System.Reflection;
using System.Text;

public class ByteBuffer : Stream
{

#region Buffer
protected static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

#region Buffer

private int mark = -1;
private int mark = -1;

private int position;

Expand Down
2 changes: 1 addition & 1 deletion src/Kafka/Kafka.Client/Consumers/TopicEventHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

public interface ITopicEventHandler<T>
{
void HandleTopicEvent(List<T> allTopics);
void HandleTopicEvent(IEnumerable<T> allTopics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ public ZKRebalancerListener(
this.watcherExecutorThread.Start();
}

public void HandleChildChange(string parentPath, IList<string> curChilds)
public void HandleChildChange(string parentPath, IEnumerable<string> curChilds)
{
this.RebalanceEventTriggered();
}
Expand Down Expand Up @@ -1056,7 +1056,7 @@ internal WildcardStreamsHandler(

private readonly ZKGroupDirs dirs;

public void HandleTopicEvent(List<string> allTopics)
public void HandleTopicEvent(IEnumerable<string> allTopics)
{
Logger.Debug("Handling topic event");
var updatedTopics = allTopics.Where(topicFilter.IsTopicAllowed).ToList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public ZkTopicEventListener(ZookeeperTopicEventWatcher parent)
this.parent = parent;
}

public void HandleChildChange(string parentPath, IList<string> currentChilds)
public void HandleChildChange(string parentPath, IEnumerable<string> currentChilds)
{
lock (this.parent.@lock)
{
Expand Down
Binary file not shown.
Binary file not shown.
12 changes: 7 additions & 5 deletions src/Kafka/Kafka.Client/Kafka.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@
</PropertyGroup>
<ItemGroup>
<Reference Include="Crc32C.NET">
<HintPath>..\..\..\lib\Crc32C\Crc32C.NET.dll</HintPath>
<HintPath>..\..\..\..\..\..\Tfs\Projects\M\MessageFromKafka\packages\Crc32C.NET.1.0.5.0\lib\net20\Crc32C.NET.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="log4net">
<HintPath>..\..\..\lib\log4Net\log4net.dll</HintPath>
Expand All @@ -112,7 +113,8 @@
<HintPath>..\packages\Newtonsoft.Json.6.0.2\lib\net40\Newtonsoft.Json.dll</HintPath>
</Reference>
<Reference Include="Snappy.NET">
<HintPath>..\..\..\lib\Snappy.NET\Snappy.NET.dll</HintPath>
<HintPath>..\..\..\..\..\..\Tfs\Projects\M\MessageFromKafka\packages\Snappy.NET.1.1.1.7\lib\net20\Snappy.NET.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Spring.Threading">
<HintPath>..\..\..\lib\Spring.Threading\Spring.Threading.dll</HintPath>
Expand All @@ -124,8 +126,9 @@
<Reference Include="System.Web.Extensions" />
<Reference Include="System.XML" />
<Reference Include="System.Xml.Linq" />
<Reference Include="ZooKeeperNet">
<HintPath>..\..\..\lib\zookeeper\ZooKeeperNet.dll</HintPath>
<Reference Include="ZooKeeperNet, Version=3.3.4.8, Culture=neutral, PublicKeyToken=fefd2c046da35b56, processorArchitecture=MSIL">
<HintPath>..\packages\ZooKeeperNet.3.3.4.8\lib\net40\ZooKeeperNet.dll</HintPath>
<Private>True</Private>
</Reference>
</ItemGroup>
<ItemGroup>
Expand Down Expand Up @@ -288,7 +291,6 @@
<None Include="KafkaClient.snk" />
<None Include="packages.config" />
</ItemGroup>
<ItemGroup />
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
<Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
Expand Down
18 changes: 18 additions & 0 deletions src/Kafka/Kafka.Client/Kafka.Client.nuspec
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0"?>
<package >
<metadata>
<id>Kafka.Client</id>
<version>0.7.0.4</version>
<title>Kafka .NET</title>
<authors>martijnv</authors>
<owners>martijnv</owners>
<projectUrl>https://github.com/orthrus/kafka-net</projectUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This is a .NET implementation of a client for Kafka using C# for Kafka 0.8. It provides for an implementation that covers most basic functionalities to include a simple Producer and Consumer.</description>
<copyright>Copyright 2016</copyright>
<tags>apache kafka</tags>
<dependencies>
<dependency id="ZooKeeperNet" version="3.3" />
</dependencies>
</metadata>
</package>
6 changes: 6 additions & 0 deletions src/Kafka/Kafka.Client/Network/Transmission.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,18 @@ internal abstract class Receive : Transmission
public int ReadCompletely(Stream channel)
{
var totalRead = 0;
var iter = 0;
while (!this.complete)
{
var read = this.ReadFrom(channel);
Logger.DebugFormat("{0} bytes read", read);

totalRead += read;
iter++;
if(iter > 1 && totalRead == 0)
{
throw new InvalidRequestException("Reading 0 bytes");
}
}

return totalRead;
Expand Down
4 changes: 2 additions & 2 deletions src/Kafka/Kafka.Client/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
[assembly: AssemblyCopyright("Copyright © ExactTarget 2011")]

[assembly: ComVisible(false)]
[assembly: AssemblyVersion("0.7.0.3")]
[assembly: AssemblyFileVersion("0.7.0.3")]
[assembly: AssemblyVersion("0.7.0.4")]
[assembly: AssemblyFileVersion("0.7.0.4")]
[assembly: InternalsVisibleTo("Kafka.Tests")]
[assembly: InternalsVisibleTo("Kafka.Console")]
[assembly: CLSCompliant(false)]
Expand Down
Loading