Skip to content

Commit 5abb499

Browse files
authored
Merge pull request #6 from skbkontur/avk/leaderElectionFix
Avk/leader election fix
2 parents fa7cbc9 + 86dbbac commit 5abb499

File tree

11 files changed

+35
-21
lines changed

11 files changed

+35
-21
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
# Changelog
2+
## v1.2.1 - 2021.03.05
3+
- Add cancelation token to the RtqPeriodicJobRunner.
24

35
## v1.1.11 - 2021.02.25
46
- Update `@skbkontur/react-ui` package

Cassandra.DistributedTaskQueue.Monitoring/Cassandra.DistributedTaskQueue.Monitoring.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
<PackageReference Include="Elasticsearch.Net" Version="6.6.0" />
1212
<PackageReference Include="Microsoft.CSharp" Version="4.7.0" />
1313
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
14-
<PackageReference Include="SkbKontur.EventFeeds" Version="1.0.9" />
14+
<PackageReference Include="SkbKontur.EventFeeds" Version="1.1.1" />
1515
</ItemGroup>
1616

1717
<ItemGroup>

Cassandra.DistributedTaskQueue.Monitoring/EventFeed/RtqEventFeedPeriodicJobRunner.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
using System;
1+
using System;
22
using System.Linq;
33
using System.Net;
4+
using System.Threading;
45

56
using JetBrains.Annotations;
67

@@ -24,9 +25,10 @@ public RtqEventFeedPeriodicJobRunner(IRtqPeriodicJobRunner rtqPeriodicJobRunner,
2425

2526
public void RunPeriodicJobWithLeaderElection([NotNull] string jobName,
2627
TimeSpan delayBetweenIterations,
27-
[NotNull] Action jobAction,
28+
[NotNull] Action<CancellationToken> jobAction,
2829
[NotNull] Func<IRunningEventFeed> onTakeTheLead,
29-
[NotNull] Func<IRunningEventFeed> onLoseTheLead)
30+
[NotNull] Func<IRunningEventFeed> onLoseTheLead,
31+
CancellationToken cancellationToken)
3032
{
3133
rtqPeriodicJobRunner.RunPeriodicJobWithLeaderElection(jobName,
3234
delayBetweenIterations,
@@ -44,7 +46,8 @@ public void RunPeriodicJobWithLeaderElection([NotNull] string jobName,
4446
var runningEventFeed = onLoseTheLead();
4547
var lagReportingJobId = FormatLagReportingJobId(runningEventFeed.FeedKey);
4648
rtqPeriodicJobRunner.StopPeriodicJob(lagReportingJobId);
47-
});
49+
},
50+
cancellationToken);
4851
}
4952

5053
public void StopPeriodicJobWithLeaderElection([NotNull] string jobName)
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
using JetBrains.Annotations;
1+
using System.Threading;
2+
3+
using JetBrains.Annotations;
24

35
using SkbKontur.EventFeeds;
46

@@ -8,6 +10,6 @@ namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Indexer
810
public interface IRtqMonitoringEventFeeder
911
{
1012
[NotNull]
11-
IEventFeedsRunner RunEventFeeding();
13+
IEventFeedsRunner RunEventFeeding(CancellationToken cancellationToken);
1214
}
1315
}

Cassandra.DistributedTaskQueue.Monitoring/Indexer/RtqMonitoringEventFeeder.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System;
1+
using System;
2+
using System.Threading;
23

34
using JetBrains.Annotations;
45

@@ -41,7 +42,7 @@ public RtqMonitoringEventFeeder(ILog logger,
4142
public IGlobalTime GlobalTime { get; }
4243

4344
[NotNull]
44-
public IEventFeedsRunner RunEventFeeding()
45+
public IEventFeedsRunner RunEventFeeding(CancellationToken cancellationToken)
4546
{
4647
return eventFeedFactory
4748
.WithOffsetType<string>()
@@ -50,7 +51,7 @@ public IEventFeedsRunner RunEventFeeding()
5051
.WithBlade($"{indexerSettings.EventFeedKey}_Blade1", delay : TimeSpan.FromMinutes(15)))
5152
.WithOffsetInterpreter(offsetInterpreter)
5253
.WithOffsetStorageFactory(bladeId => new RtqElasticsearchOffsetStorage(elasticsearchClient, offsetInterpreter, bladeId.BladeKey))
53-
.RunFeeds(delayBetweenIterations : TimeSpan.FromMinutes(1));
54+
.RunFeeds(delayBetweenIterations : TimeSpan.FromMinutes(1), cancellationToken);
5455
}
5556

