Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][ISSUE #4426] Optimize EventMesh Bridge with QUIC #4427

Open
wants to merge 4,307 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
4307 commits
Select commit Hold shift + click to select a range
782a127
[ISSUE #3931]Optimize Body extends class toString method (#3932)
mxsm May 25, 2023
dac28e3
Resolves Issue #4029
nitheesh-daram May 27, 2023
869cb49
[ISSUE #3973]log.error should be in try-catch() instead of log.warn (…
pandaapo May 29, 2023
4db7667
[ISSUE #3942]Update README.zh-CN.md (#3943)
mxsm May 29, 2023
2a8a573
[ISSUE #4044] Remove duplicate init connector operation in worker
xwm1992 May 29, 2023
a4daa97
Update README.md
xwm1992 May 29, 2023
5b09589
Merge branch 'apache:master' into ISSUE-4029
nitheesh-daram May 29, 2023
2fed6c7
[ISSUE #3955 ]Code optimization.[ConfigurationHandler] (#3972)
nitheesh-daram May 30, 2023
c4a4ff8
[ISSUE #3900] Removed Additional Null Check
Ishaan29 May 30, 2023
3092643
[ISSUE #3952]Optimize EventMeshServer#EventMeshServer method (#3953)
mxsm May 30, 2023
dc79036
[ISSUE #4038]Optimize eventmesh-openconnect log print (#4039)
mxsm May 30, 2023
a6a2793
Merge pull request #4037 from nitheesh-daram/ISSUE-4029
wqliang May 30, 2023
ae961bf
Merge pull request #4048 from apache/xwm1992-patch-5
wqliang May 30, 2023
c8d9df8
[ISSUE #3827] Changed Access Modifiers for MessageQueue items (#3987)
Ishaan29 May 30, 2023
207b9af
[ISSUE #4044] Remove duplicate init connector operation in worker (#4…
xwm1992 May 30, 2023
c5f3aba
[ISSUE #3963] Method invocation 'getBytes' may produce 'NullPointerEx…
ZahaanMahajan May 31, 2023
1e8c5db
[ISSUE #3944]Var of type SpanProcessor no need to be defined as globa…
pandaapo May 31, 2023
2a0f165
[ISSUE #3924]Add unit test for UrlMappingPattern, RequestMapping (#3925)
pandaapo May 31, 2023
6fbfd7a
[ISSUE #3909] Replaced anonymous new ChannelFutureListener() with lam…
PickBas May 31, 2023
fb144bb
[ISSUE #3918]RequestMapping, UrlMappingPattern can apply some optimizing
pandaapo May 31, 2023
bc9100e
Support more nacos configuration.
pandaapo Jun 2, 2023
10dd308
Assign default value to 'pollingThreadCount'
pandaapo Jun 2, 2023
0de0d04
Aoid NPE.
pandaapo Jun 2, 2023
8028e9f
Support more configurations for zk plugin.
pandaapo Jun 3, 2023
6373071
Set default RetryPolicy
pandaapo Jun 3, 2023
0113c42
[ISSUE #3933] Fix the tcp port. (#3936)
Alonexc Jun 6, 2023
e748f24
[ISSUE #3912]There are boilerplate codes in PrometheusTcpExporter, Pr…
pandaapo Jun 6, 2023
d374551
[ISSUE #3897]Admin, RocketMQAdmin, StandaloneAdmin and AbstractRmqAdm…
pandaapo Jun 6, 2023
c5bbd67
[ISSUE #3983] Optimize MessageQueue (#3984)
mxsm Jun 7, 2023
4b49642
[ISSUE #3978]Remove unnecessary annotations in TopicCreateRequest and…
pandaapo Jun 7, 2023
46fad30
[ISSUE #4050]Show logo in IDEA welcome screen (#4051)
pandaapo Jun 7, 2023
d182e55
[ISSUE #4031] Specifies the default character set (#4057)
glcas Jun 7, 2023
3f49cd4
[ISSUE #3485]Optimize Grpc protocol for Java (#3746)
mxsm Jun 12, 2023
307c925
add openfunction-connect
xwm1992 Jun 12, 2023
3f1efd6
Merge branch 'master' of https://github.com/apache/eventmesh into ope…
xwm1992 Jun 12, 2023
0164c58
[ISSUE #4079]The error of building metric observer: java.util.NoSuchE…
pandaapo Jun 12, 2023
255709b
Update .asf.yaml (#4122)
qqeasonchen Jun 13, 2023
9998d16
[ISSUE #4118]Add licenses to the Java classes generated from CloudEve…
mxsm Jun 13, 2023
cba698d
[ISSUE 4120] Fix the file read as null. (#4121)
Alonexc Jun 14, 2023
601d684
[ISSUE #4110] Enhance thread handling of InterruptedException (#4113)
Pil0tXia Jun 14, 2023
e46248e
[ISSUE #4111] Made an explicit null check on the map (#4112)
himansh295 Jun 14, 2023
7b18ca6
[ISSUE #3901] Method needlessly defines parameter with concrete class…
kyooosukedn Jun 14, 2023
40de773
[ISSUE #4017] Resources may not be released. (#4075)
kyooosukedn Jun 14, 2023
968a796
Merge branch 'master' of https://github.com/apache/eventmesh into ope…
xwm1992 Jun 14, 2023
f99790a
[ISSUE #4123] Add EventMesh OpenFunction Source/Sink Connector
xwm1992 Jun 16, 2023
d169e68
Merge pull request #4124 from xwm1992/openfunction-connector
MajorHe1 Jun 16, 2023
519941a
[ISSUE #4125] rename eventmesh-connector-openfunction module
xwm1992 Jun 16, 2023
41adba3
[Enhancement] Anonymous new can be replaced with lambda.[AsyncPubClient]
HattoriHenzo Jun 16, 2023
833b586
[ISSUE #4128] Add eventmesh-connector-rocketmq module
xwm1992 Jun 17, 2023
c3a61ba
Add licence header to EventMeshGrpcService
HattoriHenzo Jun 19, 2023
8843df2
Merge pull request #4126 from xwm1992/openfunction-connector
qqeasonchen Jun 20, 2023
e4ec6fd
[ISSUE #4141] Fix the GRPC subscribe error (#4143)
1404369980 Jun 25, 2023
47bc480
Merge branch 'master' of https://github.com/apache/eventmesh
xwm1992 Jun 25, 2023
19ed469
[ISSUE #4146] fix some code to pass checkstyle (#4147)
TheR1sing3un Jun 25, 2023
0d10698
[ISSUE #4128] Add eventmesh-connector-rocketmq module (#4148)
xwm1992 Jun 25, 2023
da76069
Merge branch 'master' of https://github.com/apache/eventmesh
xwm1992 Jun 25, 2023
03bdb40
fix compile error
xwm1992 Jun 25, 2023
f807cc3
fix compile error
xwm1992 Jun 25, 2023
257a786
Merge pull request #4151 from xwm1992/openfunction-connector
wqliang Jun 25, 2023
656c053
Update README.md (#4152)
qqeasonchen Jun 25, 2023
991cbda
Update greetings.yml (#4153)
qqeasonchen Jun 25, 2023
140bebd
[ISSUE #4133]Optimize OpenFunctionSourceConnector poll
mxsm Jun 18, 2023
b85d890
[ISSUE #4069]Add TLSConfig to registry plugin Consul.
pandaapo Jun 27, 2023
006a36d
Merge pull request #4134 from mxsm/eventmesh-4133
wqliang Jun 27, 2023
6cbb2d0
[ISSUE #4107] made fields final (#4158)
dipankr Jun 27, 2023
12aeb4b
[ISSUE #4109 ] Code Optimization.[ClientGroupWrapper] (#4154)
nitheesh-daram Jun 27, 2023
35c1eb4
[ISSUE #4108] Field may be 'final'[EventMeshRebalanceService] (#4149)
sun-tao Jun 27, 2023
1600c6b
[ISSUE #4144] Subscription is almost impossible to be cancelled when …
pandaapo Jun 28, 2023
1833238
Fix conflicts
pandaapo Jun 28, 2023
0fb795b
[ISSUE #4102] Field may be 'final'[UpStreamMsgContext]
kuldeepsidhu88 Jun 29, 2023
9a436c7
Merge branch 'master' of https://github.com/apache/eventmesh
xwm1992 Jun 29, 2023
e789fc1
[MINOR] Fix constants and link. (#4163)
Alonexc Jun 29, 2023
2258c21
[ISSUE #4161]Fix JsonUtils not support serialize java.time.LocalDate …
mxsm Jun 29, 2023
4e5428b
[ISSUE #4138] Add Unit Test for UrlMappingPattern (#4139)
Pil0tXia Jun 29, 2023
fdfc24b
Merge branch 'master' of https://github.com/apache/eventmesh
xwm1992 Jun 29, 2023
525a8dd
[ISSUE #4164]Fix EventMeshGrpcConsumer client subscribe topic error (…
mxsm Jun 30, 2023
bae1307
[ISSUE #4166] Fix grpc AsyncPublishInstance has no push messages. (#4…
Alonexc Jun 30, 2023
b6fdbbd
Merge branch 'master' of https://github.com/apache/eventmesh
xwm1992 Jun 30, 2023
3392b45
merge 1.9.0 prepare branch to master for release (#4181)
xwm1992 Jul 5, 2023
01c8524
Merge branch 'master' of https://github.com/apache/eventmesh
xwm1992 Jul 5, 2023
ce21c7c
Merge branch 'master' of https://github.com/apache/eventmesh into eve…
HattoriHenzo Jul 6, 2023
092736f
Minor refactoring
HattoriHenzo Jul 7, 2023
fb27010
[ISSUE#4198]Do some code optimization.[GrpcRetryer] (#4205)
harshithasudhakar Jul 10, 2023
1497fd3
[ISSUE #4185] Correct eventmesh.properties comment grammar mistake (#…
Pil0tXia Jul 10, 2023
b42f7b4
[ISSUE # 4137]Replace printStackTrace with error logging in RocketMQS…
Ruhshan Jul 10, 2023
42db065
[ISSUE #4101]Correct sequence for expected and actual arguments in as…
Ruhshan Jul 10, 2023
abb2271
Fix the port occupation that caused the startup fail, and the process…
lrhkobe Jul 11, 2023
8fcde9c
Fix concurrent modification exception
lrhkobe Jul 11, 2023
2a0fb98
Set http request timeout
lrhkobe Jul 11, 2023
fc3a09a
Authentication supports version number verification
lrhkobe Jul 11, 2023
5049542
log optimization
lrhkobe Jul 11, 2023
ec70e9b
fix NPE, security upgrade
lrhkobe Jul 11, 2023
9ad1261
Optimize remote subscribe and unsubscribe interface
lrhkobe Jul 11, 2023
16acead
Default parameter optimization and codec optimization
lrhkobe Jul 11, 2023
c5be10b
support application/protobuf
lrhkobe Jul 11, 2023
9471624
fix checkstyle
lrhkobe Jul 12, 2023
e4c9775
Add response status code setting for http request
lrhkobe Jul 12, 2023
d95a2db
fix license check problem
lrhkobe Jul 13, 2023
81690ec
fix checkstyle problem
lrhkobe Jul 13, 2023
1bd7e7c
Merge pull request #4221 from lrhkobe/1.5.1-improve
wqliang Jul 13, 2023
3f6576e
[ISSUE #4105] Code Optimization in EventMeshTcpRetryer (#4173)
VishalMCF Jul 14, 2023
06ae1b8
[ISSUE #3904] Make fields final that should be final (#4176)
Ruhshan Jul 14, 2023
c82454b
[ISSUE #4208] Add JavaDoc for org.apache.eventmesh.runtime.admin APIs…
Pil0tXia Jul 14, 2023
d8b99af
[ISSUE #4182] Add JavaDoc and Comments for eventmesh-admin-rocketmq M…
Pil0tXia Jul 14, 2023
f2bef69
[ISSUE #3836] Add proper access modifers for HttpEventWrapper (#4177)
Ruhshan Jul 14, 2023
d9b2d44
add license for eventmesh-sdk-go
xwm1992 Jul 14, 2023
8946412
modify settings.gradle
xwm1992 Jul 14, 2023
d9f1087
[ISSUE #4197] Use switch instead of if else
kartiktayal Jul 14, 2023
2516df4
[ISSUE #4215] Correct typos and Remove redundant keyword for AdminWeb…
Pil0tXia Jul 14, 2023
27f0caa
[ISSUE#4213]Resolve the compilation warning for the lombok annotation…
mxsm Jul 14, 2023
c5357d6
Merge pull request #4224 from xwm1992/license-modify
wqliang Jul 14, 2023
e287e76
[ISSUE #4222]support metadata management in http protocol (#4231)
lrhkobe Jul 17, 2023
7a20f92
[ISSUE #4084]Method prints the stack trace to the console[EventMeshMe…
harshithasudhakar Jul 18, 2023
651fe9b
[ISSUE 4209] Fix event.getExtension(ProtocolKey.ClientInstanceKey.TOK…
Alonexc Jul 18, 2023
ebe4d64
[ISSUE #3511] Swaped 2 arguments so they are in the correct order: ex…
sumitdethe27 Jul 18, 2023
cc43dee
[ISSUE #4227] Use switch...case... instead of if...else... for HookCo…
Pil0tXia Jul 18, 2023
a62333b
[ISSUE #4022]contains() is a legacy method that is equivalent to cont…
harshithasudhakar Jul 20, 2023
09094b8
[ISSUE #4018] Code optimization.[ProducerManager]
harshithasudhakar Jul 20, 2023
50065fb
[ISSUE #4200]Do some code optimization.[ServiceUtils]
VL-037 Jul 20, 2023
d14a604
[ISSUE #4241]Optimize BatchSendMessageProcessor log print and calcula…
mxsm Jul 20, 2023
8ad08ba
[ISSUE #4194]Dereference of 'heartbeatItems' may produce NPE. (#4235)
Codeprh Jul 20, 2023
1ba925b
[ISSUE #4236] TCP reconnection failed
Jul 20, 2023
14bf5c2
[ISSUE #4199] Catch Exception instead of Throwable (#4234)
Codeprh Jul 20, 2023
4b994b0
[ISSUE #4021] Replace LinkedList by ArrayList (#4233)
VL-037 Jul 20, 2023
4119ada
[ISSUE #4202] Do some code optimization.[util] (#4230)
fabian4 Jul 20, 2023
7c2c554
Merge pull request #4066 from pandaapo/master-issue4065
mxsm Jul 21, 2023
6c94d9d
[ISSUE #4011] add kafkaConnector module (#4180)
epiao55 Jul 21, 2023
a0ac44c
Update README.md
xwm1992 Jul 21, 2023
d3a48c8
add offsetStorage model
xwm1992 Jul 21, 2023
f46ad15
Merge pull request #4127 from HattoriHenzo/eventmesh-4086
mxsm Jul 21, 2023
022c051
[ISSUE #4245] Fix start eventmesh failed.
Alonexc Jul 21, 2023
d14acd2
update offsetStorage model
xwm1992 Jul 21, 2023
3499497
[ISSUE #4236] TCP reconnection failed
mxsm Jul 21, 2023
e83f74c
Merge pull request #4259 from apache/xwm1992-patch-6
pandaapo Jul 22, 2023
3ad12c4
[ISSUE #4097]: Remove duplicate keys from kafka-client properties in …
Ruhshan Jul 22, 2023
2a3c680
[ISSUE #4023]Comparison using reference equality instead of value equ…
kyooosukedn Jul 24, 2023
2cb5aa9
[ISSUE #4269] Used switch to replace the if-else. [MeshMessageProtoco…
devCod3r Jul 25, 2023
984eacc
[ISSUE #4099]Refactor re-used constant's usage from dedicated file (#…
Ruhshan Jul 25, 2023
fe13a85
[ISSUE #4267] Optimized [GrpcEventMeshCloudEventProtocolResolver] (#4…
devCod3r Jul 25, 2023
3372112
[ISSUE #4252] Minor refactoring for AssertUtils class (#4253)
HattoriHenzo Jul 25, 2023
c18ef05
[ISSUE #4092] Replaced isEqualto() by hasSize() method (#4251)
sumitdethe27 Jul 25, 2023
9e99067
feat: add event-bridge example
seriouszyx Jul 26, 2023
4369b94
[ISSUE #4288] Enable the nacos plugin to support namespace (#4293)
1404369980 Jul 26, 2023
81585ff
update architecture image (#4294)
xwm1992 Jul 27, 2023
81eacbd
[ISSUE #4211] Add JavaDoc for eventmesh.runtime.admin APIs (not in da…
Pil0tXia Jul 27, 2023
6b8e5e6
[ISSUE #4286] Fix response body is not recognised as JSON in query We…
Pil0tXia Jul 27, 2023
6df8428
[Feature #4032] Add Redis Connector modules (#4280)
fabian4 Jul 27, 2023
e5915f7
[ISSUE #4204] Repeat the code extraction as a method. (#4229)
fabian4 Jul 27, 2023
3652000
[ISSUE #4243] Optimize Webhook Manufacturer source Hard-coding (#4273)
Pil0tXia Jul 27, 2023
4e8505b
fix: parse the correct object (#4291)
Pil0tXia Jul 27, 2023
77839f6
[ISSUE #4268] Used switch to replace the if-else [CloudEventsProtocol…
devCod3r Jul 27, 2023
5b8616b
[ISSUE #4261] Some code optimizations to the EventMeshCloudEventBuild…
hhuang1231 Jul 28, 2023
a7b24ea
[ISSUE 4239] Optimal http header hard-code (#4295)
Alonexc Jul 30, 2023
af61f73
Update bug_report.yml (#4308)
qqeasonchen Jul 31, 2023
b4f2b69
Update documentation_related.yml (#4307)
qqeasonchen Jul 31, 2023
8fa63b6
Update feature_request.yml (#4306)
qqeasonchen Jul 31, 2023
77b87f9
Update enhancement_request.yml (#4305)
qqeasonchen Jul 31, 2023
94dbdaa
[ISSUE 4309] Fix http process error (#4310)
Alonexc Aug 1, 2023
a48410c
Update documentation_related.yml (#4311)
qqeasonchen Aug 1, 2023
d19baa5
Merge branch 'apache:master' into developmentEM
devCod3r Aug 1, 2023
7fd6e3d
Update enhancement_request.yml (#4312)
qqeasonchen Aug 1, 2023
3592263
Update feature_request.yml (#4313)
qqeasonchen Aug 1, 2023
dd40227
[ISSUE #4171]support pulsar connector (#4186)
g0715158 Aug 1, 2023
dff6e7a
[ISSUE #4263] InterruptedExceptions should never be ignored[ThreadUti…
harshithasudhakar Aug 2, 2023
289bac6
[ISSUE #4266]InterruptedExceptions should never be ignored[connectors…
Ruhshan Aug 2, 2023
99bcf67
[ISSUE #4093]Repeat code extraction as method.[MessageUtils] (#4315)
Ruhshan Aug 2, 2023
4121332
modify the offset management service
xwm1992 Aug 2, 2023
fef56a5
[ISSUE #4260] Anonymous new ChannelFutureListener() can be replaced w…
sbmvirdi Aug 2, 2023
19dd2b1
[Feature #4032] Fix redis codec (#4301)
fabian4 Aug 3, 2023
939db45
modify the nacos config management service
xwm1992 Aug 4, 2023
8ab4000
modify the configs
xwm1992 Aug 4, 2023
800b628
[ISSUE #4264]Some optimizations for ProducerService (#4334)
847850277 Aug 5, 2023
4a12417
Merge branch 'master' of https://github.com/apache/eventmesh into add…
xwm1992 Aug 7, 2023
4f612f1
fix ci check error
xwm1992 Aug 7, 2023
4eddf36
fix ci check error
xwm1992 Aug 7, 2023
9621900
fix ci check error
xwm1992 Aug 7, 2023
9d67039
[ISSUE #4085] Switching order (#4338)
Kouzola Aug 7, 2023
68c8f23
[ISSUE #4262] Enhancement Request EventMeshCloudEventUtils (#4337)
maxim-zgardan Aug 8, 2023
50bbaec
Merge pull request #4329 from xwm1992/add-offsetStorage
wqliang Aug 8, 2023
97157f8
fix template not taking effect (#4325)
Alonexc Aug 8, 2023
8732dd3
Merge pull request #4298 from devCod3r/developmentEM
lrhkobe Aug 8, 2023
4568c50
Merge pull request #4284 from seriouszyx/yixiang/event-bridge
lrhkobe Aug 8, 2023
d3d9d4f
[ISSUE #4339][Unit Test] improve eventmesh-common protocol unit test …
847850277 Aug 10, 2023
475cb2c
feat: refactor with mongodb plugin.
fabian4 Aug 10, 2023
dec34f3
[ISSUE 4346] Fix start error and some code optimization. (#4347)
Alonexc Aug 11, 2023
8418745
[ISSUE 4350] Fixed eventmesh startup error under MQ plugin (#4351)
Alonexc Aug 11, 2023
a350ffc
[ISSUE #4349][Unit Test] eventmesh-common header unit test. (#4348)
847850277 Aug 11, 2023
dbfb571
[ISSUE #4354] Fix spelling mistake (#4353)
iwangjie Aug 13, 2023
85ee0fb
[ISSUE #4191] Do some code optimization.[StreamPushRequest](#4356)
himansh295 Aug 14, 2023
8be4180
[ISSUE #4345]Fix publish EventMeshMessage without requestCode throw j…
mxsm Aug 14, 2023
857ad6b
[ISSUE #4190] Code optimization for the EventMeshConsumer class.
yanrongzhen Aug 14, 2023
9264a75
[ISSUE #4196] [Enhancement] Dereference of 'subscriptionItems' may pr…
vaibhavarya1622 Aug 14, 2023
5494919
[ISSUE #4335] Fix Webhook connection not closing after processing (#4…
Pil0tXia Aug 14, 2023
dd6cc06
[ISSUE #4203] [Enhancement] Do some code optimization.[EventMeshTcpMe…
vaibhavarya1622 Aug 15, 2023
c29ada6
[ISSUE #4082][Task1] Support S3 file source connector (#4132)
TheR1sing3un Aug 15, 2023
ad7d808
[ISSUE #4363] Do some code optimization.[EventMeshConsumer] (#4362)
yanrongzhen Aug 15, 2023
fe3000c
[ISSUE 4360] Fix unSubscribeUrl cannot be null (#4361)
Alonexc Aug 15, 2023
c271cee
feat: refactor with mongodb plugin.
fabian4 Aug 15, 2023
8d627ec
feat: refactor with mongodb plugin.
fabian4 Aug 16, 2023
f1a01a2
Merge branch 'apache:master' into mongodb_plugin
fabian4 Aug 16, 2023
3bbffb3
feat: refactor with mongodb plugin.
fabian4 Aug 16, 2023
d96804a
feat: refactor with mongodb plugin.
fabian4 Aug 17, 2023
324045c
Merge pull request #4367 from fabian4/mongodb_plugin
wqliang Aug 17, 2023
8f584d5
Merge pull request #4068 from pandaapo/master-issue4067
qqeasonchen Aug 17, 2023
9cf176a
[ISSUE #4366] When listens for remote nacos offset configuration, Con…
xwm1992 Aug 17, 2023
8f13784
feat: refactor with rabbitmq plugin.
fabian4 Aug 17, 2023
182c509
[ISSUE #4366] When listens for remote nacos offset configuration, Con…
xwm1992 Aug 17, 2023
e03d291
[ISSUE #4366] When listens for remote nacos offset configuration, Con…
xwm1992 Aug 17, 2023
92eb957
[ISSUE #4366] When listens for remote nacos offset configuration, Con…
xwm1992 Aug 17, 2023
08b1915
[ISSUE #4379] Enable manually commit offset in rocketmq source connector
xwm1992 Aug 17, 2023
3b7855f
Merge pull request #4380 from xwm1992/fix-4379
wqliang Aug 17, 2023
32e11dd
feat: refactor with rabbitmq plugin.
fabian4 Aug 18, 2023
50d6113
[ISSUE #4062]Implement the method printRetryThreadPoolState() in Even…
pandaapo Aug 21, 2023
beb2e57
feat: refactor with rabbitmq plugin.
fabian4 Aug 22, 2023
3d4e922
[ISSUE #4025] Comparison using reference equality instead of value eq…
AbhishekPSingh07 Aug 22, 2023
403ae9c
feat: refactor with rabbitmq plugin.
fabian4 Aug 22, 2023
a5639bf
feat: refactor with rabbitmq plugin.
fabian4 Aug 22, 2023
3cd5670
Merge branch 'apache:master' into rabbitmq_plugin
fabian4 Aug 22, 2023
d7abc31
[ISSUE #4087] Anonymous new can be replaced with lambda (#4391)
kartiktayal Aug 23, 2023
1babec1
[ISSUE #4027] Comparison using reference equality instead of value eq…
harshithasudhakar Aug 23, 2023
d37bd9e
feat: refactor with rabbitmq plugin.
fabian4 Aug 23, 2023
a53d6b3
Merge remote-tracking branch 'origin/rabbitmq_plugin' into rabbitmq_p…
fabian4 Aug 23, 2023
d7bd581
feat: refactor with rabbitmq plugin.
fabian4 Aug 23, 2023
47d88a0
feat: refactor with rabbitmq plugin.
fabian4 Aug 23, 2023
ac4c5cb
feat: refactor with rabbitmq plugin.
fabian4 Aug 23, 2023
1b717a3
Merge branch 'apache:master' into rabbitmq_plugin
fabian4 Aug 23, 2023
ce4c8bb
Merge remote-tracking branch 'origin/rabbitmq_plugin' into rabbitmq_p…
fabian4 Aug 23, 2023
95ec173
[ISSUE #4388] Improve the rocketmq source connector offset ack (#4389)
xwm1992 Aug 23, 2023
3f1783f
feat: refactor with rabbitmq plugin.
fabian4 Aug 23, 2023
fd1102a
Merge pull request #4393 from fabian4/rabbitmq_plugin
qqeasonchen Aug 24, 2023
5ecbe7b
Update .asf.yaml
qqeasonchen Aug 24, 2023
738e98d
Merge pull request #4399 from apache/qqeasonchen-patch-19
harshithasudhakar Aug 24, 2023
51a1171
[ISSUE #4195] Do some code optimization.[SubscribeProcessor] (#4383)
gautamsagar99 Aug 25, 2023
fcedbbc
[ISSUE #4341] WebHookConfig failed to load after being inserted (#4344)
Pil0tXia Aug 25, 2023
ebd5b5e
[ISSUE 4042] EventMesh Integrated k8s with different components. (#4377)
Alonexc Aug 25, 2023
707c98d
[ISSUE #4390] Realize the SPI extension loading of RabbitMQ admin-api…
Pil0tXia Aug 26, 2023
3678c23
[ISSUE #4398] Use bash to excute shell script and fix mis-output (#4401)
Pil0tXia Aug 26, 2023
b55e6db
[ISSUE #4403] Fix the SPI extension admin-api loading of storage-plug…
Pil0tXia Aug 28, 2023
af53e7a
[ISSUE #549] Unify request handler code style of tcp and http protoco…
hhuang1231 Aug 28, 2023
1e59cdf
[ISSUE #4369] Move Pravega plugin into Connector from Storage plugin…
fabian4 Aug 28, 2023
643f059
[ISSUE #4129]Enhance the functionality of EventMeshExtensionFactory (…
mxsm Aug 29, 2023
eea1d01
Add connector jdbc interface (#4332)
mxsm Aug 29, 2023
fbbb81d
[ISSUE #4403] Fix redis admin spi (#4405)
fabian4 Aug 29, 2023
9e4a0c0
[ISSUE #4406]Optimize Body#buildBody method (#4407)
mxsm Aug 30, 2023
5d390d3
[ISSUE #4374] Redesign registry module into meta storage (#4418)
xwm1992 Aug 30, 2023
c25a89b
Update README.md (#4422)
qqeasonchen Aug 30, 2023
1644984
[ISSUE #4423] Fix the meta-file error in the meta module (#4424)
xwm1992 Aug 30, 2023
3f2fd1c
[ISSUE #4019][Enhancement] Code optimization.[SubStreamHandler] (#4056)
PickBas Aug 31, 2023
1da7463
[ISSUE #4426] Optimize EventBridge with QUIC
seriouszyx Sep 4, 2023
d8f7d78
add EventMeshTrace
seriouszyx Sep 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[ISSUE #549] Unify request handler code style of tcp and http protocol (
#4333)

* refactor: enhance http and tcp server style

* fix: create trace-plugin
hhuang1231 authored Aug 28, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit af53e7a540e36c59a95e535b12bb21faf75cd29d
Original file line number Diff line number Diff line change
@@ -119,7 +119,7 @@ public void handle(HttpExchange httpExchange) throws IOException {
if (session.getClient().getHost().equals(ip) && String.valueOf(
session.getClient().getPort()).equals(port)) {
redirectResult.append("|");
redirectResult.append(EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer,
redirectResult.append(EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer.getTcpThreadPoolGroup(),
destEventMeshIp, Integer.parseInt(destEventMeshPort),
session, clientSessionGroupMapping));
}
Original file line number Diff line number Diff line change
@@ -116,7 +116,7 @@ public void handle(HttpExchange httpExchange) throws IOException {
// to the new EventMesh node specified by given EventMesh IP and port.
if (session.getClient().getPath().contains(path)) {
redirectResult.append("|");
redirectResult.append(EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer,
redirectResult.append(EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer.getTcpThreadPoolGroup(),
destEventMeshIp, Integer.parseInt(destEventMeshPort),
session, clientSessionGroupMapping));
}
Original file line number Diff line number Diff line change
@@ -120,7 +120,7 @@ public void handle(final HttpExchange httpExchange) throws IOException {
// to the new EventMesh node specified by given EventMesh IP and port.
if (session.getClient().getSubsystem().equals(subSystem)) {
redirectResult.append('|')
.append(EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer,
.append(EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer.getTcpThreadPoolGroup(),
destEventMeshIp, Integer.parseInt(destEventMeshPort),
session, clientSessionGroupMapping));
}
Original file line number Diff line number Diff line change
@@ -90,7 +90,7 @@ public void handle(final HttpExchange httpExchange) throws IOException {
// Iterate through the sessionMap and close each client connection
for (final Map.Entry<InetSocketAddress, Session> entry : sessionMap.entrySet()) {
final InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(
eventMeshTCPServer, entry.getValue(), clientSessionGroupMapping);
eventMeshTCPServer.getTcpThreadPoolGroup(), entry.getValue(), clientSessionGroupMapping);
// Add the remote client address to a list of successfully rejected addresses
if (addr != null) {
successRemoteAddrs.add(addr);
Original file line number Diff line number Diff line change
@@ -113,7 +113,7 @@ public void handle(HttpExchange httpExchange) throws IOException {
for (Map.Entry<InetSocketAddress, Session> entry : sessionMap.entrySet()) {
// Reject client connection for each matching session found
if (entry.getKey().getHostString().equals(ip) && String.valueOf(entry.getKey().getPort()).equals(port)) {
InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer,
InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer.getTcpThreadPoolGroup(),
entry.getValue(), clientSessionGroupMapping);
// Add the remote client address to a list of successfully rejected addresses
if (addr != null) {
Original file line number Diff line number Diff line change
@@ -121,7 +121,7 @@ public void handle(HttpExchange httpExchange) throws IOException {
for (Session session : sessionMap.values()) {
// Reject client connection for each matching session found
if (session.getClient().getSubsystem().equals(subSystem)) {
InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, session,
InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer.getTcpThreadPoolGroup(), session,
clientSessionGroupMapping);
// Add the remote client address to a list of successfully rejected addresses
if (addr != null) {
Original file line number Diff line number Diff line change
@@ -126,7 +126,7 @@ void delete(HttpExchange httpExchange) throws IOException {
if (entry.getKey().getHostString().equals(host) && entry.getKey().getPort() == port) {
// Call the serverGoodby2Client method in EventMeshTcp2Client to disconnect the client's connection
EventMeshTcp2Client.serverGoodby2Client(
eventMeshTCPServer,
eventMeshTCPServer.getTcpThreadPoolGroup(),
entry.getValue(),
clientSessionGroupMapping
);

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -32,53 +32,20 @@

import lombok.extern.slf4j.Slf4j;

/**
* The most basic server
*/
@Slf4j
public abstract class AbstractRemotingServer {

private static final int MAX_THREADS = Runtime.getRuntime().availableProcessors();
private static final int DEFAULT_SLEEP_SECONDS = 30;

private EventLoopGroup bossGroup;

private EventLoopGroup ioGroup;

private EventExecutorGroup workerGroup;

private int port;

private static final int MAX_THREADS = Runtime.getRuntime().availableProcessors();

public EventLoopGroup getBossGroup() {
return bossGroup;
}

public EventLoopGroup getIoGroup() {
return ioGroup;
}

public EventExecutorGroup getWorkerGroup() {
return workerGroup;
}

public int getPort() {
return port;
}

public void setBossGroup(final EventLoopGroup bossGroup) {
this.bossGroup = bossGroup;
}

public void setIoGroup(final EventLoopGroup ioGroup) {
this.ioGroup = ioGroup;
}

public void setWorkerGroup(final EventExecutorGroup workerGroup) {
this.workerGroup = workerGroup;
}

public void setPort(final int port) {
this.port = port;
}

private void buildBossGroup(final String threadPrefix) {
if (useEpoll()) {
bossGroup = new EpollEventLoopGroup(1, new EventMeshThreadFactory(threadPrefix + "NettyEpoll-Boss", true));
@@ -106,6 +73,8 @@ public void init(final String threadPrefix) throws Exception {
buildWorkerGroup(threadPrefix);
}

public abstract void start() throws Exception;

public void shutdown() throws Exception {
if (bossGroup != null) {
bossGroup.shutdownGracefully();
@@ -135,5 +104,35 @@ protected boolean useEpoll() {
return SystemUtils.isLinuxPlatform() && Epoll.isAvailable();
}

public abstract void start() throws Exception;
public EventLoopGroup getBossGroup() {
return bossGroup;
}

public EventLoopGroup getIoGroup() {
return ioGroup;
}

public EventExecutorGroup getWorkerGroup() {
return workerGroup;
}

public int getPort() {
return port;
}

public void setBossGroup(final EventLoopGroup bossGroup) {
this.bossGroup = bossGroup;
}

public void setIoGroup(final EventLoopGroup ioGroup) {
this.ioGroup = ioGroup;
}

public void setWorkerGroup(final EventExecutorGroup workerGroup) {
this.workerGroup = workerGroup;
}

public void setPort(final int port) {
this.port = port;
}
}

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -43,6 +43,7 @@ public void init() throws Exception {
// server init
if (eventMeshHttpConfiguration != null) {
eventMeshHttpServer = new EventMeshHTTPServer(eventMeshServer, eventMeshHttpConfiguration);
eventMeshHttpServer.init();
}
}

Original file line number Diff line number Diff line change
@@ -76,8 +76,8 @@ public EventMeshServer() {

//Initialize BOOTSTRAP_LIST based on protocols provided in configuration
final List<String> provideServerProtocols = configuration.getEventMeshProvideServerProtocols();
for (final String provideServerProtocol : provideServerProtocols) {
switch (provideServerProtocol) {
for (String provideServerProtocol : provideServerProtocols) {
switch (provideServerProtocol.toUpperCase()) {
case ConfigurationContextUtil.HTTP:
BOOTSTRAP_LIST.add(new EventMeshHttpBootstrap(this));
break;

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.runtime.boot;

import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

public class HTTPThreadPoolGroup implements ThreadPoolGroup {

private final EventMeshHTTPConfiguration eventMeshHttpConfiguration;

private ThreadPoolExecutor batchMsgExecutor;
private ThreadPoolExecutor sendMsgExecutor;
private ThreadPoolExecutor remoteMsgExecutor;
private ThreadPoolExecutor replyMsgExecutor;
private ThreadPoolExecutor pushMsgExecutor;
private ThreadPoolExecutor clientManageExecutor;
private ThreadPoolExecutor adminExecutor;
private ThreadPoolExecutor webhookExecutor;

public HTTPThreadPoolGroup(EventMeshHTTPConfiguration eventMeshHttpConfiguration) {
this.eventMeshHttpConfiguration = eventMeshHttpConfiguration;
}

@Override
public void initThreadPool() {

batchMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshHttpConfiguration.getEventMeshServerBatchMsgThreadNum(),
eventMeshHttpConfiguration.getEventMeshServerBatchMsgThreadNum(),
new LinkedBlockingQueue<>(eventMeshHttpConfiguration.getEventMeshServerBatchBlockQSize()),
"eventMesh-batchMsg", true);

sendMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshHttpConfiguration.getEventMeshServerSendMsgThreadNum(),
eventMeshHttpConfiguration.getEventMeshServerSendMsgThreadNum(),
new LinkedBlockingQueue<>(eventMeshHttpConfiguration.getEventMeshServerSendMsgBlockQSize()),
"eventMesh-sendMsg", true);

remoteMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshHttpConfiguration.getEventMeshServerRemoteMsgThreadNum(),
eventMeshHttpConfiguration.getEventMeshServerRemoteMsgThreadNum(),
new LinkedBlockingQueue<>(eventMeshHttpConfiguration.getEventMeshServerRemoteMsgBlockQSize()),
"eventMesh-remoteMsg", true);

pushMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshHttpConfiguration.getEventMeshServerPushMsgThreadNum(),
eventMeshHttpConfiguration.getEventMeshServerPushMsgThreadNum(),
new LinkedBlockingQueue<>(eventMeshHttpConfiguration.getEventMeshServerPushMsgBlockQSize()),
"eventMesh-pushMsg", true);

clientManageExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshHttpConfiguration.getEventMeshServerClientManageThreadNum(),
eventMeshHttpConfiguration.getEventMeshServerClientManageThreadNum(),
new LinkedBlockingQueue<>(eventMeshHttpConfiguration.getEventMeshServerClientManageBlockQSize()),
"eventMesh-clientManage", true);

adminExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshHttpConfiguration.getEventMeshServerAdminThreadNum(),
eventMeshHttpConfiguration.getEventMeshServerAdminThreadNum(),
new LinkedBlockingQueue<>(50), "eventMesh-admin",
true);

replyMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshHttpConfiguration.getEventMeshServerReplyMsgThreadNum(),
eventMeshHttpConfiguration.getEventMeshServerReplyMsgThreadNum(),
new LinkedBlockingQueue<>(100),
"eventMesh-replyMsg", true);

webhookExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshHttpConfiguration.getEventMeshServerWebhookThreadNum(),
eventMeshHttpConfiguration.getEventMeshServerWebhookThreadNum(),
new LinkedBlockingQueue<>(100), "eventMesh-webhook", true);
}

@Override
public void shutdownThreadPool() {
if (batchMsgExecutor != null) {
batchMsgExecutor.shutdown();
}
if (adminExecutor != null) {
adminExecutor.shutdown();
}
if (clientManageExecutor != null) {
clientManageExecutor.shutdown();
}
if (sendMsgExecutor != null) {
sendMsgExecutor.shutdown();
}
if (remoteMsgExecutor != null) {
remoteMsgExecutor.shutdown();
}
if (pushMsgExecutor != null) {
pushMsgExecutor.shutdown();
}
if (replyMsgExecutor != null) {
replyMsgExecutor.shutdown();
}
}

public ThreadPoolExecutor getBatchMsgExecutor() {
return batchMsgExecutor;
}

public ThreadPoolExecutor getSendMsgExecutor() {
return sendMsgExecutor;
}

public ThreadPoolExecutor getRemoteMsgExecutor() {
return remoteMsgExecutor;
}

public ThreadPoolExecutor getReplyMsgExecutor() {
return replyMsgExecutor;
}

public ThreadPoolExecutor getPushMsgExecutor() {
return pushMsgExecutor;
}

public ThreadPoolExecutor getClientManageExecutor() {
return clientManageExecutor;
}

public ThreadPoolExecutor getAdminExecutor() {
return adminExecutor;
}

public ThreadPoolExecutor getWebhookExecutor() {
return webhookExecutor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.runtime.boot;

import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

public class TCPThreadPoolGroup implements ThreadPoolGroup {
private final EventMeshTCPConfiguration eventMeshTCPConfiguration;
private ScheduledExecutorService scheduler;
private ThreadPoolExecutor taskHandleExecutorService;
private ThreadPoolExecutor broadcastMsgDownstreamExecutorService;

public TCPThreadPoolGroup(EventMeshTCPConfiguration eventMeshTCPConfiguration) {
this.eventMeshTCPConfiguration = eventMeshTCPConfiguration;
}

@Override
public void initThreadPool() {

scheduler = ThreadPoolFactory.createScheduledExecutor(eventMeshTCPConfiguration.getEventMeshTcpGlobalScheduler(),
new EventMeshThreadFactory("eventMesh-tcp-scheduler", true));

taskHandleExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshTCPConfiguration.getEventMeshTcpTaskHandleExecutorPoolSize(),
eventMeshTCPConfiguration.getEventMeshTcpTaskHandleExecutorPoolSize(),
new LinkedBlockingQueue<>(10_000),
new EventMeshThreadFactory("eventMesh-tcp-task-handle", true));

broadcastMsgDownstreamExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshTCPConfiguration.getEventMeshTcpMsgDownStreamExecutorPoolSize(),
eventMeshTCPConfiguration.getEventMeshTcpMsgDownStreamExecutorPoolSize(),
new LinkedBlockingQueue<>(10_000),
new EventMeshThreadFactory("eventMesh-tcp-msg-downstream", true));
}

@Override
public void shutdownThreadPool() {
scheduler.shutdown();
taskHandleExecutorService.shutdown();
broadcastMsgDownstreamExecutorService.shutdown();
}

public ScheduledExecutorService getScheduler() {
return scheduler;
}

public ThreadPoolExecutor getTaskHandleExecutorService() {
return taskHandleExecutorService;
}

public ThreadPoolExecutor getBroadcastMsgDownstreamExecutorService() {
return broadcastMsgDownstreamExecutorService;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.runtime.boot;

/**
* The implementation class for this interface is used to assemble the thread pool required by the server
*
*/
public interface ThreadPoolGroup {
void initThreadPool();

void shutdownThreadPool();
}
Original file line number Diff line number Diff line change
@@ -68,7 +68,7 @@ private void checkTimeout() {

public HTTPMessageHandler(EventMeshConsumer eventMeshConsumer) {
this.eventMeshConsumer = eventMeshConsumer;
this.pushExecutor = eventMeshConsumer.getEventMeshHTTPServer().getPushMsgExecutor();
this.pushExecutor = eventMeshConsumer.getEventMeshHTTPServer().getHttpThreadPoolGroup().getPushMsgExecutor();
waitingRequests.put(this.eventMeshConsumer.getConsumerGroupConf().getConsumerGroup(), Sets.newConcurrentHashSet());
SCHEDULER.scheduleAtFixedRate(this::checkTimeout, 0, 1000, TimeUnit.MILLISECONDS);
}
@@ -101,7 +101,7 @@ public boolean handle(final HandleMsgContext handleMsgContext) {
return true;
} catch (RejectedExecutionException e) {
log.warn("pushMsgThreadPoolQueue is full, so reject, current task size {}",
handleMsgContext.getEventMeshHTTPServer().getPushMsgExecutor().getQueue().size(), e);
handleMsgContext.getEventMeshHTTPServer().getHttpThreadPoolGroup().getPushMsgExecutor().getQueue().size(), e);
return false;
}
}
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.RedirectInfo;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.boot.TCPThreadPoolGroup;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.SessionState;
@@ -45,7 +45,7 @@
@Slf4j
public class EventMeshTcp2Client {

public static InetSocketAddress serverGoodby2Client(EventMeshTCPServer eventMeshTCPServer,
public static InetSocketAddress serverGoodby2Client(TCPThreadPoolGroup tcpThreadPoolGroup,
Session session,
ClientSessionGroupMapping mapping) {
log.info("serverGoodby2Client client[{}]", session.getClient());
@@ -55,7 +55,7 @@ public static InetSocketAddress serverGoodby2Client(EventMeshTCPServer eventMesh
msg.setHeader(new Header(SERVER_GOODBYE_REQUEST, OPStatus.SUCCESS.getCode(),
"graceful normal quit from eventmesh", null));

eventMeshTCPServer.getScheduler().submit(new Runnable() {
tcpThreadPoolGroup.getScheduler().submit(new Runnable() {
@Override
public void run() {
long taskExecuteTime = System.currentTimeMillis();
@@ -64,30 +64,30 @@ public void run() {
});
InetSocketAddress address = (InetSocketAddress) session.getContext().channel().remoteAddress();

closeSessionIfTimeout(eventMeshTCPServer, session, mapping);
closeSessionIfTimeout(tcpThreadPoolGroup, session, mapping);
return address;
} catch (Exception e) {
log.error("exception occur while serverGoodby2Client", e);
return null;
}
}

public static InetSocketAddress goodBye2Client(EventMeshTCPServer eventMeshTCPServer, Session session,
String errMsg, int eventMeshStatus,
ClientSessionGroupMapping mapping) {
public static InetSocketAddress goodBye2Client(TCPThreadPoolGroup tcpThreadPoolGroup, Session session,
String errMsg, int eventMeshStatus,
ClientSessionGroupMapping mapping) {
try {
long startTime = System.currentTimeMillis();
Package msg = new Package();
msg.setHeader(new Header(SERVER_GOODBYE_REQUEST, eventMeshStatus, errMsg, null));
eventMeshTCPServer.getScheduler().schedule(new Runnable() {
tcpThreadPoolGroup.getScheduler().schedule(new Runnable() {
@Override
public void run() {
long taskExecuteTime = System.currentTimeMillis();
Utils.writeAndFlush(msg, startTime, taskExecuteTime, session.getContext(), session);
}
}, 1 * 1000, TimeUnit.MILLISECONDS);

closeSessionIfTimeout(eventMeshTCPServer, session, mapping);
closeSessionIfTimeout(tcpThreadPoolGroup, session, mapping);

return session.getRemoteAddress();
} catch (Exception e) {
@@ -115,7 +115,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
});
}

public static String redirectClient2NewEventMesh(EventMeshTCPServer eventMeshTCPServer, String newEventMeshIp,
public static String redirectClient2NewEventMesh(TCPThreadPoolGroup tcpThreadPoolGroup, String newEventMeshIp,
int port, Session session, ClientSessionGroupMapping mapping) {
log.info("begin to gracefully redirect Client {}, newIPPort[{}]", session.getClient(), newEventMeshIp + ":"
+ port);
@@ -125,24 +125,24 @@ public static String redirectClient2NewEventMesh(EventMeshTCPServer eventMeshTCP
Package pkg = new Package();
pkg.setHeader(new Header(REDIRECT_TO_CLIENT, OPStatus.SUCCESS.getCode(), null, null));
pkg.setBody(new RedirectInfo(newEventMeshIp, port));
eventMeshTCPServer.getScheduler().schedule(new Runnable() {
tcpThreadPoolGroup.getScheduler().schedule(new Runnable() {
@Override
public void run() {
long taskExecuteTime = System.currentTimeMillis();
Utils.writeAndFlush(pkg, startTime, taskExecuteTime, session.getContext(), session);
}
}, 5 * 1000, TimeUnit.MILLISECONDS);
closeSessionIfTimeout(eventMeshTCPServer, session, mapping);
closeSessionIfTimeout(tcpThreadPoolGroup, session, mapping);
return session.getRemoteAddress() + "--->" + newEventMeshIp + ":" + port;
} catch (Exception e) {
log.error("exception occur while redirectClient2NewEventMesh", e);
return null;
}
}

public static void closeSessionIfTimeout(EventMeshTCPServer eventMeshTCPServer, Session session,
public static void closeSessionIfTimeout(TCPThreadPoolGroup tcpThreadPoolGroup, Session session,
ClientSessionGroupMapping mapping) {
eventMeshTCPServer.getScheduler().schedule(new Runnable() {
tcpThreadPoolGroup.getScheduler().schedule(new Runnable() {
@Override
public void run() {
try {

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -130,7 +130,7 @@ public ClientGroupWrapper(String sysId, String group,
this.group = group;
this.eventMeshTCPServer = eventMeshTCPServer;
this.eventMeshTCPConfiguration = eventMeshTCPServer.getEventMeshTCPConfiguration();
this.eventMeshTcpRetryer = eventMeshTCPServer.getEventMeshTcpRetryer();
this.eventMeshTcpRetryer = eventMeshTCPServer.getTcpRetryer();
this.eventMeshTcpMonitor =
Preconditions.checkNotNull(eventMeshTCPServer.getEventMeshTcpMonitor());
this.downstreamDispatchStrategy = downstreamDispatchStrategy;
@@ -627,7 +627,7 @@ public synchronized void initClientGroupBroadcastConsumer() throws Exception {
downStreamMsgContext.setSession(session);

//downstream broadcast msg asynchronously
eventMeshTCPServer.getBroadcastMsgDownstreamExecutorService()
eventMeshTCPServer.getTcpThreadPoolGroup().getBroadcastMsgDownstreamExecutorService()
.submit(() -> {
//msg put in eventmesh,waiting client ack
session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
Original file line number Diff line number Diff line change
@@ -357,7 +357,7 @@ private void shutdownClientGroupProducer(ClientGroupWrapper clientGroupWrapper)
}

private void initSessionCleaner() {
eventMeshTCPServer.getScheduler().scheduleAtFixedRate(
eventMeshTCPServer.getTcpThreadPoolGroup().getScheduler().scheduleAtFixedRate(
() -> {
for (Session tmp : sessionTable.values()) {
if (System.currentTimeMillis() - tmp.getLastHeartbeatTime()
@@ -376,7 +376,7 @@ private void initSessionCleaner() {
}

private void initDownStreamMsgContextCleaner() {
eventMeshTCPServer.getScheduler().scheduleAtFixedRate(
eventMeshTCPServer.getTcpThreadPoolGroup().getScheduler().scheduleAtFixedRate(
() -> {

//scan non-broadcast msg
@@ -412,15 +412,15 @@ public void shutdown() throws Exception {
for (ClientGroupWrapper clientGroupWrapper : clientGroupMap.values()) {
for (Session subSession : clientGroupWrapper.getGroupConsumerSessions()) {
try {
EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, subSession, this);
EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer.getTcpThreadPoolGroup(), subSession, this);
} catch (Exception e) {
log.error("say goodbye to subSession error! {}", subSession, e);
}
}

for (Session pubSession : clientGroupWrapper.getGroupProducerSessions()) {
try {
EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, pubSession, this);
EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer.getTcpThreadPoolGroup(), pubSession, this);
} catch (Exception e) {
log.error("say goodbye to pubSession error! {}", pubSession, e);
}
@@ -434,7 +434,7 @@ public void shutdown() throws Exception {

sessionTable.values().parallelStream().forEach(itr -> {
try {
EventMeshTcp2Client.serverGoodby2Client(this.eventMeshTCPServer, itr, this);
EventMeshTcp2Client.serverGoodby2Client(this.eventMeshTCPServer.getTcpThreadPoolGroup(), itr, this);
} catch (Exception e) {
log.error("say goodbye to session error! {}", itr, e);
}
Original file line number Diff line number Diff line change
@@ -15,16 +15,18 @@
* limitations under the License.
*/

package org.apache.eventmesh.runtime.core.protocol.tcp.client.task;
package org.apache.eventmesh.runtime.core.protocol.tcp.client.processor;

import static org.apache.eventmesh.common.protocol.tcp.Command.CLIENT_GOODBYE_RESPONSE;

import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.util.Utils;

import java.util.Arrays;
@@ -35,14 +37,19 @@
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class GoodbyeTask extends AbstractTask {
public class GoodbyeProcessor implements TcpProcessor {

public GoodbyeTask(Package pkg, ChannelHandlerContext ctx, long startTime, EventMeshTCPServer eventMeshTCPServer) {
super(pkg, ctx, startTime, eventMeshTCPServer);
private EventMeshTCPServer eventMeshTCPServer;
private final Acl acl;

public GoodbyeProcessor(EventMeshTCPServer eventMeshTCPServer) {
this.eventMeshTCPServer = eventMeshTCPServer;
this.acl = eventMeshTCPServer.getAcl();
}

@Override
public void run() {
public void process(final Package pkg, final ChannelHandlerContext ctx, long startTime) {
Session session = eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx);
long taskExecuteTime = System.currentTimeMillis();
Package msg = new Package();
try {
@@ -58,9 +65,10 @@ public void run() {
msg.setHeader(new Header(CLIENT_GOODBYE_RESPONSE, OPStatus.FAIL.getCode(), Arrays.toString(e.getStackTrace()),
pkg.getHeader().getSeq()));
} finally {
this.eventMeshTCPServer.getScheduler().submit(() -> Utils.writeAndFlush(msg, startTime, taskExecuteTime, session.getContext(), session));
this.eventMeshTCPServer.getTcpThreadPoolGroup().getScheduler()
.submit(() -> Utils.writeAndFlush(msg, startTime, taskExecuteTime, session.getContext(), session));
}
EventMeshTcp2Client
.closeSessionIfTimeout(this.eventMeshTCPServer, session, eventMeshTCPServer.getClientSessionGroupMapping());
.closeSessionIfTimeout(this.eventMeshTCPServer.getTcpThreadPoolGroup(), session, eventMeshTCPServer.getClientSessionGroupMapping());
}
}
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.runtime.core.protocol.tcp.client.task;
package org.apache.eventmesh.runtime.core.protocol.tcp.client.processor;

import static org.apache.eventmesh.common.protocol.tcp.Command.HEARTBEAT_REQUEST;
import static org.apache.eventmesh.common.protocol.tcp.Command.HEARTBEAT_RESPONSE;
@@ -25,6 +25,7 @@
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.runtime.util.Utils;

@@ -36,17 +37,19 @@
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class HeartBeatTask extends AbstractTask {
public class HeartBeatProcessor implements TcpProcessor {

private EventMeshTCPServer eventMeshTCPServer;
private final Acl acl;

public HeartBeatTask(Package pkg, ChannelHandlerContext ctx, long startTime, EventMeshTCPServer eventMeshTCPServer) {
super(pkg, ctx, startTime, eventMeshTCPServer);
public HeartBeatProcessor(EventMeshTCPServer eventMeshTCPServer) {
this.eventMeshTCPServer = eventMeshTCPServer;
this.acl = eventMeshTCPServer.getAcl();
}

@Override
public void run() {
public void process(final Package pkg, final ChannelHandlerContext ctx, long startTime) {
Session session = eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx);
long taskExecuteTime = System.currentTimeMillis();
Package res = new Package();
try {
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.runtime.core.protocol.tcp.client.task;
package org.apache.eventmesh.runtime.core.protocol.tcp.client.processor;

import static org.apache.eventmesh.common.protocol.tcp.Command.HELLO_RESPONSE;

@@ -46,19 +46,20 @@
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class HelloTask extends AbstractTask {
public class HelloProcessor implements TcpProcessor {

private static final Logger MESSAGE_LOGGER = LoggerFactory.getLogger(EventMeshConstants.MESSAGE);

private EventMeshTCPServer eventMeshTCPServer;
private final Acl acl;

public HelloTask(Package pkg, ChannelHandlerContext ctx, long startTime, EventMeshTCPServer eventMeshTCPServer) {
super(pkg, ctx, startTime, eventMeshTCPServer);
public HelloProcessor(EventMeshTCPServer eventMeshTCPServer) {
this.eventMeshTCPServer = eventMeshTCPServer;
this.acl = eventMeshTCPServer.getAcl();
}

@Override
public void run() {
public void process(final Package pkg, final ChannelHandlerContext ctx, long startTime) {
long taskExecuteTime = System.currentTimeMillis();
Package res = new Package();
Session session = null;
Original file line number Diff line number Diff line change
@@ -15,29 +15,37 @@
* limitations under the License.
*/

package org.apache.eventmesh.runtime.core.protocol.tcp.client.task;
package org.apache.eventmesh.runtime.core.protocol.tcp.client.processor;

import static org.apache.eventmesh.common.protocol.tcp.Command.LISTEN_RESPONSE;

import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;

import io.netty.channel.ChannelHandlerContext;


import lombok.extern.slf4j.Slf4j;


@Slf4j
public class ListenTask extends AbstractTask {
public class ListenProcessor implements TcpProcessor {

private EventMeshTCPServer eventMeshTCPServer;
private final Acl acl;

public ListenTask(Package pkg, ChannelHandlerContext ctx, long startTime, EventMeshTCPServer eventMeshTCPServer) {
super(pkg, ctx, startTime, eventMeshTCPServer);
public ListenProcessor(EventMeshTCPServer eventMeshTCPServer) {
this.eventMeshTCPServer = eventMeshTCPServer;
this.acl = eventMeshTCPServer.getAcl();
}

@Override
public void run() {
public void process(final Package pkg, final ChannelHandlerContext ctx, long startTime) {
Session session = eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx);
long taskExecuteTime = System.currentTimeMillis();
Header header = new Header(LISTEN_RESPONSE, OPStatus.SUCCESS.getCode(), OPStatus.SUCCESS.getDesc(), pkg.getHeader().getSeq());
session.setListenRequestSeq(pkg.getHeader().getSeq());
Original file line number Diff line number Diff line change
@@ -15,12 +15,14 @@
* limitations under the License.
*/

package org.apache.eventmesh.runtime.core.protocol.tcp.client.task;
package org.apache.eventmesh.runtime.core.protocol.tcp.client.processor;

import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;

import org.slf4j.Logger;
@@ -32,16 +34,19 @@
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MessageAckTask extends AbstractTask {

public class MessageAckProcessor implements TcpProcessor {
private static final Logger MESSAGE_LOGGER = LoggerFactory.getLogger(EventMeshConstants.MESSAGE);
private EventMeshTCPServer eventMeshTCPServer;
private final Acl acl;

public MessageAckTask(Package pkg, ChannelHandlerContext ctx, long startTime, EventMeshTCPServer eventMeshTCPServer) {
super(pkg, ctx, startTime, eventMeshTCPServer);
public MessageAckProcessor(EventMeshTCPServer eventMeshTCPServer) {
this.eventMeshTCPServer = eventMeshTCPServer;
this.acl = eventMeshTCPServer.getAcl();
}

@Override
public void run() {
public void process(final Package pkg, final ChannelHandlerContext ctx, long startTime) {
Session session = eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx);
long taskExecuteTime = System.currentTimeMillis();
String seq = pkg.getHeader().getSeq();
Command cmd = pkg.getHeader().getCmd();
Original file line number Diff line number Diff line change
@@ -15,9 +15,10 @@
* limitations under the License.
*/

package org.apache.eventmesh.runtime.core.protocol.tcp.client.task;
package org.apache.eventmesh.runtime.core.protocol.tcp.client.processor;

import static org.apache.eventmesh.common.protocol.tcp.Command.RESPONSE_TO_SERVER;
import static org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.SessionSender.TRY_PERMIT_TIME_OUT;

import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
@@ -33,6 +34,7 @@
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.EventMeshTcpSendResult;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.EventMeshTcpSendStatus;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.UpStreamMsgContext;
@@ -63,21 +65,30 @@
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MessageTransferTask extends AbstractTask {

public class MessageTransferProcessor implements TcpProcessor {
private static final Logger MESSAGE_LOGGER = LoggerFactory.getLogger(EventMeshConstants.MESSAGE);

private static final int TRY_PERMIT_TIME_OUT = 5;

private EventMeshTCPServer eventMeshTCPServer;
private final Acl acl;

public MessageTransferTask(Package pkg, ChannelHandlerContext ctx, long startTime, EventMeshTCPServer eventMeshTCPServer) {
super(pkg, ctx, startTime, eventMeshTCPServer);
private Package pkg;
private ChannelHandlerContext ctx;
private Session session;
private long startTime;

public MessageTransferProcessor(EventMeshTCPServer eventMeshTCPServer) {
this.eventMeshTCPServer = eventMeshTCPServer;
this.acl = eventMeshTCPServer.getAcl();
}

@Override
public void run() {
public void process(final Package pkg, final ChannelHandlerContext ctx, long startTime) {
Session session = eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx);

this.pkg = pkg;
this.ctx = ctx;
this.session = session;
this.startTime = startTime;

long taskExecuteTime = System.currentTimeMillis();
Command cmd = pkg.getHeader().getCmd();

Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.runtime.core.protocol.tcp.client.task;
package org.apache.eventmesh.runtime.core.protocol.tcp.client.processor;

import static org.apache.eventmesh.common.protocol.tcp.Command.RECOMMEND_RESPONSE;
import static org.apache.eventmesh.runtime.util.Utils.writeAndFlush;
@@ -24,10 +24,12 @@
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.recommend.EventMeshRecommendImpl;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.recommend.EventMeshRecommendStrategy;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;

import org.apache.commons.lang3.StringUtils;

@@ -36,16 +38,20 @@

import io.netty.channel.ChannelHandlerContext;

public class RecommendTask extends AbstractTask {

public class RecommendProcessor implements TcpProcessor {
private static final Logger MESSAGE_LOGGER = LoggerFactory.getLogger(EventMeshConstants.MESSAGE);

public RecommendTask(Package pkg, ChannelHandlerContext ctx, long startTime, EventMeshTCPServer eventMeshTCPServer) {
super(pkg, ctx, startTime, eventMeshTCPServer);
private EventMeshTCPServer eventMeshTCPServer;
private final Acl acl;

public RecommendProcessor(EventMeshTCPServer eventMeshTCPServer) {
this.eventMeshTCPServer = eventMeshTCPServer;
this.acl = eventMeshTCPServer.getAcl();
}

@Override
public void run() {
public void process(final Package pkg, final ChannelHandlerContext ctx, long startTime) {
Session session = eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx);
long taskExecuteTime = System.currentTimeMillis();
Package res = new Package();
try {
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.runtime.core.protocol.tcp.client.task;
package org.apache.eventmesh.runtime.core.protocol.tcp.client.processor;

import org.apache.eventmesh.api.exception.AclException;
import org.apache.eventmesh.api.registry.bo.EventMeshAppSubTopicInfo;
@@ -27,6 +27,7 @@
import org.apache.eventmesh.common.protocol.tcp.Subscription;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.runtime.util.Utils;

@@ -40,17 +41,19 @@
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SubscribeTask extends AbstractTask {
public class SubscribeProcessor implements TcpProcessor {

private EventMeshTCPServer eventMeshTCPServer;
private final Acl acl;

public SubscribeTask(final Package pkg, final ChannelHandlerContext ctx, long startTime, final EventMeshTCPServer eventMeshTCPServer) {
super(pkg, ctx, startTime, eventMeshTCPServer);
public SubscribeProcessor(EventMeshTCPServer eventMeshTCPServer) {
this.eventMeshTCPServer = eventMeshTCPServer;
this.acl = eventMeshTCPServer.getAcl();
}

@Override
public void run() {
public void process(final Package pkg, final ChannelHandlerContext ctx, long startTime) {
Session session = eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx);
final long taskExecuteTime = System.currentTimeMillis();

final Package msg = new Package();
Original file line number Diff line number Diff line change
@@ -15,27 +15,15 @@
* limitations under the License.
*/

package org.apache.eventmesh.runtime.core.protocol.tcp.client.task;
package org.apache.eventmesh.runtime.core.protocol.tcp.client.processor;

import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;

import io.netty.channel.ChannelHandlerContext;

public abstract class AbstractTask implements Runnable {

protected Package pkg;
protected ChannelHandlerContext ctx;
protected Session session;
protected long startTime;
protected EventMeshTCPServer eventMeshTCPServer;

public AbstractTask(final Package pkg, final ChannelHandlerContext ctx, long startTime, final EventMeshTCPServer eventMeshTCPServer) {
this.eventMeshTCPServer = eventMeshTCPServer;
this.pkg = pkg;
this.ctx = ctx;
this.session = eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx);
this.startTime = startTime;
}
/**
* TcpProcessor
*/
public interface TcpProcessor {
void process(final Package pkg, final ChannelHandlerContext ctx, long startTime);
}
Original file line number Diff line number Diff line change
@@ -15,15 +15,17 @@
* limitations under the License.
*/

package org.apache.eventmesh.runtime.core.protocol.tcp.client.task;
package org.apache.eventmesh.runtime.core.protocol.tcp.client.processor;

import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.util.Utils;

import org.apache.commons.collections4.MapUtils;
@@ -37,16 +39,20 @@

import io.netty.channel.ChannelHandlerContext;

public class UnSubscribeTask extends AbstractTask {

public class UnSubscribeProcessor implements TcpProcessor {
private static final Logger MESSAGE_LOGGER = LoggerFactory.getLogger(EventMeshConstants.MESSAGE);

public UnSubscribeTask(Package pkg, ChannelHandlerContext ctx, long startTime, EventMeshTCPServer eventMeshTCPServer) {
super(pkg, ctx, startTime, eventMeshTCPServer);
private EventMeshTCPServer eventMeshTCPServer;
private final Acl acl;

public UnSubscribeProcessor(EventMeshTCPServer eventMeshTCPServer) {
this.eventMeshTCPServer = eventMeshTCPServer;
this.acl = eventMeshTCPServer.getAcl();
}

@Override
public void run() {
public void process(final Package pkg, final ChannelHandlerContext ctx, long startTime) {
Session session = eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx);
long taskExecuteTime = System.currentTimeMillis();
Package msg = new Package();
try {
Original file line number Diff line number Diff line change
@@ -167,7 +167,7 @@ private void doRedirect(String group, String purpose, int judge, List<String> ev
for (int i = 0; i < judge; i++) {
String newProxyIp = eventMeshRecommendResult.get(i).split(":")[0];
String newProxyPort = eventMeshRecommendResult.get(i).split(":")[1];
String redirectSessionAddr = EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer, newProxyIp,
String redirectSessionAddr = EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer.getTcpThreadPoolGroup(), newProxyIp,
Integer.parseInt(newProxyPort), sessionList.get(i), eventMeshTCPServer.getClientSessionGroupMapping());
log.info("doRebalance,redirect sessionAddr:{}", redirectSessionAddr);
ThreadUtils.sleep(eventMeshTCPServer.getEventMeshTCPConfiguration().getSleepIntervalInRebalanceRedirectMills(), TimeUnit.MILLISECONDS);
Original file line number Diff line number Diff line change
@@ -65,7 +65,7 @@ public class SessionSender {

public final transient AtomicLong failMsgCount = new AtomicLong(0);

private static final int TRY_PERMIT_TIME_OUT = 5;
public static final int TRY_PERMIT_TIME_OUT = 5;

@Override
public String toString() {
Original file line number Diff line number Diff line change
@@ -41,16 +41,16 @@ public class HTTPMetricsServer {
private final transient HttpSummaryMetrics summaryMetrics;

public HTTPMetricsServer(final EventMeshHTTPServer eventMeshHTTPServer,
final List<MetricsRegistry> metricsRegistries) {
final List<MetricsRegistry> metricsRegistries) {
Objects.requireNonNull(eventMeshHTTPServer, "EventMeshHTTPServer can not be null");
Objects.requireNonNull(metricsRegistries, "List<MetricsRegistry> can not be null");

this.eventMeshHTTPServer = eventMeshHTTPServer;
this.metricsRegistries = metricsRegistries;
this.summaryMetrics = new HttpSummaryMetrics(
eventMeshHTTPServer.getBatchMsgExecutor(),
eventMeshHTTPServer.getSendMsgExecutor(),
eventMeshHTTPServer.getPushMsgExecutor(),
eventMeshHTTPServer.getHttpThreadPoolGroup().getBatchMsgExecutor(),
eventMeshHTTPServer.getHttpThreadPoolGroup().getSendMsgExecutor(),
eventMeshHTTPServer.getHttpThreadPoolGroup().getPushMsgExecutor(),
eventMeshHTTPServer.getHttpRetryer().getFailedQueue());

init();
@@ -168,9 +168,9 @@ private void logPrintServerMetrics(final HttpSummaryMetrics summaryMetrics,

if (log.isInfoEnabled()) {
log.info("batchMsgQ: {}, sendMsgQ: {}, pushMsgQ: {}, httpRetryQ: {}",
eventMeshHTTPServer.getBatchMsgExecutor().getQueue().size(),
eventMeshHTTPServer.getSendMsgExecutor().getQueue().size(),
eventMeshHTTPServer.getPushMsgExecutor().getQueue().size(),
eventMeshHTTPServer.getHttpThreadPoolGroup().getBatchMsgExecutor().getQueue().size(),
eventMeshHTTPServer.getHttpThreadPoolGroup().getSendMsgExecutor().getQueue().size(),
eventMeshHTTPServer.getHttpThreadPoolGroup().getPushMsgExecutor().getQueue().size(),
eventMeshHTTPServer.getHttpRetryer().size());
}

Original file line number Diff line number Diff line change
@@ -84,7 +84,7 @@ public void start() throws Exception {
});

int delay = 60 * 1000;
monitorTpsTask = eventMeshTCPServer.getScheduler().scheduleAtFixedRate((() -> {
monitorTpsTask = eventMeshTCPServer.getTcpThreadPoolGroup().getScheduler().scheduleAtFixedRate((() -> {
int msgNum = tcpSummaryMetrics.client2eventMeshMsgNum();
tcpSummaryMetrics.resetClient2EventMeshMsgNum();
tcpSummaryMetrics.setClient2eventMeshTPS((int) 1000.0d * msgNum / period);
@@ -121,18 +121,18 @@ public void start() throws Exception {
topicSet.addAll(session.getSessionContext().getSubscribeTopics().keySet());
}
tcpSummaryMetrics.setSubTopicNum(topicSet.size());
tcpSummaryMetrics.setAllConnections(eventMeshTCPServer.getEventMeshTcpConnectionHandler().getConnectionCount());
tcpSummaryMetrics.setAllConnections(eventMeshTCPServer.getTcpConnectionHandler().getConnectionCount());
printAppLogger(tcpSummaryMetrics);


}), delay, period, TimeUnit.MILLISECONDS);

monitorThreadPoolTask = eventMeshTCPServer.getScheduler().scheduleAtFixedRate(() -> {
monitorThreadPoolTask = eventMeshTCPServer.getTcpThreadPoolGroup().getScheduler().scheduleAtFixedRate(() -> {
eventMeshTCPServer.getEventMeshRebalanceService().printRebalanceThreadPoolState();
eventMeshTCPServer.getEventMeshTcpRetryer().printRetryThreadPoolState();
eventMeshTCPServer.getTcpRetryer().printRetryThreadPoolState();

//monitor retry queue size
tcpSummaryMetrics.setRetrySize(eventMeshTCPServer.getEventMeshTcpRetryer().getRetrySize());
tcpSummaryMetrics.setRetrySize(eventMeshTCPServer.getTcpRetryer().getRetrySize());
appLogger.info(
MonitorMetricConstants.EVENTMESH_MONITOR_FORMAT_COMMON,
EventMeshConstants.PROTOCOL_TCP,