Skip to content

Commit 36160ba

Browse files
echistyakovmeta-codesync[bot]
authored andcommitted
Add plumbing to send termination signal to handler
Summary: TSIA. To be used downstream. #build_rule_type[go_library,go_binary,go_test] Reviewed By: siutsin Differential Revision: D86181808 fbshipit-source-id: 9648a6933b5c0845e83c4617f45937418a5ff9e5
1 parent 542e23f commit 36160ba

File tree

5 files changed

+55
-0
lines changed

5 files changed

+55
-0
lines changed

third-party/thrift/src/thrift/compiler/generate/templates/go/svc/processor.mustache

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,3 +99,12 @@ func (p *{{service:go_name}}Processor) GetThriftMetadata() *{{program:metadata_q
9999

100100
{{/function:go_server_supported?}}
101101
{{/service:functions}}
102+
{{#service:interaction?}}
103+
104+
func (p *{{service:go_name}}Processor) OnTermination() {
105+
// If the underlying handler implements OnTermination()
106+
if terminable, ok := p.handler.(thrift.Terminable); ok {
107+
terminable.OnTermination()
108+
}
109+
}
110+
{{/service:interaction?}}

third-party/thrift/src/thrift/compiler/test/fixtures/basic-annotations/out/go/gen-go/svcs.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,13 @@ func (p *procFuncBadInteractionFoo) RunContext(ctx context.Context, reqStruct th
141141
return result, nil
142142
}
143143

144+
func (p *BadInteractionProcessor) OnTermination() {
145+
// If the underlying handler implements OnTermination()
146+
if terminable, ok := p.handler.(thrift.Terminable); ok {
147+
terminable.OnTermination()
148+
}
149+
}
150+
144151

145152
type MyService interface {
146153
Ping(ctx context.Context) (error)

third-party/thrift/src/thrift/compiler/test/fixtures/interactions/out/go/gen-go/svcs.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,13 @@ func (p *procFuncMyInteractionTruthify) RunStreamContext(
299299
onStreamComplete()
300300
}
301301

302+
func (p *MyInteractionProcessor) OnTermination() {
303+
// If the underlying handler implements OnTermination()
304+
if terminable, ok := p.handler.(thrift.Terminable); ok {
305+
terminable.OnTermination()
306+
}
307+
}
308+
302309
type MyInteractionFast interface {
303310
Frobnicate(ctx context.Context) (int32, error)
304311
Ping(ctx context.Context) (error)
@@ -563,6 +570,13 @@ func (p *procFuncMyInteractionFastTruthify) RunStreamContext(
563570
onStreamComplete()
564571
}
565572

573+
func (p *MyInteractionFastProcessor) OnTermination() {
574+
// If the underlying handler implements OnTermination()
575+
if terminable, ok := p.handler.(thrift.Terminable); ok {
576+
terminable.OnTermination()
577+
}
578+
}
579+
566580
type SerialInteraction interface {
567581
Frobnicate(ctx context.Context) (error)
568582
}
@@ -677,6 +691,13 @@ func (p *procFuncSerialInteractionFrobnicate) RunContext(ctx context.Context, re
677691
return result, nil
678692
}
679693

694+
func (p *SerialInteractionProcessor) OnTermination() {
695+
// If the underlying handler implements OnTermination()
696+
if terminable, ok := p.handler.(thrift.Terminable); ok {
697+
terminable.OnTermination()
698+
}
699+
}
700+
680701
type BoxedInteraction interface {
681702
GetABox(ctx context.Context) (*ShouldBeBoxed, error)
682703
}
@@ -792,6 +813,13 @@ func (p *procFuncBoxedInteractionGetABox) RunContext(ctx context.Context, reqStr
792813
return result, nil
793814
}
794815

816+
func (p *BoxedInteractionProcessor) OnTermination() {
817+
// If the underlying handler implements OnTermination()
818+
if terminable, ok := p.handler.(thrift.Terminable); ok {
819+
terminable.OnTermination()
820+
}
821+
}
822+
795823

796824
type MyService interface {
797825
Foo(ctx context.Context) (error)

third-party/thrift/src/thrift/lib/go/thrift/rocket_server.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,9 +205,15 @@ func (s *rocketServer) metadataPush(msg payload.Payload) {
205205
if metadata.InteractionTerminate != nil {
206206
interactionID := metadata.InteractionTerminate.InteractionId
207207
s.interactionsMutex.Lock()
208+
interaction, ok := s.interactions[interactionID]
208209
delete(s.interactions, interactionID)
209210
s.interactionsMutex.Unlock()
210211
s.log("receive interaction terminate signal for ID %d", interactionID)
212+
if ok {
213+
if terminable, isTerminable := interaction.(types.Terminable); isTerminable {
214+
terminable.OnTermination()
215+
}
216+
}
211217
} else if metadata.StreamHeadersPush != nil {
212218
s.log("unsupported StreamHeadersPush metadata type")
213219
} else if metadata.TransportMetadataPush != nil {

third-party/thrift/src/thrift/lib/go/thrift/types/interaction.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,8 @@ const (
2222
interactionCreateKey interactionContextKey = 1
2323
interactionIDKey interactionContextKey = 2
2424
)
25+
26+
// Terminable is implemented by an interaction processor.
27+
type Terminable interface {
28+
OnTermination()
29+
}

0 commit comments

Comments
 (0)