5657
private readonly ILog logger;
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
using JetBrains.Annotations;
1+
using System.Threading;
2+
3+
using JetBrains.Annotations;
24

35
using SkbKontur.EventFeeds;
46

@@ -7,6 +9,6 @@ namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.TaskCounter
79
[PublicAPI]
810
public interface IRtqTaskCounterEventFeeder
911
{
10-
( /*[NotNull]*/ IEventFeedsRunner, /*[NotNull]*/ RtqTaskCounterStateManager) RunEventFeeding();
12+
( /*[NotNull]*/ IEventFeedsRunner, /*[NotNull]*/ RtqTaskCounterStateManager) RunEventFeeding(CancellationToken cancellationToken);
1113
}
1214
}

Cassandra.DistributedTaskQueue.Monitoring/TaskCounter/RtqTaskCounterEventFeeder.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
using GroBuf;
1+
using System.Threading;
2+
3+
using GroBuf;
24

35
using JetBrains.Annotations;
46

@@ -45,7 +47,7 @@ public RtqTaskCounterEventFeeder(ILog logger,
4547
[NotNull]
4648
public IGlobalTime GlobalTime { get; }
4749

48-
public ( /*[NotNull]*/ IEventFeedsRunner, /*[NotNull]*/ RtqTaskCounterStateManager) RunEventFeeding()
50+
public ( /*[NotNull]*/ IEventFeedsRunner, /*[NotNull]*/ RtqTaskCounterStateManager) RunEventFeeding(CancellationToken cancellationToken)
4951
{
5052
var stateManager = new RtqTaskCounterStateManager(logger, serializer, taskDataRegistry, stateStorage, settings, offsetInterpreter, perfGraphiteReporter);
5153
var eventConsumer = new RtqTaskCounterEventConsumer(stateManager, handleTasksMetaStorage, perfGraphiteReporter);
@@ -58,7 +60,7 @@ public RtqTaskCounterEventFeeder(ILog logger,
5860
.WithOffsetInterpreter(offsetInterpreter)
5961
.WithOffsetStorageFactory(bladeId => stateManager.CreateOffsetStorage(bladeId))
6062
.WithSingleLeaderElectionKey(stateManager.CompositeFeedKey)
61-
.RunFeeds(settings.DelayBetweenEventFeedingIterations);
63+
.RunFeeds(settings.DelayBetweenEventFeedingIterations, cancellationToken);
6264
return (eventFeedsRunner, stateManager);
6365
}
6466

Cassandra.DistributedTaskQueue/Cassandra.DistributedTaskQueue.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
<ItemGroup>
1111
<PackageReference Include="GroBuf" Version="1.5.5" />
12-
<PackageReference Include="SkbKontur.Cassandra.DistributedLock" Version="2.2.8" />
12+
<PackageReference Include="SkbKontur.Cassandra.DistributedLock" Version="2.2.12" />
1313
<PackageReference Include="SkbKontur.Cassandra.GlobalTimestamp" Version="1.0.8" />
1414
<PackageReference Include="SkbKontur.Cassandra.ThriftClient" Version="2.4.23" />
1515
<PackageReference Include="SkbKontur.Graphite.Client" Version="1.3.16" />

Cassandra.DistributedTaskQueue/Handling/IRtqPeriodicJobRunner.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System;
1+
using System;
2+
using System.Threading;
23

34
using JetBrains.Annotations;
45

@@ -15,9 +16,10 @@ void RunPeriodicJob([NotNull] string jobId,
1516

1617
void RunPeriodicJobWithLeaderElection([NotNull] string jobId,
1718
TimeSpan delayBetweenIterations,
18-
[NotNull] Action jobAction,
19+
[NotNull] Action<CancellationToken> jobAction,
1920
[NotNull] Action onTakeTheLead,
20-
[NotNull] Action onLoseTheLead);
21+
[NotNull] Action onLoseTheLead,
22+
CancellationToken cancellationToken);
2123

2224
void StopPeriodicJobWithLeaderElection([NotNull] string jobId);
2325
}

global.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
22
"sdk": {
3-
"version": "5.0.100"
3+
"version": "5.0.102"
44
}
55
}

0 commit comments

Comments
 (0)