From bd370d7b05a246213f65faf2adc9a0e85863a28d Mon Sep 17 00:00:00 2001 From: Branden Horiuchi Date: Sun, 14 Jul 2019 16:53:57 -0700 Subject: [PATCH 01/25] added subscribe support --- definition.go | 3 + subscription.go | 217 +++++++++++++++++++++++++++++++++++++++++++ subscription_test.go | 134 ++++++++++++++++++++++++++ 3 files changed, 354 insertions(+) create mode 100644 subscription.go create mode 100644 subscription_test.go diff --git a/definition.go b/definition.go index 4b132991..35af4eb3 100644 --- a/definition.go +++ b/definition.go @@ -534,6 +534,7 @@ func defineFieldMap(ttype Named, fieldMap Fields) (FieldDefinitionMap, error) { Description: field.Description, Type: field.Type, Resolve: field.Resolve, + Subscribe: field.Subscribe, DeprecationReason: field.DeprecationReason, } @@ -606,6 +607,7 @@ type Field struct { Type Output `json:"type"` Args FieldConfigArgument `json:"args"` Resolve FieldResolveFn `json:"-"` + Subscribe FieldResolveFn `json:"-"` DeprecationReason string `json:"deprecationReason"` Description string `json:"description"` } @@ -625,6 +627,7 @@ type FieldDefinition struct { Type Output `json:"type"` Args []*Argument `json:"args"` Resolve FieldResolveFn `json:"-"` + Subscribe FieldResolveFn `json:"-"` DeprecationReason string `json:"deprecationReason"` } diff --git a/subscription.go b/subscription.go new file mode 100644 index 00000000..7d2781f5 --- /dev/null +++ b/subscription.go @@ -0,0 +1,217 @@ +package graphql + +import ( + "context" + "fmt" + + "github.com/graphql-go/graphql/gqlerrors" + "github.com/graphql-go/graphql/language/ast" +) + +type ResultIteratorFn func(count int64, result *Result, doneFunc func()) + +type ResultIterator struct { + count int64 + ctx context.Context + ch chan *Result + cancelFunc context.CancelFunc + cancelled bool + handlers []ResultIteratorFn +} + +func NewResultIterator(ctx context.Context, ch chan *Result) *ResultIterator { + if ctx == nil { + ctx = context.Background() + } + + cctx, cancelFunc := context.WithCancel(ctx) + iterator := &ResultIterator{ + count: 0, + ctx: cctx, + ch: ch, + cancelFunc: cancelFunc, + cancelled: false, + handlers: []ResultIteratorFn{}, + } + + go func() { + for { + select { + case <-iterator.ctx.Done(): + return + case res := <-iterator.ch: + if iterator.cancelled { + return + } + iterator.count += 1 + for _, handler := range iterator.handlers { + handler(iterator.count, res, iterator.Done) + } + } + } + }() + + return iterator +} + +func (c *ResultIterator) ForEach(handler ResultIteratorFn) { + c.handlers = append(c.handlers, handler) +} + +func (c *ResultIterator) Done() { + c.cancelled = true + c.cancelFunc() +} + +type SubscribeParams struct { + Schema Schema + Document *ast.Document + RootValue interface{} + ContextValue context.Context + VariableValues map[string]interface{} + OperationName string + FieldResolver FieldResolveFn + FieldSubscriber FieldResolveFn +} + +// Subscribe performs a subscribe operation +func Subscribe(p SubscribeParams) *ResultIterator { + resultChannel := make(chan *Result) + // Use background context if no context was provided + ctx := p.ContextValue + if ctx == nil { + ctx = context.Background() + } + + var mapSourceToResponse = func(payload interface{}) *Result { + return Execute(ExecuteParams{ + Schema: p.Schema, + Root: payload, + AST: p.Document, + OperationName: p.OperationName, + Args: p.VariableValues, + Context: p.ContextValue, + }) + } + + go func() { + + result := &Result{} + defer func() { + if err := recover(); err != nil { + result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error))) + } + resultChannel <- result + }() + + exeContext, err := buildExecutionContext(buildExecutionCtxParams{ + Schema: p.Schema, + Root: p.RootValue, + AST: p.Document, + OperationName: p.OperationName, + Args: p.VariableValues, + Result: result, + Context: p.ContextValue, + }) + + if err != nil { + result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error))) + resultChannel <- result + return + } + + operationType, err := getOperationRootType(p.Schema, exeContext.Operation) + if err != nil { + result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error))) + resultChannel <- result + return + } + + fields := collectFields(collectFieldsParams{ + ExeContext: exeContext, + RuntimeType: operationType, + SelectionSet: exeContext.Operation.GetSelectionSet(), + }) + + responseNames := []string{} + for name := range fields { + responseNames = append(responseNames, name) + } + responseName := responseNames[0] + fieldNodes := fields[responseName] + fieldNode := fieldNodes[0] + fieldName := fieldNode.Name.Value + fieldDef := getFieldDef(p.Schema, operationType, fieldName) + + if fieldDef == nil { + err := fmt.Errorf("the subscription field %q is not defined", fieldName) + result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error))) + resultChannel <- result + return + } + + resolveFn := p.FieldSubscriber + if resolveFn == nil { + resolveFn = DefaultResolveFn + } + if fieldDef.Subscribe != nil { + resolveFn = fieldDef.Subscribe + } + fieldPath := &ResponsePath{ + Key: responseName, + } + + args := getArgumentValues(fieldDef.Args, fieldNode.Arguments, exeContext.VariableValues) + info := ResolveInfo{ + FieldName: fieldName, + FieldASTs: fieldNodes, + Path: fieldPath, + ReturnType: fieldDef.Type, + ParentType: operationType, + Schema: p.Schema, + Fragments: exeContext.Fragments, + RootValue: exeContext.Root, + Operation: exeContext.Operation, + VariableValues: exeContext.VariableValues, + } + + fieldResult, err := resolveFn(ResolveParams{ + Source: p.RootValue, + Args: args, + Info: info, + Context: exeContext.Context, + }) + if err != nil { + result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error))) + resultChannel <- result + return + } + + if fieldResult == nil { + err := fmt.Errorf("no field result") + result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error))) + resultChannel <- result + return + } + + switch fieldResult.(type) { + case chan interface{}: + for { + select { + case <-ctx.Done(): + fmt.Printf("done context called") + return + case res := <-fieldResult.(chan interface{}): + + resultChannel <- mapSourceToResponse(res) + } + } + default: + resultChannel <- mapSourceToResponse(fieldResult) + return + } + }() + + // return a result iterator + return NewResultIterator(p.ContextValue, resultChannel) +} diff --git a/subscription_test.go b/subscription_test.go new file mode 100644 index 00000000..67cf9358 --- /dev/null +++ b/subscription_test.go @@ -0,0 +1,134 @@ +package graphql + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/graphql-go/graphql/language/parser" + "github.com/graphql-go/graphql/language/source" +) + +func TestSubscription(t *testing.T) { + var maxPublish = 5 + m := make(chan interface{}) + + source1 := source.NewSource(&source.Source{ + Body: []byte(`subscription { + watch_count + }`), + Name: "GraphQL request", + }) + + source2 := source.NewSource(&source.Source{ + Body: []byte(`subscription { + watch_should_fail + }`), + Name: "GraphQL request", + }) + + document1, _ := parser.Parse(parser.ParseParams{Source: source1}) + document2, _ := parser.Parse(parser.ParseParams{Source: source2}) + + schema, err := NewSchema(SchemaConfig{ + Query: NewObject(ObjectConfig{ + Name: "Query", + Fields: Fields{ + "hello": &Field{ + Type: String, + Resolve: func(p ResolveParams) (interface{}, error) { + return "world", nil + }, + }, + }, + }), + Subscription: NewObject(ObjectConfig{ + Name: "Subscription", + Fields: Fields{ + "watch_count": &Field{ + Type: String, + Resolve: func(p ResolveParams) (interface{}, error) { + return fmt.Sprintf("count=%v", p.Source), nil + }, + Subscribe: func(p ResolveParams) (interface{}, error) { + return m, nil + }, + }, + "watch_should_fail": &Field{ + Type: String, + Resolve: func(p ResolveParams) (interface{}, error) { + return fmt.Sprintf("count=%v", p.Source), nil + }, + Subscribe: func(p ResolveParams) (interface{}, error) { + return nil, nil + }, + }, + }, + }), + }) + + if err != nil { + t.Errorf("failed to create schema: %v", err) + return + } + + failIterator := Subscribe(SubscribeParams{ + Schema: schema, + Document: document2, + }) + + // test a subscribe that should fail due to no return value + failIterator.ForEach(func(count int64, res *Result, doneFunc func()) { + if !res.HasErrors() { + t.Errorf("subscribe failed to catch nil result from subscribe") + doneFunc() + return + } + doneFunc() + return + }) + + resultIterator := Subscribe(SubscribeParams{ + Schema: schema, + Document: document1, + ContextValue: context.Background(), + }) + + resultIterator.ForEach(func(count int64, res *Result, doneFunc func()) { + if res.HasErrors() { + t.Errorf("subscribe error(s): %v", res.Errors) + doneFunc() + return + } + + if res.Data != nil { + data := res.Data.(map[string]interface{})["watch_count"] + expected := fmt.Sprintf("count=%d", count) + actual := fmt.Sprintf("%v", data) + if actual != expected { + t.Errorf("subscription result error: expected %q, actual %q", expected, actual) + doneFunc() + return + } + + // test the done func by quitting after 3 iterations + // the publisher will publish up to 5 + if count >= int64(maxPublish-2) { + doneFunc() + return + } + } + }) + + // start publishing + go func() { + for i := 1; i <= maxPublish; i++ { + time.Sleep(200 * time.Millisecond) + m <- i + } + }() + + // give time for the test to complete + time.Sleep(1 * time.Second) +} From e96c7c7746235b3d6101eddc61c5f9b769f5299a Mon Sep 17 00:00:00 2001 From: Branden Horiuchi Date: Sun, 14 Jul 2019 16:57:26 -0700 Subject: [PATCH 02/25] removing ordered fields code to keep PRs separate --- executor.go | 41 +---------------------------------------- 1 file changed, 1 insertion(+), 40 deletions(-) diff --git a/executor.go b/executor.go index e3976ebb..d8477140 100644 --- a/executor.go +++ b/executor.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "reflect" - "sort" "strings" "github.com/graphql-go/graphql/gqlerrors" @@ -255,9 +254,7 @@ func executeFieldsSerially(p executeFieldsParams) *Result { } finalResults := make(map[string]interface{}, len(p.Fields)) - for _, orderedField := range orderedFields(p.Fields) { - responseName := orderedField.responseName - fieldASTs := orderedField.fieldASTs + for responseName, fieldASTs := range p.Fields { fieldPath := p.Path.WithKey(responseName) resolved, state := resolveField(p.ExecutionContext, p.ParentType, p.Source, fieldASTs, fieldPath) if state.hasNoFieldDefs { @@ -1041,39 +1038,3 @@ func getFieldDef(schema Schema, parentType *Object, fieldName string) *FieldDefi } return parentType.Fields()[fieldName] } - -// contains field information that will be placed in an ordered slice -type orderedField struct { - responseName string - fieldASTs []*ast.Field -} - -// orders fields from a fields map by location in the source -func orderedFields(fields map[string][]*ast.Field) []*orderedField { - orderedFields := []*orderedField{} - fieldMap := map[int]*orderedField{} - startLocs := []int{} - - for responseName, fieldASTs := range fields { - // find the lowest location in the current fieldASTs - lowest := -1 - for _, fieldAST := range fieldASTs { - loc := fieldAST.GetLoc().Start - if lowest == -1 || loc < lowest { - lowest = loc - } - } - startLocs = append(startLocs, lowest) - fieldMap[lowest] = &orderedField{ - responseName: responseName, - fieldASTs: fieldASTs, - } - } - - sort.Ints(startLocs) - for _, startLoc := range startLocs { - orderedFields = append(orderedFields, fieldMap[startLoc]) - } - - return orderedFields -} From a8d0d006948be3cb3214c4a7788b8e093fd3d715 Mon Sep 17 00:00:00 2001 From: Branden Horiuchi Date: Sun, 14 Jul 2019 17:12:24 -0700 Subject: [PATCH 03/25] adding waitgroups to handle race --- subscription.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/subscription.go b/subscription.go index 7d2781f5..27b36c2d 100644 --- a/subscription.go +++ b/subscription.go @@ -3,6 +3,7 @@ package graphql import ( "context" "fmt" + "sync" "github.com/graphql-go/graphql/gqlerrors" "github.com/graphql-go/graphql/language/ast" @@ -12,6 +13,7 @@ type ResultIteratorFn func(count int64, result *Result, doneFunc func()) type ResultIterator struct { count int64 + wg sync.WaitGroup ctx context.Context ch chan *Result cancelFunc context.CancelFunc @@ -43,8 +45,12 @@ func NewResultIterator(ctx context.Context, ch chan *Result) *ResultIterator { if iterator.cancelled { return } - iterator.count += 1 + iterator.wg.Wait() + iterator.wg.Add(1) + iterator.count++ + iterator.wg.Done() for _, handler := range iterator.handlers { + iterator.wg.Wait() handler(iterator.count, res, iterator.Done) } } @@ -55,7 +61,9 @@ func NewResultIterator(ctx context.Context, ch chan *Result) *ResultIterator { } func (c *ResultIterator) ForEach(handler ResultIteratorFn) { + c.wg.Add(1) c.handlers = append(c.handlers, handler) + c.wg.Done() } func (c *ResultIterator) Done() { From 9cf0da75dc4e4f32394451af7227539a58d896ad Mon Sep 17 00:00:00 2001 From: Branden Horiuchi Date: Tue, 16 Jul 2019 08:55:37 -0700 Subject: [PATCH 04/25] Updating ResultIterator api and adding doneFunc per handler --- subscription.go | 87 ++++++++++++++++++++++++++++++++++---------- subscription_test.go | 28 +++++++------- 2 files changed, 82 insertions(+), 33 deletions(-) diff --git a/subscription.go b/subscription.go index 27b36c2d..481559b9 100644 --- a/subscription.go +++ b/subscription.go @@ -9,18 +9,36 @@ import ( "github.com/graphql-go/graphql/language/ast" ) -type ResultIteratorFn func(count int64, result *Result, doneFunc func()) +// ResultIteratorParams parameters passed to the result iterator handler +type ResultIteratorParams struct { + ResultCount int64 // number of results this iterator has processed + Result *Result // the current result + Done func() // Removes the current handler + Cancel func() // Cancels the iterator, same as iterator.Cancel() +} + +// ResultIteratorFn a result iterator handler +type ResultIteratorFn func(p ResultIteratorParams) + +// holds subscription handler data +type subscriptionHanlderConfig struct { + handler ResultIteratorFn + doneFunc func() +} +// ResultIterator handles processing results from a chan *Result type ResultIterator struct { - count int64 - wg sync.WaitGroup - ctx context.Context - ch chan *Result - cancelFunc context.CancelFunc - cancelled bool - handlers []ResultIteratorFn + currentHandlerID int64 + count int64 + wg sync.WaitGroup + ctx context.Context + ch chan *Result + cancelFunc context.CancelFunc + cancelled bool + handlers map[int64]*subscriptionHanlderConfig } +// NewResultIterator creates a new iterator and starts handling message on the result channel func NewResultIterator(ctx context.Context, ch chan *Result) *ResultIterator { if ctx == nil { ctx = context.Background() @@ -28,12 +46,13 @@ func NewResultIterator(ctx context.Context, ch chan *Result) *ResultIterator { cctx, cancelFunc := context.WithCancel(ctx) iterator := &ResultIterator{ - count: 0, - ctx: cctx, - ch: ch, - cancelFunc: cancelFunc, - cancelled: false, - handlers: []ResultIteratorFn{}, + currentHandlerID: 0, + count: 0, + ctx: cctx, + ch: ch, + cancelFunc: cancelFunc, + cancelled: false, + handlers: map[int64]*subscriptionHanlderConfig{}, } go func() { @@ -49,9 +68,14 @@ func NewResultIterator(ctx context.Context, ch chan *Result) *ResultIterator { iterator.wg.Add(1) iterator.count++ iterator.wg.Done() - for _, handler := range iterator.handlers { + for _, h := range iterator.handlers { iterator.wg.Wait() - handler(iterator.count, res, iterator.Done) + h.handler(ResultIteratorParams{ + ResultCount: iterator.count, + Result: res, + Done: h.doneFunc, + Cancel: iterator.Cancel, + }) } } } @@ -60,17 +84,42 @@ func NewResultIterator(ctx context.Context, ch chan *Result) *ResultIterator { return iterator } -func (c *ResultIterator) ForEach(handler ResultIteratorFn) { +// adds a new handler +func (c *ResultIterator) addHandler(handler ResultIteratorFn) { + c.wg.Add(1) + handlerID := c.currentHandlerID + 1 + c.currentHandlerID = handlerID + c.handlers[handlerID] = &subscriptionHanlderConfig{ + handler: handler, + doneFunc: func() { + c.removeHandler(handlerID) + }, + } + c.wg.Done() +} + +// removes a handler and cancels if no more handlers exist +func (c *ResultIterator) removeHandler(handlerID int64) { c.wg.Add(1) - c.handlers = append(c.handlers, handler) + delete(c.handlers, handlerID) + if len(c.handlers) == 0 { + c.Cancel() + } c.wg.Done() } -func (c *ResultIterator) Done() { +// ForEach adds a handler and handles each message as they come +func (c *ResultIterator) ForEach(handler ResultIteratorFn) { + c.addHandler(handler) +} + +// Cancel cancels the iterator +func (c *ResultIterator) Cancel() { c.cancelled = true c.cancelFunc() } +// SubscribeParams parameters for subscribing type SubscribeParams struct { Schema Schema Document *ast.Document diff --git a/subscription_test.go b/subscription_test.go index 67cf9358..f311fe0a 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -79,13 +79,13 @@ func TestSubscription(t *testing.T) { }) // test a subscribe that should fail due to no return value - failIterator.ForEach(func(count int64, res *Result, doneFunc func()) { - if !res.HasErrors() { + failIterator.ForEach(func(p ResultIteratorParams) { + if !p.Result.HasErrors() { t.Errorf("subscribe failed to catch nil result from subscribe") - doneFunc() + p.Done() return } - doneFunc() + p.Done() return }) @@ -95,27 +95,27 @@ func TestSubscription(t *testing.T) { ContextValue: context.Background(), }) - resultIterator.ForEach(func(count int64, res *Result, doneFunc func()) { - if res.HasErrors() { - t.Errorf("subscribe error(s): %v", res.Errors) - doneFunc() + resultIterator.ForEach(func(p ResultIteratorParams) { + if p.Result.HasErrors() { + t.Errorf("subscribe error(s): %v", p.Result.Errors) + p.Done() return } - if res.Data != nil { - data := res.Data.(map[string]interface{})["watch_count"] - expected := fmt.Sprintf("count=%d", count) + if p.Result.Data != nil { + data := p.Result.Data.(map[string]interface{})["watch_count"] + expected := fmt.Sprintf("count=%d", p.ResultCount) actual := fmt.Sprintf("%v", data) if actual != expected { t.Errorf("subscription result error: expected %q, actual %q", expected, actual) - doneFunc() + p.Done() return } // test the done func by quitting after 3 iterations // the publisher will publish up to 5 - if count >= int64(maxPublish-2) { - doneFunc() + if p.ResultCount >= int64(maxPublish-2) { + p.Done() return } } From 4a374e3590aaf5a36c698e7473fa48dbd98ffff7 Mon Sep 17 00:00:00 2001 From: Branden Horiuchi Date: Sun, 18 Aug 2019 11:40:05 -0700 Subject: [PATCH 05/25] ensure subscribe and resolve use the same cancellable context --- executor.go | 41 ++++++++++++++++++++++++++++++++++++++++- subscription.go | 25 ++++++++++--------------- 2 files changed, 50 insertions(+), 16 deletions(-) diff --git a/executor.go b/executor.go index d8477140..e3976ebb 100644 --- a/executor.go +++ b/executor.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "reflect" + "sort" "strings" "github.com/graphql-go/graphql/gqlerrors" @@ -254,7 +255,9 @@ func executeFieldsSerially(p executeFieldsParams) *Result { } finalResults := make(map[string]interface{}, len(p.Fields)) - for responseName, fieldASTs := range p.Fields { + for _, orderedField := range orderedFields(p.Fields) { + responseName := orderedField.responseName + fieldASTs := orderedField.fieldASTs fieldPath := p.Path.WithKey(responseName) resolved, state := resolveField(p.ExecutionContext, p.ParentType, p.Source, fieldASTs, fieldPath) if state.hasNoFieldDefs { @@ -1038,3 +1041,39 @@ func getFieldDef(schema Schema, parentType *Object, fieldName string) *FieldDefi } return parentType.Fields()[fieldName] } + +// contains field information that will be placed in an ordered slice +type orderedField struct { + responseName string + fieldASTs []*ast.Field +} + +// orders fields from a fields map by location in the source +func orderedFields(fields map[string][]*ast.Field) []*orderedField { + orderedFields := []*orderedField{} + fieldMap := map[int]*orderedField{} + startLocs := []int{} + + for responseName, fieldASTs := range fields { + // find the lowest location in the current fieldASTs + lowest := -1 + for _, fieldAST := range fieldASTs { + loc := fieldAST.GetLoc().Start + if lowest == -1 || loc < lowest { + lowest = loc + } + } + startLocs = append(startLocs, lowest) + fieldMap[lowest] = &orderedField{ + responseName: responseName, + fieldASTs: fieldASTs, + } + } + + sort.Ints(startLocs) + for _, startLoc := range startLocs { + orderedFields = append(orderedFields, fieldMap[startLoc]) + } + + return orderedFields +} diff --git a/subscription.go b/subscription.go index 481559b9..f2eda2f8 100644 --- a/subscription.go +++ b/subscription.go @@ -39,18 +39,13 @@ type ResultIterator struct { } // NewResultIterator creates a new iterator and starts handling message on the result channel -func NewResultIterator(ctx context.Context, ch chan *Result) *ResultIterator { - if ctx == nil { - ctx = context.Background() - } - - cctx, cancelFunc := context.WithCancel(ctx) +func NewResultIterator(ctx context.Context, cancelFunc context.CancelFunc, ch chan *Result) *ResultIterator { iterator := &ResultIterator{ currentHandlerID: 0, count: 0, - ctx: cctx, - ch: ch, + ctx: ctx, cancelFunc: cancelFunc, + ch: ch, cancelled: false, handlers: map[int64]*subscriptionHanlderConfig{}, } @@ -140,6 +135,8 @@ func Subscribe(p SubscribeParams) *ResultIterator { ctx = context.Background() } + sctx, cancelFunc := context.WithCancel(ctx) + var mapSourceToResponse = func(payload interface{}) *Result { return Execute(ExecuteParams{ Schema: p.Schema, @@ -147,7 +144,7 @@ func Subscribe(p SubscribeParams) *ResultIterator { AST: p.Document, OperationName: p.OperationName, Args: p.VariableValues, - Context: p.ContextValue, + Context: sctx, }) } @@ -168,7 +165,7 @@ func Subscribe(p SubscribeParams) *ResultIterator { OperationName: p.OperationName, Args: p.VariableValues, Result: result, - Context: p.ContextValue, + Context: sctx, }) if err != nil { @@ -236,7 +233,7 @@ func Subscribe(p SubscribeParams) *ResultIterator { Source: p.RootValue, Args: args, Info: info, - Context: exeContext.Context, + Context: sctx, }) if err != nil { result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error))) @@ -255,11 +252,9 @@ func Subscribe(p SubscribeParams) *ResultIterator { case chan interface{}: for { select { - case <-ctx.Done(): - fmt.Printf("done context called") + case <-sctx.Done(): return case res := <-fieldResult.(chan interface{}): - resultChannel <- mapSourceToResponse(res) } } @@ -270,5 +265,5 @@ func Subscribe(p SubscribeParams) *ResultIterator { }() // return a result iterator - return NewResultIterator(p.ContextValue, resultChannel) + return NewResultIterator(sctx, cancelFunc, resultChannel) } From ec949b38ccfa0c14251df5c22072b2c572759ea1 Mon Sep 17 00:00:00 2001 From: Branden Horiuchi Date: Wed, 11 Sep 2019 12:38:38 -0700 Subject: [PATCH 06/25] switching from mutex to waitgroups and adding a subscriber type with done channel --- subscription.go | 88 ++++++++++++++++++++++++++++++-------------- subscription_test.go | 3 +- 2 files changed, 62 insertions(+), 29 deletions(-) diff --git a/subscription.go b/subscription.go index f2eda2f8..7ef52fc2 100644 --- a/subscription.go +++ b/subscription.go @@ -9,6 +9,30 @@ import ( "github.com/graphql-go/graphql/language/ast" ) +// Subscriber subscriber +type Subscriber struct { + message chan interface{} + done chan interface{} +} + +// Message returns the subscriber message channel +func (c *Subscriber) Message() chan interface{} { + return c.message +} + +// Done returns the subscriber done channel +func (c *Subscriber) Done() chan interface{} { + return c.done +} + +// NewSubscriber creates a new subscriber +func NewSubscriber(message, done chan interface{}) *Subscriber { + return &Subscriber{ + message: message, + done: done, + } +} + // ResultIteratorParams parameters passed to the result iterator handler type ResultIteratorParams struct { ResultCount int64 // number of results this iterator has processed @@ -30,21 +54,28 @@ type subscriptionHanlderConfig struct { type ResultIterator struct { currentHandlerID int64 count int64 - wg sync.WaitGroup - ctx context.Context + mx sync.Mutex ch chan *Result - cancelFunc context.CancelFunc + iterDone chan interface{} + subDone chan interface{} cancelled bool handlers map[int64]*subscriptionHanlderConfig } +func (c *ResultIterator) incrimentCount() int64 { + c.mx.Lock() + defer c.mx.Unlock() + c.count++ + return c.count +} + // NewResultIterator creates a new iterator and starts handling message on the result channel -func NewResultIterator(ctx context.Context, cancelFunc context.CancelFunc, ch chan *Result) *ResultIterator { +func NewResultIterator(subDone chan interface{}, ch chan *Result) *ResultIterator { iterator := &ResultIterator{ currentHandlerID: 0, count: 0, - ctx: ctx, - cancelFunc: cancelFunc, + iterDone: make(chan interface{}), + subDone: subDone, ch: ch, cancelled: false, handlers: map[int64]*subscriptionHanlderConfig{}, @@ -53,20 +84,18 @@ func NewResultIterator(ctx context.Context, cancelFunc context.CancelFunc, ch ch go func() { for { select { - case <-iterator.ctx.Done(): + case <-iterator.iterDone: + subDone <- true return case res := <-iterator.ch: if iterator.cancelled { return } - iterator.wg.Wait() - iterator.wg.Add(1) - iterator.count++ - iterator.wg.Done() + + count := iterator.incrimentCount() for _, h := range iterator.handlers { - iterator.wg.Wait() h.handler(ResultIteratorParams{ - ResultCount: iterator.count, + ResultCount: int64(count), Result: res, Done: h.doneFunc, Cancel: iterator.Cancel, @@ -81,7 +110,9 @@ func NewResultIterator(ctx context.Context, cancelFunc context.CancelFunc, ch ch // adds a new handler func (c *ResultIterator) addHandler(handler ResultIteratorFn) { - c.wg.Add(1) + c.mx.Lock() + defer c.mx.Unlock() + handlerID := c.currentHandlerID + 1 c.currentHandlerID = handlerID c.handlers[handlerID] = &subscriptionHanlderConfig{ @@ -90,17 +121,17 @@ func (c *ResultIterator) addHandler(handler ResultIteratorFn) { c.removeHandler(handlerID) }, } - c.wg.Done() } // removes a handler and cancels if no more handlers exist func (c *ResultIterator) removeHandler(handlerID int64) { - c.wg.Add(1) + c.mx.Lock() + defer c.mx.Unlock() + delete(c.handlers, handlerID) if len(c.handlers) == 0 { c.Cancel() } - c.wg.Done() } // ForEach adds a handler and handles each message as they come @@ -111,7 +142,7 @@ func (c *ResultIterator) ForEach(handler ResultIteratorFn) { // Cancel cancels the iterator func (c *ResultIterator) Cancel() { c.cancelled = true - c.cancelFunc() + c.iterDone <- true } // SubscribeParams parameters for subscribing @@ -129,14 +160,13 @@ type SubscribeParams struct { // Subscribe performs a subscribe operation func Subscribe(p SubscribeParams) *ResultIterator { resultChannel := make(chan *Result) + doneChannel := make(chan interface{}) // Use background context if no context was provided ctx := p.ContextValue if ctx == nil { ctx = context.Background() } - sctx, cancelFunc := context.WithCancel(ctx) - var mapSourceToResponse = func(payload interface{}) *Result { return Execute(ExecuteParams{ Schema: p.Schema, @@ -144,15 +174,15 @@ func Subscribe(p SubscribeParams) *ResultIterator { AST: p.Document, OperationName: p.OperationName, Args: p.VariableValues, - Context: sctx, + Context: ctx, }) } go func() { - result := &Result{} defer func() { if err := recover(); err != nil { + fmt.Println("SUBSCRIPTION RECOVERER", err) result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error))) } resultChannel <- result @@ -165,7 +195,7 @@ func Subscribe(p SubscribeParams) *ResultIterator { OperationName: p.OperationName, Args: p.VariableValues, Result: result, - Context: sctx, + Context: ctx, }) if err != nil { @@ -233,7 +263,7 @@ func Subscribe(p SubscribeParams) *ResultIterator { Source: p.RootValue, Args: args, Info: info, - Context: sctx, + Context: ctx, }) if err != nil { result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error))) @@ -249,12 +279,14 @@ func Subscribe(p SubscribeParams) *ResultIterator { } switch fieldResult.(type) { - case chan interface{}: + case *Subscriber: + sub := fieldResult.(*Subscriber) for { select { - case <-sctx.Done(): + case <-doneChannel: + sub.done <- true return - case res := <-fieldResult.(chan interface{}): + case res := <-sub.message: resultChannel <- mapSourceToResponse(res) } } @@ -265,5 +297,5 @@ func Subscribe(p SubscribeParams) *ResultIterator { }() // return a result iterator - return NewResultIterator(sctx, cancelFunc, resultChannel) + return NewResultIterator(doneChannel, resultChannel) } diff --git a/subscription_test.go b/subscription_test.go index f311fe0a..4821318c 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -52,7 +52,8 @@ func TestSubscription(t *testing.T) { return fmt.Sprintf("count=%v", p.Source), nil }, Subscribe: func(p ResolveParams) (interface{}, error) { - return m, nil + sub := NewSubscriber(m, make(chan interface{})) + return sub, nil }, }, "watch_should_fail": &Field{ From f21d0c7235c1fa143c611400e272668dabff2ff6 Mon Sep 17 00:00:00 2001 From: Branden Horiuchi Date: Sun, 17 May 2020 15:50:25 -0700 Subject: [PATCH 07/25] removing ResultIterator in favor of a result channel --- go.mod | 2 + subscription.go | 169 ++++--------------------------------------- subscription_test.go | 73 +++++++++++-------- 3 files changed, 56 insertions(+), 188 deletions(-) diff --git a/go.mod b/go.mod index 399b200d..7e02f765 100644 --- a/go.mod +++ b/go.mod @@ -1 +1,3 @@ module github.com/graphql-go/graphql + +go 1.13 diff --git a/subscription.go b/subscription.go index 7ef52fc2..720babec 100644 --- a/subscription.go +++ b/subscription.go @@ -3,148 +3,11 @@ package graphql import ( "context" "fmt" - "sync" "github.com/graphql-go/graphql/gqlerrors" "github.com/graphql-go/graphql/language/ast" ) -// Subscriber subscriber -type Subscriber struct { - message chan interface{} - done chan interface{} -} - -// Message returns the subscriber message channel -func (c *Subscriber) Message() chan interface{} { - return c.message -} - -// Done returns the subscriber done channel -func (c *Subscriber) Done() chan interface{} { - return c.done -} - -// NewSubscriber creates a new subscriber -func NewSubscriber(message, done chan interface{}) *Subscriber { - return &Subscriber{ - message: message, - done: done, - } -} - -// ResultIteratorParams parameters passed to the result iterator handler -type ResultIteratorParams struct { - ResultCount int64 // number of results this iterator has processed - Result *Result // the current result - Done func() // Removes the current handler - Cancel func() // Cancels the iterator, same as iterator.Cancel() -} - -// ResultIteratorFn a result iterator handler -type ResultIteratorFn func(p ResultIteratorParams) - -// holds subscription handler data -type subscriptionHanlderConfig struct { - handler ResultIteratorFn - doneFunc func() -} - -// ResultIterator handles processing results from a chan *Result -type ResultIterator struct { - currentHandlerID int64 - count int64 - mx sync.Mutex - ch chan *Result - iterDone chan interface{} - subDone chan interface{} - cancelled bool - handlers map[int64]*subscriptionHanlderConfig -} - -func (c *ResultIterator) incrimentCount() int64 { - c.mx.Lock() - defer c.mx.Unlock() - c.count++ - return c.count -} - -// NewResultIterator creates a new iterator and starts handling message on the result channel -func NewResultIterator(subDone chan interface{}, ch chan *Result) *ResultIterator { - iterator := &ResultIterator{ - currentHandlerID: 0, - count: 0, - iterDone: make(chan interface{}), - subDone: subDone, - ch: ch, - cancelled: false, - handlers: map[int64]*subscriptionHanlderConfig{}, - } - - go func() { - for { - select { - case <-iterator.iterDone: - subDone <- true - return - case res := <-iterator.ch: - if iterator.cancelled { - return - } - - count := iterator.incrimentCount() - for _, h := range iterator.handlers { - h.handler(ResultIteratorParams{ - ResultCount: int64(count), - Result: res, - Done: h.doneFunc, - Cancel: iterator.Cancel, - }) - } - } - } - }() - - return iterator -} - -// adds a new handler -func (c *ResultIterator) addHandler(handler ResultIteratorFn) { - c.mx.Lock() - defer c.mx.Unlock() - - handlerID := c.currentHandlerID + 1 - c.currentHandlerID = handlerID - c.handlers[handlerID] = &subscriptionHanlderConfig{ - handler: handler, - doneFunc: func() { - c.removeHandler(handlerID) - }, - } -} - -// removes a handler and cancels if no more handlers exist -func (c *ResultIterator) removeHandler(handlerID int64) { - c.mx.Lock() - defer c.mx.Unlock() - - delete(c.handlers, handlerID) - if len(c.handlers) == 0 { - c.Cancel() - } -} - -// ForEach adds a handler and handles each message as they come -func (c *ResultIterator) ForEach(handler ResultIteratorFn) { - c.addHandler(handler) -} - -// Cancel cancels the iterator -func (c *ResultIterator) Cancel() { - c.cancelled = true - c.iterDone <- true -} - // SubscribeParams parameters for subscribing type SubscribeParams struct { Schema Schema @@ -158,14 +21,8 @@ type SubscribeParams struct { } // Subscribe performs a subscribe operation -func Subscribe(p SubscribeParams) *ResultIterator { +func Subscribe(ctx context.Context, p SubscribeParams) chan *Result { resultChannel := make(chan *Result) - doneChannel := make(chan interface{}) - // Use background context if no context was provided - ctx := p.ContextValue - if ctx == nil { - ctx = context.Background() - } var mapSourceToResponse = func(payload interface{}) *Result { return Execute(ExecuteParams{ @@ -174,7 +31,7 @@ func Subscribe(p SubscribeParams) *ResultIterator { AST: p.Document, OperationName: p.OperationName, Args: p.VariableValues, - Context: ctx, + Context: p.ContextValue, }) } @@ -182,10 +39,10 @@ func Subscribe(p SubscribeParams) *ResultIterator { result := &Result{} defer func() { if err := recover(); err != nil { - fmt.Println("SUBSCRIPTION RECOVERER", err) result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error))) + resultChannel <- result } - resultChannel <- result + close(resultChannel) }() exeContext, err := buildExecutionContext(buildExecutionCtxParams{ @@ -195,7 +52,7 @@ func Subscribe(p SubscribeParams) *ResultIterator { OperationName: p.OperationName, Args: p.VariableValues, Result: result, - Context: ctx, + Context: p.ContextValue, }) if err != nil { @@ -263,7 +120,7 @@ func Subscribe(p SubscribeParams) *ResultIterator { Source: p.RootValue, Args: args, Info: info, - Context: ctx, + Context: p.ContextValue, }) if err != nil { result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error))) @@ -279,14 +136,14 @@ func Subscribe(p SubscribeParams) *ResultIterator { } switch fieldResult.(type) { - case *Subscriber: - sub := fieldResult.(*Subscriber) + case chan interface{}: + sub := fieldResult.(chan interface{}) for { select { - case <-doneChannel: - sub.done <- true + case <-ctx.Done(): return - case res := <-sub.message: + + case res := <-sub: resultChannel <- mapSourceToResponse(res) } } @@ -296,6 +153,6 @@ func Subscribe(p SubscribeParams) *ResultIterator { } }() - // return a result iterator - return NewResultIterator(doneChannel, resultChannel) + // return a result channel + return resultChannel } diff --git a/subscription_test.go b/subscription_test.go index 4821318c..ef70284c 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -52,8 +52,7 @@ func TestSubscription(t *testing.T) { return fmt.Sprintf("count=%v", p.Source), nil }, Subscribe: func(p ResolveParams) (interface{}, error) { - sub := NewSubscriber(m, make(chan interface{})) - return sub, nil + return m, nil }, }, "watch_should_fail": &Field{ @@ -74,53 +73,62 @@ func TestSubscription(t *testing.T) { return } - failIterator := Subscribe(SubscribeParams{ + // test a subscribe that should fail due to no return value + fctx, fCancelFunc := context.WithCancel(context.Background()) + fail := Subscribe(fctx, SubscribeParams{ Schema: schema, Document: document2, }) - // test a subscribe that should fail due to no return value - failIterator.ForEach(func(p ResultIteratorParams) { - if !p.Result.HasErrors() { - t.Errorf("subscribe failed to catch nil result from subscribe") - p.Done() + go func() { + for { + result := <-fail + if !result.HasErrors() { + t.Errorf("subscribe failed to catch nil result from subscribe") + } + fCancelFunc() return } - p.Done() - return - }) + }() - resultIterator := Subscribe(SubscribeParams{ + // test subscription data + resultCount := 0 + rctx, rCancelFunc := context.WithCancel(context.Background()) + results := Subscribe(rctx, SubscribeParams{ Schema: schema, Document: document1, ContextValue: context.Background(), }) - resultIterator.ForEach(func(p ResultIteratorParams) { - if p.Result.HasErrors() { - t.Errorf("subscribe error(s): %v", p.Result.Errors) - p.Done() - return - } - - if p.Result.Data != nil { - data := p.Result.Data.(map[string]interface{})["watch_count"] - expected := fmt.Sprintf("count=%d", p.ResultCount) - actual := fmt.Sprintf("%v", data) - if actual != expected { - t.Errorf("subscription result error: expected %q, actual %q", expected, actual) - p.Done() + go func() { + for { + result := <-results + if result.HasErrors() { + t.Errorf("subscribe error(s): %v", result.Errors) + rCancelFunc() return } - // test the done func by quitting after 3 iterations - // the publisher will publish up to 5 - if p.ResultCount >= int64(maxPublish-2) { - p.Done() - return + if result.Data != nil { + resultCount++ + data := result.Data.(map[string]interface{})["watch_count"] + expected := fmt.Sprintf("count=%d", resultCount) + actual := fmt.Sprintf("%v", data) + if actual != expected { + t.Errorf("subscription result error: expected %q, actual %q", expected, actual) + rCancelFunc() + return + } + + // test the done func by quitting after 3 iterations + // the publisher will publish up to 5 + if resultCount >= maxPublish-2 { + rCancelFunc() + return + } } } - }) + }() // start publishing go func() { @@ -128,6 +136,7 @@ func TestSubscription(t *testing.T) { time.Sleep(200 * time.Millisecond) m <- i } + close(m) }() // give time for the test to complete From c991585ae12bd48cdb1a5ff7a06cab30b7fb075e Mon Sep 17 00:00:00 2001 From: remorses Date: Mon, 18 May 2020 18:16:16 +0200 Subject: [PATCH 08/25] rewritten function signature and more tests --- subscription.go | 167 ++++++++++++---- subscription_test.go | 404 +++++++++++++++++++++++++++------------ testutil/subscription.go | 141 ++++++++++++++ 3 files changed, 552 insertions(+), 160 deletions(-) create mode 100644 testutil/subscription.go diff --git a/subscription.go b/subscription.go index 720babec..5238d721 100644 --- a/subscription.go +++ b/subscription.go @@ -5,66 +5,143 @@ import ( "fmt" "github.com/graphql-go/graphql/gqlerrors" - "github.com/graphql-go/graphql/language/ast" + "github.com/graphql-go/graphql/language/parser" + "github.com/graphql-go/graphql/language/source" ) // SubscribeParams parameters for subscribing type SubscribeParams struct { - Schema Schema - Document *ast.Document - RootValue interface{} - ContextValue context.Context + Schema Schema + RequestString string + RootValue interface{} + // ContextValue context.Context VariableValues map[string]interface{} OperationName string FieldResolver FieldResolveFn FieldSubscriber FieldResolveFn } +// SubscriptableSchema implements `graphql-transport-ws` `GraphQLService` interface: https://github.com/graph-gophers/graphql-transport-ws/blob/40c0484322990a129cac2f2d2763c3315230280c/graphqlws/internal/connection/connection.go#L53 +type SubscriptableSchema struct { + Schema Schema + RootObject map[string]interface{} +} + +func (self *SubscriptableSchema) Subscribe(ctx context.Context, queryString string, operationName string, variables map[string]interface{}) (<-chan *Result, error) { + c := Subscribe(Params{ + Schema: self.Schema, + Context: ctx, + OperationName: operationName, + RequestString: queryString, + RootObject: self.RootObject, + VariableValues: variables, + }) + return c, nil +} + // Subscribe performs a subscribe operation -func Subscribe(ctx context.Context, p SubscribeParams) chan *Result { +func Subscribe(p Params) chan *Result { + + source := source.NewSource(&source.Source{ + Body: []byte(p.RequestString), + Name: "GraphQL request", + }) + + // TODO run extensions hooks + + // parse the source + AST, err := parser.Parse(parser.ParseParams{Source: source}) + if err != nil { + + // merge the errors from extensions and the original error from parser + return sendOneResultandClose(&Result{ + Errors: gqlerrors.FormatErrors(err), + }) + } + + // validate document + validationResult := ValidateDocument(&p.Schema, AST, nil) + + if !validationResult.IsValid { + // run validation finish functions for extensions + return sendOneResultandClose(&Result{ + Errors: validationResult.Errors, + }) + + } + return ExecuteSubscription(ExecuteParams{ + Schema: p.Schema, + Root: p.RootObject, + AST: AST, + OperationName: p.OperationName, + Args: p.VariableValues, + Context: p.Context, + }) +} + +func sendOneResultandClose(res *Result) chan *Result { resultChannel := make(chan *Result) + resultChannel <- res + close(resultChannel) + return resultChannel +} + +func ExecuteSubscription(p ExecuteParams) chan *Result { + + if p.Context == nil { + p.Context = context.Background() + } + + // TODO run executionDidStart functions from extensions var mapSourceToResponse = func(payload interface{}) *Result { return Execute(ExecuteParams{ Schema: p.Schema, Root: payload, - AST: p.Document, + AST: p.AST, OperationName: p.OperationName, - Args: p.VariableValues, - Context: p.ContextValue, + Args: p.Args, + Context: p.Context, }) } - + var resultChannel = make(chan *Result) go func() { - result := &Result{} defer func() { if err := recover(); err != nil { - result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error))) - resultChannel <- result + e, ok := err.(error) + if !ok { + return + } + sendOneResultandClose(&Result{ + Errors: gqlerrors.FormatErrors(e), + }) } - close(resultChannel) + // close(resultChannel) + return }() exeContext, err := buildExecutionContext(buildExecutionCtxParams{ Schema: p.Schema, - Root: p.RootValue, - AST: p.Document, + Root: p.Root, + AST: p.AST, OperationName: p.OperationName, - Args: p.VariableValues, - Result: result, - Context: p.ContextValue, + Args: p.Args, + Result: &Result{}, // TODO what is this? + Context: p.Context, }) if err != nil { - result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error))) - resultChannel <- result + sendOneResultandClose(&Result{ + Errors: gqlerrors.FormatErrors(err), + }) return } operationType, err := getOperationRootType(p.Schema, exeContext.Operation) if err != nil { - result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error))) - resultChannel <- result + sendOneResultandClose(&Result{ + Errors: gqlerrors.FormatErrors(err), + }) return } @@ -85,18 +162,19 @@ func Subscribe(ctx context.Context, p SubscribeParams) chan *Result { fieldDef := getFieldDef(p.Schema, operationType, fieldName) if fieldDef == nil { - err := fmt.Errorf("the subscription field %q is not defined", fieldName) - result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error))) - resultChannel <- result + sendOneResultandClose(&Result{ + Errors: gqlerrors.FormatErrors(fmt.Errorf("the subscription field %q is not defined", fieldName)), + }) return } - resolveFn := p.FieldSubscriber + resolveFn := fieldDef.Subscribe + if resolveFn == nil { - resolveFn = DefaultResolveFn - } - if fieldDef.Subscribe != nil { - resolveFn = fieldDef.Subscribe + sendOneResultandClose(&Result{ + Errors: gqlerrors.FormatErrors(fmt.Errorf("the subscription function %q is not defined", fieldName)), + }) + return } fieldPath := &ResponsePath{ Key: responseName, @@ -117,38 +195,47 @@ func Subscribe(ctx context.Context, p SubscribeParams) chan *Result { } fieldResult, err := resolveFn(ResolveParams{ - Source: p.RootValue, + Source: p.Root, Args: args, Info: info, - Context: p.ContextValue, + Context: p.Context, }) if err != nil { - result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error))) - resultChannel <- result + sendOneResultandClose(&Result{ + Errors: gqlerrors.FormatErrors(err), + }) return } if fieldResult == nil { - err := fmt.Errorf("no field result") - result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error))) - resultChannel <- result + sendOneResultandClose(&Result{ + Errors: gqlerrors.FormatErrors(fmt.Errorf("no field result")), + }) return } switch fieldResult.(type) { case chan interface{}: sub := fieldResult.(chan interface{}) + defer close(resultChannel) for { select { - case <-ctx.Done(): + case <-p.Context.Done(): + println("context cancelled") + // TODO send the context error to the resultchannel return - case res := <-sub: + case res, more := <-sub: + if !more { + return + } resultChannel <- mapSourceToResponse(res) } } default: + fmt.Println(fieldResult) resultChannel <- mapSourceToResponse(fieldResult) + close(resultChannel) return } }() diff --git a/subscription_test.go b/subscription_test.go index ef70284c..095c08a2 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -1,144 +1,308 @@ -package graphql +package graphql_test import ( - "context" "fmt" "testing" - "time" - "github.com/graphql-go/graphql/language/parser" - "github.com/graphql-go/graphql/language/source" + "github.com/graphql-go/graphql" + "github.com/graphql-go/graphql/testutil" ) -func TestSubscription(t *testing.T) { - var maxPublish = 5 - m := make(chan interface{}) +func makeSubscribeToStringFunction(elements []string) func(p graphql.ResolveParams) (interface{}, error) { + return func(p graphql.ResolveParams) (interface{}, error) { + c := make(chan interface{}) + go func() { + for _, r := range elements { + select { + case <-p.Context.Done(): + close(c) + return + case c <- r: + } + } + close(c) + }() + return c, nil + } +} - source1 := source.NewSource(&source.Source{ - Body: []byte(`subscription { - watch_count - }`), - Name: "GraphQL request", - }) +func makeSubscribeToMapFunction(elements []map[string]interface{}) func(p graphql.ResolveParams) (interface{}, error) { + return func(p graphql.ResolveParams) (interface{}, error) { + c := make(chan interface{}) + go func() { + for _, r := range elements { + select { + case <-p.Context.Done(): + close(c) + return + case c <- r: + } + } + close(c) + }() + return c, nil + } +} - source2 := source.NewSource(&source.Source{ - Body: []byte(`subscription { - watch_should_fail - }`), - Name: "GraphQL request", +func makeSubscriptionSchema(t *testing.T, c graphql.ObjectConfig) graphql.Schema { + schema, err := graphql.NewSchema(graphql.SchemaConfig{ + Query: dummyQuery, + Subscription: graphql.NewObject(c), }) + if err != nil { + t.Errorf("failed to create schema: %v", err) + } + return schema +} - document1, _ := parser.Parse(parser.ParseParams{Source: source1}) - document2, _ := parser.Parse(parser.ParseParams{Source: source2}) +func TestSchemaSubscribe(t *testing.T) { - schema, err := NewSchema(SchemaConfig{ - Query: NewObject(ObjectConfig{ - Name: "Query", - Fields: Fields{ - "hello": &Field{ - Type: String, - Resolve: func(p ResolveParams) (interface{}, error) { - return "world", nil + testutil.RunSubscribes(t, []*testutil.TestSubscription{ + { + Name: "subscribe without resolver", + Schema: makeSubscriptionSchema(t, graphql.ObjectConfig{ + Name: "Subscription", + Fields: graphql.Fields{ + "sub_without_resolver": &graphql.Field{ + Type: graphql.String, + Subscribe: makeSubscribeToStringFunction([]string{"a", "b", "c"}), + Resolve: func(p graphql.ResolveParams) (interface{}, error) { + return p.Source, nil + }, }, }, + }), + Query: ` + subscription onHelloSaid { + sub_without_resolver + } + `, + ExpectedResults: []testutil.TestResponse{ + {Data: `{ "sub_without_resolver": "a" }`}, + {Data: `{ "sub_without_resolver": "b" }`}, + {Data: `{ "sub_without_resolver": "c" }`}, }, - }), - Subscription: NewObject(ObjectConfig{ - Name: "Subscription", - Fields: Fields{ - "watch_count": &Field{ - Type: String, - Resolve: func(p ResolveParams) (interface{}, error) { - return fmt.Sprintf("count=%v", p.Source), nil - }, - Subscribe: func(p ResolveParams) (interface{}, error) { - return m, nil - }, - }, - "watch_should_fail": &Field{ - Type: String, - Resolve: func(p ResolveParams) (interface{}, error) { - return fmt.Sprintf("count=%v", p.Source), nil - }, - Subscribe: func(p ResolveParams) (interface{}, error) { - return nil, nil + }, + { + Name: "subscribe with resolver changes output", + Schema: makeSubscriptionSchema(t, graphql.ObjectConfig{ + Name: "Subscription", + Fields: graphql.Fields{ + "sub_with_resolver": &graphql.Field{ + Type: graphql.String, + Subscribe: makeSubscribeToStringFunction([]string{"a", "b", "c"}), + Resolve: func(p graphql.ResolveParams) (interface{}, error) { + return fmt.Sprintf("result=%v", p.Source), nil + }, }, }, + }), + Query: ` + subscription onHelloSaid { + sub_with_resolver + } + `, + ExpectedResults: []testutil.TestResponse{ + {Data: `{ "sub_with_resolver": "result=a" }`}, + {Data: `{ "sub_with_resolver": "result=b" }`}, + {Data: `{ "sub_with_resolver": "result=c" }`}, }, - }), - }) - - if err != nil { - t.Errorf("failed to create schema: %v", err) - return - } - - // test a subscribe that should fail due to no return value - fctx, fCancelFunc := context.WithCancel(context.Background()) - fail := Subscribe(fctx, SubscribeParams{ - Schema: schema, - Document: document2, - }) - - go func() { - for { - result := <-fail - if !result.HasErrors() { - t.Errorf("subscribe failed to catch nil result from subscribe") - } - fCancelFunc() - return - } - }() + }, + // { + // Name: "subscribe to a nested object", + // Schema: makeSubscriptionSchema(t, graphql.ObjectConfig{ + // Name: "Subscription", + // Fields: graphql.Fields{ + // "sub_with_object": &graphql.Field{ + // Type: graphql.String, + // Subscribe: makeSubscribeToMapFunction([]map[string]interface{}{ + // { + // "field": "hello", + // "obj": map[string]interface{}{ + // "field": "hello", + // }, + // }, + // { + // "field": "bye", + // "obj": map[string]interface{}{ + // "field": "bye", + // }, + // }, + // }), + // }, + // }, + // }), + // Query: ` + // subscription onHelloSaid { + // sub_with_object { + // field + // obj { + // field + // } + // } + // } + // `, + // ExpectedResults: []testutil.TestResponse{ + // {Data: `{ "sub_with_object": { "field": "hello", "obj": { "field": "hello" } } }`}, + // }, + // }, - // test subscription data - resultCount := 0 - rctx, rCancelFunc := context.WithCancel(context.Background()) - results := Subscribe(rctx, SubscribeParams{ - Schema: schema, - Document: document1, - ContextValue: context.Background(), + // { + // Name: "parse_errors", + // Schema: schema, + // Query: `invalid graphQL query`, + // ExpectedResults: []testutil.TestResponse{ + // { + // Errors: []gqlerrors.FormattedError{{Message: ""}}, + // }, + // }, + // }, + // { + // Name: "subscribe_to_query_succeeds", + // Schema: schema, + // Query: ` + // query Hello { + // hello + // } + // `, + // ExpectedResults: []testutil.TestResponse{ + // { + // Data: json.RawMessage(` + // { + // "hello": "Hello world!" + // } + // `), + // }, + // }, + // }, + // { + // Name: "subscription_resolver_can_error", + // Schema: schema, + // Query: ` + // subscription onHelloSaid { + // helloSaid { + // msg + // } + // } + // `, + // ExpectedResults: []testutil.TestResponse{ + // { + // Data: json.RawMessage(` + // null + // `), + // Errors: []gqlerrors.FormattedError{{Message: ""}}}, + // }, + // }, + // { + // Name: "subscription_resolver_can_error_optional_msg", + // Schema: schema, + // Query: ` + // subscription onHelloSaid { + // helloSaidNullable { + // msg + // } + // } + // `, + // ExpectedResults: []testutil.TestResponse{ + // { + // Data: json.RawMessage(` + // { + // "helloSaidNullable": { + // "msg": null + // } + // } + // `), + // Errors: []gqlerrors.FormattedError{{Message: ""}}}, + // }, + // }, + // { + // Name: "subscription_resolver_can_error_optional_event", + // Schema: schema, + // Query: ` + // subscription onHelloSaid { + // helloSaidNullable { + // msg + // } + // } + // `, + // ExpectedResults: []testutil.TestResponse{ + // { + // Data: json.RawMessage(` + // { + // "helloSaidNullable": null + // } + // `), + // Errors: []gqlerrors.FormattedError{{Message: ""}}}, + // }, + // }, + // { + // Name: "schema_without_resolver_errors", + // Schema: schema, + // Query: ` + // subscription onHelloSaid { + // helloSaid { + // msg + // } + // } + // `, + // ExpectedErr: errors.New("schema created without resolver, can not subscribe"), + // }, }) +} - go func() { - for { - result := <-results - if result.HasErrors() { - t.Errorf("subscribe error(s): %v", result.Errors) - rCancelFunc() - return - } +// func TestRootOperations_invalidSubscriptionSchema(t *testing.T) { +// type args struct { +// Schema string +// } +// type want struct { +// Error string +// } +// testTable := map[string]struct { +// Args args +// Want want +// }{ +// "Subscription as incorrect type": { +// Args: args{ +// Schema: ` +// schema { +// query: Query +// subscription: String +// } +// type Query { +// thing: String +// } +// `, +// }, +// Want: want{Error: `root operation "subscription" must be an OBJECT`}, +// }, +// "Subscription declared by schema, but type not present": { +// Args: args{ +// Schema: ` +// schema { +// query: Query +// subscription: Subscription +// } +// type Query { +// hello: String! +// } +// `, +// }, +// Want: want{Error: `graphql: type "Subscription" not found`}, +// }, +// } - if result.Data != nil { - resultCount++ - data := result.Data.(map[string]interface{})["watch_count"] - expected := fmt.Sprintf("count=%d", resultCount) - actual := fmt.Sprintf("%v", data) - if actual != expected { - t.Errorf("subscription result error: expected %q, actual %q", expected, actual) - rCancelFunc() - return - } +// for name, tt := range testTable { +// tt := tt +// t.Run(name, func(t *testing.T) { +// t.Log(tt.Args.Schema) // TODO do something +// }) +// } +// } - // test the done func by quitting after 3 iterations - // the publisher will publish up to 5 - if resultCount >= maxPublish-2 { - rCancelFunc() - return - } - } - } - }() +var dummyQuery = graphql.NewObject(graphql.ObjectConfig{ + Name: "Query", + Fields: graphql.Fields{ - // start publishing - go func() { - for i := 1; i <= maxPublish; i++ { - time.Sleep(200 * time.Millisecond) - m <- i - } - close(m) - }() - - // give time for the test to complete - time.Sleep(1 * time.Second) -} + "hello": &graphql.Field{Type: graphql.String}, + }, +}) diff --git a/testutil/subscription.go b/testutil/subscription.go new file mode 100644 index 00000000..b49266c5 --- /dev/null +++ b/testutil/subscription.go @@ -0,0 +1,141 @@ +package testutil + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "strconv" + "testing" + + "github.com/graphql-go/graphql" + "github.com/graphql-go/graphql/gqlerrors" +) + +// TestResponse models the expected response +type TestResponse struct { + Data string + Errors []gqlerrors.FormattedError +} + +// TestSubscription is a GraphQL test case to be used with RunSubscribe. +type TestSubscription struct { + Name string + Schema graphql.Schema + Query string + OperationName string + Variables map[string]interface{} + ExpectedResults []TestResponse + ExpectedErr error +} + +// RunSubscribes runs the given GraphQL subscription test cases as subtests. +func RunSubscribes(t *testing.T, tests []*TestSubscription) { + for i, test := range tests { + if test.Name == "" { + test.Name = strconv.Itoa(i + 1) + } + + t.Run(test.Name, func(t *testing.T) { + RunSubscribe(t, test) + }) + } +} + +// RunSubscribe runs a single GraphQL subscription test case. +func RunSubscribe(t *testing.T, test *TestSubscription) { + ctx, _ := context.WithCancel(context.Background()) + // defer cancel() // TODO add defer cancel + + c := graphql.Subscribe(graphql.Params{ + Context: ctx, + OperationName: test.OperationName, + RequestString: test.Query, + VariableValues: test.Variables, + Schema: test.Schema, + }) + // if err != nil { + // if err.Error() != test.ExpectedErr.Error() { + // t.Fatalf("unexpected error: got %+v, want %+v", err, test.ExpectedErr) + // } + + // return + // } + + var results []*graphql.Result + for res := range c { + fmt.Println(res) + results = append(results, res) + } + + for i, expected := range test.ExpectedResults { + if len(results)-1 < i { + t.Error(errors.New("not enough results, expected results are more than actual results")) + return + } + res := results[i] + + checkErrorStrings(t, expected.Errors, res.Errors) + + resData, err := json.MarshalIndent(res.Data, "", " ") + if err != nil { + t.Fatal(err) + } + got, err := json.MarshalIndent(res.Data, "", " ") + if err != nil { + t.Fatalf("got: invalid JSON: %s; raw: %s", err, resData) + } + + if err != nil { + t.Fatal(err) + } + want, err := formatJSON(expected.Data) + if err != nil { + t.Fatalf("got: invalid JSON: %s; raw: %s", err, res.Data) + } + + if !bytes.Equal(got, want) { + t.Logf("got: %s", got) + t.Logf("want: %s", want) + t.Fail() + } + } +} + +func checkErrorStrings(t *testing.T, expected, actual []gqlerrors.FormattedError) { + expectedCount, actualCount := len(expected), len(actual) + + if expectedCount != actualCount { + t.Fatalf("unexpected number of errors: want %d, got %d", expectedCount, actualCount) + } + + if expectedCount > 0 { + for i, want := range expected { + got := actual[i] + + if got.Error() != want.Error() { + t.Fatalf("unexpected error: got %+v, want %+v", got, want) + } + } + + // Return because we're done checking. + return + } + + for _, err := range actual { + t.Errorf("unexpected error: '%s'", err) + } +} + +func formatJSON(data string) ([]byte, error) { + var v interface{} + if err := json.Unmarshal([]byte(data), &v); err != nil { + return nil, err + } + formatted, err := json.MarshalIndent(v, "", " ") + if err != nil { + return nil, err + } + return formatted, nil +} From ccd052d34137f80c481a4e3676d3be361168a17d Mon Sep 17 00:00:00 2001 From: remorses Date: Mon, 18 May 2020 18:34:32 +0200 Subject: [PATCH 09/25] initial tests pass --- subscription.go | 4 +- subscription_test.go | 80 ++++++++++++++++++++-------------------- testutil/subscription.go | 11 +++++- 3 files changed, 53 insertions(+), 42 deletions(-) diff --git a/subscription.go b/subscription.go index 5238d721..12ccfc28 100644 --- a/subscription.go +++ b/subscription.go @@ -110,6 +110,7 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { if err := recover(); err != nil { e, ok := err.(error) if !ok { + fmt.Println("strange program path") return } sendOneResultandClose(&Result{ @@ -217,16 +218,17 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { switch fieldResult.(type) { case chan interface{}: sub := fieldResult.(chan interface{}) - defer close(resultChannel) for { select { case <-p.Context.Done(): println("context cancelled") + close(resultChannel) // TODO send the context error to the resultchannel return case res, more := <-sub: if !more { + close(resultChannel) return } resultChannel <- mapSourceToResponse(res) diff --git a/subscription_test.go b/subscription_test.go index 095c08a2..7b899347 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -66,7 +66,7 @@ func TestSchemaSubscribe(t *testing.T) { "sub_without_resolver": &graphql.Field{ Type: graphql.String, Subscribe: makeSubscribeToStringFunction([]string{"a", "b", "c"}), - Resolve: func(p graphql.ResolveParams) (interface{}, error) { + Resolve: func(p graphql.ResolveParams) (interface{}, error) { // TODO remove dummy resolver return p.Source, nil }, }, @@ -108,44 +108,46 @@ func TestSchemaSubscribe(t *testing.T) { {Data: `{ "sub_with_resolver": "result=c" }`}, }, }, - // { - // Name: "subscribe to a nested object", - // Schema: makeSubscriptionSchema(t, graphql.ObjectConfig{ - // Name: "Subscription", - // Fields: graphql.Fields{ - // "sub_with_object": &graphql.Field{ - // Type: graphql.String, - // Subscribe: makeSubscribeToMapFunction([]map[string]interface{}{ - // { - // "field": "hello", - // "obj": map[string]interface{}{ - // "field": "hello", - // }, - // }, - // { - // "field": "bye", - // "obj": map[string]interface{}{ - // "field": "bye", - // }, - // }, - // }), - // }, - // }, - // }), - // Query: ` - // subscription onHelloSaid { - // sub_with_object { - // field - // obj { - // field - // } - // } - // } - // `, - // ExpectedResults: []testutil.TestResponse{ - // {Data: `{ "sub_with_object": { "field": "hello", "obj": { "field": "hello" } } }`}, - // }, - // }, + { + Name: "subscribe to a nested object", + Schema: makeSubscriptionSchema(t, graphql.ObjectConfig{ + Name: "Subscription", + Fields: graphql.Fields{ + "sub_with_object": &graphql.Field{ + Type: graphql.NewObject(graphql.ObjectConfig{ + Name: "Obj", + Fields: graphql.Fields{ + "field": &graphql.Field{ + Type: graphql.String, + }, + }, + }), + Resolve: func(p graphql.ResolveParams) (interface{}, error) { // TODO remove dummy resolver + return p.Source, nil + }, + Subscribe: makeSubscribeToMapFunction([]map[string]interface{}{ + { + "field": "hello", + }, + { + "field": "bye", + }, + }), + }, + }, + }), + Query: ` + subscription onHelloSaid { + sub_with_object { + field + } + } + `, + ExpectedResults: []testutil.TestResponse{ + {Data: `{ "sub_with_object": { "field": "hello" } }`}, + {Data: `{ "sub_with_object": { "field": "bye" } }`}, + }, + }, // { // Name: "parse_errors", diff --git a/testutil/subscription.go b/testutil/subscription.go index b49266c5..2ac5faba 100644 --- a/testutil/subscription.go +++ b/testutil/subscription.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "strconv" "testing" @@ -65,7 +64,7 @@ func RunSubscribe(t *testing.T, test *TestSubscription) { var results []*graphql.Result for res := range c { - fmt.Println(res) + println(pretty(res)) results = append(results, res) } @@ -139,3 +138,11 @@ func formatJSON(data string) ([]byte, error) { } return formatted, nil } + +func pretty(x interface{}) string { + got, err := json.MarshalIndent(x, "", " ") + if err != nil { + panic(err) + } + return string(got) +} From 7e815e2ed26296474e96dcb5be583e8c8e59faed Mon Sep 17 00:00:00 2001 From: remorses Date: Mon, 18 May 2020 18:46:03 +0200 Subject: [PATCH 10/25] added test for subscriptions parse errors --- subscription.go | 22 +++++++++++----------- subscription_test.go | 24 +++++++++++++++++++++++- testutil/subscription.go | 27 ++++++++++++++------------- 3 files changed, 48 insertions(+), 25 deletions(-) diff --git a/subscription.go b/subscription.go index 12ccfc28..cf2644ab 100644 --- a/subscription.go +++ b/subscription.go @@ -54,7 +54,7 @@ func Subscribe(p Params) chan *Result { if err != nil { // merge the errors from extensions and the original error from parser - return sendOneResultandClose(&Result{ + return sendOneResultAndClose(&Result{ Errors: gqlerrors.FormatErrors(err), }) } @@ -64,7 +64,7 @@ func Subscribe(p Params) chan *Result { if !validationResult.IsValid { // run validation finish functions for extensions - return sendOneResultandClose(&Result{ + return sendOneResultAndClose(&Result{ Errors: validationResult.Errors, }) @@ -79,8 +79,8 @@ func Subscribe(p Params) chan *Result { }) } -func sendOneResultandClose(res *Result) chan *Result { - resultChannel := make(chan *Result) +func sendOneResultAndClose(res *Result) chan *Result { + resultChannel := make(chan *Result, 1) // TODO unbuffered channel does not pass errors, why? resultChannel <- res close(resultChannel) return resultChannel @@ -113,7 +113,7 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { fmt.Println("strange program path") return } - sendOneResultandClose(&Result{ + sendOneResultAndClose(&Result{ Errors: gqlerrors.FormatErrors(e), }) } @@ -132,7 +132,7 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { }) if err != nil { - sendOneResultandClose(&Result{ + sendOneResultAndClose(&Result{ Errors: gqlerrors.FormatErrors(err), }) return @@ -140,7 +140,7 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { operationType, err := getOperationRootType(p.Schema, exeContext.Operation) if err != nil { - sendOneResultandClose(&Result{ + sendOneResultAndClose(&Result{ Errors: gqlerrors.FormatErrors(err), }) return @@ -163,7 +163,7 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { fieldDef := getFieldDef(p.Schema, operationType, fieldName) if fieldDef == nil { - sendOneResultandClose(&Result{ + sendOneResultAndClose(&Result{ Errors: gqlerrors.FormatErrors(fmt.Errorf("the subscription field %q is not defined", fieldName)), }) return @@ -172,7 +172,7 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { resolveFn := fieldDef.Subscribe if resolveFn == nil { - sendOneResultandClose(&Result{ + sendOneResultAndClose(&Result{ Errors: gqlerrors.FormatErrors(fmt.Errorf("the subscription function %q is not defined", fieldName)), }) return @@ -202,14 +202,14 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { Context: p.Context, }) if err != nil { - sendOneResultandClose(&Result{ + sendOneResultAndClose(&Result{ Errors: gqlerrors.FormatErrors(err), }) return } if fieldResult == nil { - sendOneResultandClose(&Result{ + sendOneResultAndClose(&Result{ Errors: gqlerrors.FormatErrors(fmt.Errorf("no field result")), }) return diff --git a/subscription_test.go b/subscription_test.go index 7b899347..a4e2b99e 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -83,6 +83,27 @@ func TestSchemaSubscribe(t *testing.T) { {Data: `{ "sub_without_resolver": "c" }`}, }, }, + { + Name: "receive parse error", + Schema: makeSubscriptionSchema(t, graphql.ObjectConfig{ + Name: "Subscription", + Fields: graphql.Fields{ + "sub_without_resolver": &graphql.Field{ + Type: graphql.String, + Subscribe: makeSubscribeToStringFunction([]string{"a", "b", "c"}), + }, + }, + }), + Query: ` + subscription onHelloSaid { + sub_without_resolver + xxx + } + `, + ExpectedResults: []testutil.TestResponse{ + {Errors: []string{"Cannot query field \"xxx\" on type \"Subscription\"."}}, + }, + }, { Name: "subscribe with resolver changes output", Schema: makeSubscriptionSchema(t, graphql.ObjectConfig{ @@ -90,7 +111,7 @@ func TestSchemaSubscribe(t *testing.T) { Fields: graphql.Fields{ "sub_with_resolver": &graphql.Field{ Type: graphql.String, - Subscribe: makeSubscribeToStringFunction([]string{"a", "b", "c"}), + Subscribe: makeSubscribeToStringFunction([]string{"a", "b", "c", "d"}), Resolve: func(p graphql.ResolveParams) (interface{}, error) { return fmt.Sprintf("result=%v", p.Source), nil }, @@ -106,6 +127,7 @@ func TestSchemaSubscribe(t *testing.T) { {Data: `{ "sub_with_resolver": "result=a" }`}, {Data: `{ "sub_with_resolver": "result=b" }`}, {Data: `{ "sub_with_resolver": "result=c" }`}, + {Data: `{ "sub_with_resolver": "result=d" }`}, }, }, { diff --git a/testutil/subscription.go b/testutil/subscription.go index 2ac5faba..cef2725e 100644 --- a/testutil/subscription.go +++ b/testutil/subscription.go @@ -9,13 +9,12 @@ import ( "testing" "github.com/graphql-go/graphql" - "github.com/graphql-go/graphql/gqlerrors" ) // TestResponse models the expected response type TestResponse struct { Data string - Errors []gqlerrors.FormattedError + Errors []string } // TestSubscription is a GraphQL test case to be used with RunSubscribe. @@ -26,7 +25,6 @@ type TestSubscription struct { OperationName string Variables map[string]interface{} ExpectedResults []TestResponse - ExpectedErr error } // RunSubscribes runs the given GraphQL subscription test cases as subtests. @@ -75,15 +73,18 @@ func RunSubscribe(t *testing.T, test *TestSubscription) { } res := results[i] - checkErrorStrings(t, expected.Errors, res.Errors) - - resData, err := json.MarshalIndent(res.Data, "", " ") - if err != nil { - t.Fatal(err) + var errs []string + for _, err := range res.Errors { + errs = append(errs, err.Message) } + checkErrorStrings(t, expected.Errors, errs) + if expected.Data == "" { + continue + } + got, err := json.MarshalIndent(res.Data, "", " ") if err != nil { - t.Fatalf("got: invalid JSON: %s; raw: %s", err, resData) + t.Fatalf("got: invalid JSON: %s; raw: %s", err, got) } if err != nil { @@ -102,19 +103,19 @@ func RunSubscribe(t *testing.T, test *TestSubscription) { } } -func checkErrorStrings(t *testing.T, expected, actual []gqlerrors.FormattedError) { +func checkErrorStrings(t *testing.T, expected, actual []string) { expectedCount, actualCount := len(expected), len(actual) if expectedCount != actualCount { - t.Fatalf("unexpected number of errors: want %d, got %d", expectedCount, actualCount) + t.Fatalf("unexpected number of errors: want `%d`, got `%d`", expectedCount, actualCount) } if expectedCount > 0 { for i, want := range expected { got := actual[i] - if got.Error() != want.Error() { - t.Fatalf("unexpected error: got %+v, want %+v", got, want) + if got != want { + t.Fatalf("unexpected error: got `%+v`, want `%+v`", got, want) } } From 2b63a00f9dd3f5810c0e65bb207a064f6114eb12 Mon Sep 17 00:00:00 2001 From: remorses Date: Mon, 18 May 2020 18:58:11 +0200 Subject: [PATCH 11/25] handle errors in ExecuteSubscribe --- subscription.go | 42 ++++++++++++++----------- subscription_test.go | 73 ++++++++++++++++---------------------------- 2 files changed, 50 insertions(+), 65 deletions(-) diff --git a/subscription.go b/subscription.go index cf2644ab..8cd51c2b 100644 --- a/subscription.go +++ b/subscription.go @@ -106,6 +106,7 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { } var resultChannel = make(chan *Result) go func() { + defer close(resultChannel) defer func() { if err := recover(); err != nil { e, ok := err.(error) @@ -113,11 +114,10 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { fmt.Println("strange program path") return } - sendOneResultAndClose(&Result{ + resultChannel <- &Result{ Errors: gqlerrors.FormatErrors(e), - }) + } } - // close(resultChannel) return }() @@ -132,17 +132,19 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { }) if err != nil { - sendOneResultAndClose(&Result{ + resultChannel <- &Result{ Errors: gqlerrors.FormatErrors(err), - }) + } + return } operationType, err := getOperationRootType(p.Schema, exeContext.Operation) if err != nil { - sendOneResultAndClose(&Result{ + resultChannel <- &Result{ Errors: gqlerrors.FormatErrors(err), - }) + } + return } @@ -163,18 +165,20 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { fieldDef := getFieldDef(p.Schema, operationType, fieldName) if fieldDef == nil { - sendOneResultAndClose(&Result{ + resultChannel <- &Result{ Errors: gqlerrors.FormatErrors(fmt.Errorf("the subscription field %q is not defined", fieldName)), - }) + } + return } resolveFn := fieldDef.Subscribe if resolveFn == nil { - sendOneResultAndClose(&Result{ + resultChannel <- &Result{ Errors: gqlerrors.FormatErrors(fmt.Errorf("the subscription function %q is not defined", fieldName)), - }) + } + return } fieldPath := &ResponsePath{ @@ -202,16 +206,18 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { Context: p.Context, }) if err != nil { - sendOneResultAndClose(&Result{ + resultChannel <- &Result{ Errors: gqlerrors.FormatErrors(err), - }) + } + return } if fieldResult == nil { - sendOneResultAndClose(&Result{ + resultChannel <- &Result{ Errors: gqlerrors.FormatErrors(fmt.Errorf("no field result")), - }) + } + return } @@ -222,13 +228,13 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { select { case <-p.Context.Done(): println("context cancelled") - close(resultChannel) + // TODO send the context error to the resultchannel return case res, more := <-sub: if !more { - close(resultChannel) + return } resultChannel <- mapSourceToResponse(res) @@ -237,7 +243,7 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { default: fmt.Println(fieldResult) resultChannel <- mapSourceToResponse(fieldResult) - close(resultChannel) + return } }() diff --git a/subscription_test.go b/subscription_test.go index a4e2b99e..dc4b95f2 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -1,6 +1,7 @@ package graphql_test import ( + "errors" "fmt" "testing" @@ -84,7 +85,7 @@ func TestSchemaSubscribe(t *testing.T) { }, }, { - Name: "receive parse error", + Name: "receive query validation error", Schema: makeSubscriptionSchema(t, graphql.ObjectConfig{ Name: "Subscription", Fields: graphql.Fields{ @@ -171,52 +172,30 @@ func TestSchemaSubscribe(t *testing.T) { }, }, - // { - // Name: "parse_errors", - // Schema: schema, - // Query: `invalid graphQL query`, - // ExpectedResults: []testutil.TestResponse{ - // { - // Errors: []gqlerrors.FormattedError{{Message: ""}}, - // }, - // }, - // }, - // { - // Name: "subscribe_to_query_succeeds", - // Schema: schema, - // Query: ` - // query Hello { - // hello - // } - // `, - // ExpectedResults: []testutil.TestResponse{ - // { - // Data: json.RawMessage(` - // { - // "hello": "Hello world!" - // } - // `), - // }, - // }, - // }, - // { - // Name: "subscription_resolver_can_error", - // Schema: schema, - // Query: ` - // subscription onHelloSaid { - // helloSaid { - // msg - // } - // } - // `, - // ExpectedResults: []testutil.TestResponse{ - // { - // Data: json.RawMessage(` - // null - // `), - // Errors: []gqlerrors.FormattedError{{Message: ""}}}, - // }, - // }, + { + Name: "subscription_resolver_can_error", + Schema: makeSubscriptionSchema(t, graphql.ObjectConfig{ + Name: "Subscription", + Fields: graphql.Fields{ + "should_error": &graphql.Field{ + Type: graphql.String, + Subscribe: func(p graphql.ResolveParams) (interface{}, error) { + return nil, errors.New("got a subscribe error") + }, + }, + }, + }), + Query: ` + subscription { + should_error + } + `, + ExpectedResults: []testutil.TestResponse{ + { + Errors: []string{"got a subscribe error"}, + }, + }, + }, // { // Name: "subscription_resolver_can_error_optional_msg", // Schema: schema, From 6baca7e7298592b377b7c8a782d72520662cb57a Mon Sep 17 00:00:00 2001 From: remorses Date: Mon, 18 May 2020 19:03:05 +0200 Subject: [PATCH 12/25] subscription: more tests cases --- subscription.go | 4 +-- subscription_test.go | 80 ++++++++++++++------------------------------ 2 files changed, 27 insertions(+), 57 deletions(-) diff --git a/subscription.go b/subscription.go index 8cd51c2b..1ebe561d 100644 --- a/subscription.go +++ b/subscription.go @@ -228,8 +228,7 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { select { case <-p.Context.Done(): println("context cancelled") - - // TODO send the context error to the resultchannel + // TODO send the context error to the resultchannel? return case res, more := <-sub: @@ -243,7 +242,6 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { default: fmt.Println(fieldResult) resultChannel <- mapSourceToResponse(fieldResult) - return } }() diff --git a/subscription_test.go b/subscription_test.go index dc4b95f2..c194a01d 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -155,6 +155,9 @@ func TestSchemaSubscribe(t *testing.T) { { "field": "bye", }, + { + "field": nil, + }, }), }, }, @@ -169,6 +172,7 @@ func TestSchemaSubscribe(t *testing.T) { ExpectedResults: []testutil.TestResponse{ {Data: `{ "sub_with_object": { "field": "hello" } }`}, {Data: `{ "sub_with_object": { "field": "bye" } }`}, + {Data: `{ "sub_with_object": { "field": null } }`}, }, }, @@ -196,60 +200,28 @@ func TestSchemaSubscribe(t *testing.T) { }, }, }, - // { - // Name: "subscription_resolver_can_error_optional_msg", - // Schema: schema, - // Query: ` - // subscription onHelloSaid { - // helloSaidNullable { - // msg - // } - // } - // `, - // ExpectedResults: []testutil.TestResponse{ - // { - // Data: json.RawMessage(` - // { - // "helloSaidNullable": { - // "msg": null - // } - // } - // `), - // Errors: []gqlerrors.FormattedError{{Message: ""}}}, - // }, - // }, - // { - // Name: "subscription_resolver_can_error_optional_event", - // Schema: schema, - // Query: ` - // subscription onHelloSaid { - // helloSaidNullable { - // msg - // } - // } - // `, - // ExpectedResults: []testutil.TestResponse{ - // { - // Data: json.RawMessage(` - // { - // "helloSaidNullable": null - // } - // `), - // Errors: []gqlerrors.FormattedError{{Message: ""}}}, - // }, - // }, - // { - // Name: "schema_without_resolver_errors", - // Schema: schema, - // Query: ` - // subscription onHelloSaid { - // helloSaid { - // msg - // } - // } - // `, - // ExpectedErr: errors.New("schema created without resolver, can not subscribe"), - // }, + + { + Name: "schema_without_subscribe_errors", + Schema: makeSubscriptionSchema(t, graphql.ObjectConfig{ + Name: "Subscription", + Fields: graphql.Fields{ + "should_error": &graphql.Field{ + Type: graphql.String, + }, + }, + }), + Query: ` + subscription { + should_error + } + `, + ExpectedResults: []testutil.TestResponse{ + { + Errors: []string{"the subscription function \"should_error\" is not defined"}, + }, + }, + }, }) } From a673078f74f42d90cf7f68662a187b98c6d87f2e Mon Sep 17 00:00:00 2001 From: remorses Date: Mon, 18 May 2020 19:04:30 +0200 Subject: [PATCH 13/25] subscription_test: refactored tests --- subscription_test.go | 140 ++++++++++++++----------------------------- 1 file changed, 45 insertions(+), 95 deletions(-) diff --git a/subscription_test.go b/subscription_test.go index c194a01d..4ebd0654 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -9,53 +9,6 @@ import ( "github.com/graphql-go/graphql/testutil" ) -func makeSubscribeToStringFunction(elements []string) func(p graphql.ResolveParams) (interface{}, error) { - return func(p graphql.ResolveParams) (interface{}, error) { - c := make(chan interface{}) - go func() { - for _, r := range elements { - select { - case <-p.Context.Done(): - close(c) - return - case c <- r: - } - } - close(c) - }() - return c, nil - } -} - -func makeSubscribeToMapFunction(elements []map[string]interface{}) func(p graphql.ResolveParams) (interface{}, error) { - return func(p graphql.ResolveParams) (interface{}, error) { - c := make(chan interface{}) - go func() { - for _, r := range elements { - select { - case <-p.Context.Done(): - close(c) - return - case c <- r: - } - } - close(c) - }() - return c, nil - } -} - -func makeSubscriptionSchema(t *testing.T, c graphql.ObjectConfig) graphql.Schema { - schema, err := graphql.NewSchema(graphql.SchemaConfig{ - Query: dummyQuery, - Subscription: graphql.NewObject(c), - }) - if err != nil { - t.Errorf("failed to create schema: %v", err) - } - return schema -} - func TestSchemaSubscribe(t *testing.T) { testutil.RunSubscribes(t, []*testutil.TestSubscription{ @@ -200,7 +153,6 @@ func TestSchemaSubscribe(t *testing.T) { }, }, }, - { Name: "schema_without_subscribe_errors", Schema: makeSubscriptionSchema(t, graphql.ObjectConfig{ @@ -225,54 +177,52 @@ func TestSchemaSubscribe(t *testing.T) { }) } -// func TestRootOperations_invalidSubscriptionSchema(t *testing.T) { -// type args struct { -// Schema string -// } -// type want struct { -// Error string -// } -// testTable := map[string]struct { -// Args args -// Want want -// }{ -// "Subscription as incorrect type": { -// Args: args{ -// Schema: ` -// schema { -// query: Query -// subscription: String -// } -// type Query { -// thing: String -// } -// `, -// }, -// Want: want{Error: `root operation "subscription" must be an OBJECT`}, -// }, -// "Subscription declared by schema, but type not present": { -// Args: args{ -// Schema: ` -// schema { -// query: Query -// subscription: Subscription -// } -// type Query { -// hello: String! -// } -// `, -// }, -// Want: want{Error: `graphql: type "Subscription" not found`}, -// }, -// } +func makeSubscribeToStringFunction(elements []string) func(p graphql.ResolveParams) (interface{}, error) { + return func(p graphql.ResolveParams) (interface{}, error) { + c := make(chan interface{}) + go func() { + for _, r := range elements { + select { + case <-p.Context.Done(): + close(c) + return + case c <- r: + } + } + close(c) + }() + return c, nil + } +} + +func makeSubscribeToMapFunction(elements []map[string]interface{}) func(p graphql.ResolveParams) (interface{}, error) { + return func(p graphql.ResolveParams) (interface{}, error) { + c := make(chan interface{}) + go func() { + for _, r := range elements { + select { + case <-p.Context.Done(): + close(c) + return + case c <- r: + } + } + close(c) + }() + return c, nil + } +} -// for name, tt := range testTable { -// tt := tt -// t.Run(name, func(t *testing.T) { -// t.Log(tt.Args.Schema) // TODO do something -// }) -// } -// } +func makeSubscriptionSchema(t *testing.T, c graphql.ObjectConfig) graphql.Schema { + schema, err := graphql.NewSchema(graphql.SchemaConfig{ + Query: dummyQuery, + Subscription: graphql.NewObject(c), + }) + if err != nil { + t.Errorf("failed to create schema: %v", err) + } + return schema +} var dummyQuery = graphql.NewObject(graphql.ObjectConfig{ Name: "Query", From d55e8cbb1b6913f80ae5538fd6b179bc21f0140d Mon Sep 17 00:00:00 2001 From: remorses Date: Mon, 18 May 2020 19:05:54 +0200 Subject: [PATCH 14/25] subscription: removed some todos --- subscription.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/subscription.go b/subscription.go index 1ebe561d..5af16d96 100644 --- a/subscription.go +++ b/subscription.go @@ -92,8 +92,6 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { p.Context = context.Background() } - // TODO run executionDidStart functions from extensions - var mapSourceToResponse = func(payload interface{}) *Result { return Execute(ExecuteParams{ Schema: p.Schema, @@ -127,7 +125,6 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { AST: p.AST, OperationName: p.OperationName, Args: p.Args, - Result: &Result{}, // TODO what is this? Context: p.Context, }) From 4e255bf47a38c5963add5cb03f2efbb0c75944c8 Mon Sep 17 00:00:00 2001 From: remorses Date: Mon, 18 May 2020 19:06:28 +0200 Subject: [PATCH 15/25] removed todos and prints --- subscription.go | 1 - testutil/subscription.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/subscription.go b/subscription.go index 5af16d96..e8cb8556 100644 --- a/subscription.go +++ b/subscription.go @@ -224,7 +224,6 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { for { select { case <-p.Context.Done(): - println("context cancelled") // TODO send the context error to the resultchannel? return diff --git a/testutil/subscription.go b/testutil/subscription.go index cef2725e..6dc92b5a 100644 --- a/testutil/subscription.go +++ b/testutil/subscription.go @@ -62,7 +62,7 @@ func RunSubscribe(t *testing.T, test *TestSubscription) { var results []*graphql.Result for res := range c { - println(pretty(res)) + t.Log(pretty(res)) results = append(results, res) } From 1fc61f7f418c0d13d464369a01fe78586fbcdaeb Mon Sep 17 00:00:00 2001 From: remorses Date: Mon, 18 May 2020 19:16:12 +0200 Subject: [PATCH 16/25] subscription: added more comments --- subscription.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/subscription.go b/subscription.go index e8cb8556..24972284 100644 --- a/subscription.go +++ b/subscription.go @@ -22,11 +22,13 @@ type SubscribeParams struct { } // SubscriptableSchema implements `graphql-transport-ws` `GraphQLService` interface: https://github.com/graph-gophers/graphql-transport-ws/blob/40c0484322990a129cac2f2d2763c3315230280c/graphqlws/internal/connection/connection.go#L53 +// you can pass `SubscriptableSchema` to `graphql-transport-ws` `NewHandlerFunc` type SubscriptableSchema struct { Schema Schema RootObject map[string]interface{} } +// Subscribe method let you use SubscriptableSchema with graphql-transport-ws https://github.com/graph-gophers/graphql-transport-ws func (self *SubscriptableSchema) Subscribe(ctx context.Context, queryString string, operationName string, variables map[string]interface{}) (<-chan *Result, error) { c := Subscribe(Params{ Schema: self.Schema, @@ -39,7 +41,8 @@ func (self *SubscriptableSchema) Subscribe(ctx context.Context, queryString stri return c, nil } -// Subscribe performs a subscribe operation +// Subscribe performs a subscribe operation on the given query and schema +// currently does not support extensions hooks func Subscribe(p Params) chan *Result { source := source.NewSource(&source.Source{ @@ -80,12 +83,14 @@ func Subscribe(p Params) chan *Result { } func sendOneResultAndClose(res *Result) chan *Result { - resultChannel := make(chan *Result, 1) // TODO unbuffered channel does not pass errors, why? + resultChannel := make(chan *Result, 1) resultChannel <- res close(resultChannel) return resultChannel } +// ExecuteSubscription is similar to graphql.Execute but returns a channel instead of a Result +// currently does not support extensions func ExecuteSubscription(p ExecuteParams) chan *Result { if p.Context == nil { @@ -175,7 +180,6 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { resultChannel <- &Result{ Errors: gqlerrors.FormatErrors(fmt.Errorf("the subscription function %q is not defined", fieldName)), } - return } fieldPath := &ResponsePath{ @@ -229,7 +233,6 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { case res, more := <-sub: if !more { - return } resultChannel <- mapSourceToResponse(res) From 183515e8badf6f0b09d00c7a26f0b0ab9d3bc489 Mon Sep 17 00:00:00 2001 From: remorses Date: Mon, 18 May 2020 20:05:21 +0200 Subject: [PATCH 17/25] subscription: SubscriptableSchema conforms to ws interface --- subscription.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/subscription.go b/subscription.go index 24972284..da14f8a4 100644 --- a/subscription.go +++ b/subscription.go @@ -29,7 +29,7 @@ type SubscriptableSchema struct { } // Subscribe method let you use SubscriptableSchema with graphql-transport-ws https://github.com/graph-gophers/graphql-transport-ws -func (self *SubscriptableSchema) Subscribe(ctx context.Context, queryString string, operationName string, variables map[string]interface{}) (<-chan *Result, error) { +func (self *SubscriptableSchema) Subscribe(ctx context.Context, queryString string, operationName string, variables map[string]interface{}) (<-chan interface{}, error) { c := Subscribe(Params{ Schema: self.Schema, Context: ctx, @@ -38,7 +38,20 @@ func (self *SubscriptableSchema) Subscribe(ctx context.Context, queryString stri RootObject: self.RootObject, VariableValues: variables, }) - return c, nil + to := make(chan interface{}) + go func() { + defer close(to) + select { + case <-ctx.Done(): + return + case res, more := <-c: + if !more { + return + } + to <- res + } + }() + return to, nil } // Subscribe performs a subscribe operation on the given query and schema From 77ab10baad00ea3e9d59466eeefd86f60739c13b Mon Sep 17 00:00:00 2001 From: remorses Date: Mon, 18 May 2020 20:50:21 +0200 Subject: [PATCH 18/25] subscription: added loop in subscriptabel schema --- subscription.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/subscription.go b/subscription.go index da14f8a4..782b0431 100644 --- a/subscription.go +++ b/subscription.go @@ -41,14 +41,16 @@ func (self *SubscriptableSchema) Subscribe(ctx context.Context, queryString stri to := make(chan interface{}) go func() { defer close(to) - select { - case <-ctx.Done(): - return - case res, more := <-c: - if !more { + for { + select { + case <-ctx.Done(): return + case res, more := <-c: + if !more { + return + } + to <- res } - to <- res } }() return to, nil From ab87417e21be98fa2864c372ee5244dd718fd4ca Mon Sep 17 00:00:00 2001 From: remorses Date: Tue, 19 May 2020 11:53:31 +0200 Subject: [PATCH 19/25] subscription: removed a print --- subscription.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/subscription.go b/subscription.go index 782b0431..fa4b47bd 100644 --- a/subscription.go +++ b/subscription.go @@ -243,7 +243,6 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { for { select { case <-p.Context.Done(): - // TODO send the context error to the resultchannel? return case res, more := <-sub: @@ -254,7 +253,6 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { } } default: - fmt.Println(fieldResult) resultChannel <- mapSourceToResponse(fieldResult) return } From b63505c6a1e001b9d77eccd0c2e19a2c1047a50b Mon Sep 17 00:00:00 2001 From: remorses Date: Tue, 19 May 2020 12:02:12 +0200 Subject: [PATCH 20/25] subscription_test: added more tests for resolver behaviour --- subscription_test.go | 44 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/subscription_test.go b/subscription_test.go index 4ebd0654..98aae98c 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -18,11 +18,18 @@ func TestSchemaSubscribe(t *testing.T) { Name: "Subscription", Fields: graphql.Fields{ "sub_without_resolver": &graphql.Field{ - Type: graphql.String, - Subscribe: makeSubscribeToStringFunction([]string{"a", "b", "c"}), - Resolve: func(p graphql.ResolveParams) (interface{}, error) { // TODO remove dummy resolver - return p.Source, nil - }, + Type: graphql.String, + Subscribe: makeSubscribeToMapFunction([]map[string]interface{}{ + { + "sub_without_resolver": "a", + }, + { + "sub_without_resolver": "b", + }, + { + "sub_without_resolver": "c", + }, + }), }, }, }), @@ -37,6 +44,31 @@ func TestSchemaSubscribe(t *testing.T) { {Data: `{ "sub_without_resolver": "c" }`}, }, }, + { + Name: "subscribe with resolver", + Schema: makeSubscriptionSchema(t, graphql.ObjectConfig{ + Name: "Subscription", + Fields: graphql.Fields{ + "sub_with_resolver": &graphql.Field{ + Type: graphql.String, + Resolve: func(p graphql.ResolveParams) (interface{}, error) { + return p.Source, nil + }, + Subscribe: makeSubscribeToStringFunction([]string{"a", "b", "c"}), + }, + }, + }), + Query: ` + subscription onHelloSaid { + sub_with_resolver + } + `, + ExpectedResults: []testutil.TestResponse{ + {Data: `{ "sub_with_resolver": "a" }`}, + {Data: `{ "sub_with_resolver": "b" }`}, + {Data: `{ "sub_with_resolver": "c" }`}, + }, + }, { Name: "receive query validation error", Schema: makeSubscriptionSchema(t, graphql.ObjectConfig{ @@ -98,7 +130,7 @@ func TestSchemaSubscribe(t *testing.T) { }, }, }), - Resolve: func(p graphql.ResolveParams) (interface{}, error) { // TODO remove dummy resolver + Resolve: func(p graphql.ResolveParams) (interface{}, error) { return p.Source, nil }, Subscribe: makeSubscribeToMapFunction([]map[string]interface{}{ From 3b135768a63cb3a8772e80d767075739c8208175 Mon Sep 17 00:00:00 2001 From: remorses Date: Tue, 19 May 2020 12:06:04 +0200 Subject: [PATCH 21/25] subscription_test: test for `panic inside subscribe is recovered` --- subscription_test.go | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/subscription_test.go b/subscription_test.go index 98aae98c..0a4bebee 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -34,7 +34,7 @@ func TestSchemaSubscribe(t *testing.T) { }, }), Query: ` - subscription onHelloSaid { + subscription { sub_without_resolver } `, @@ -59,7 +59,7 @@ func TestSchemaSubscribe(t *testing.T) { }, }), Query: ` - subscription onHelloSaid { + subscription { sub_with_resolver } `, @@ -81,7 +81,7 @@ func TestSchemaSubscribe(t *testing.T) { }, }), Query: ` - subscription onHelloSaid { + subscription { sub_without_resolver xxx } @@ -90,6 +90,28 @@ func TestSchemaSubscribe(t *testing.T) { {Errors: []string{"Cannot query field \"xxx\" on type \"Subscription\"."}}, }, }, + { + Name: "panic inside subscribe is recovered", + Schema: makeSubscriptionSchema(t, graphql.ObjectConfig{ + Name: "Subscription", + Fields: graphql.Fields{ + "should_error": &graphql.Field{ + Type: graphql.String, + Subscribe: func(p graphql.ResolveParams) (interface{}, error) { + panic(errors.New("got a panic error")) + }, + }, + }, + }), + Query: ` + subscription { + should_error + } + `, + ExpectedResults: []testutil.TestResponse{ + {Errors: []string{"got a panic error"}}, + }, + }, { Name: "subscribe with resolver changes output", Schema: makeSubscriptionSchema(t, graphql.ObjectConfig{ @@ -105,7 +127,7 @@ func TestSchemaSubscribe(t *testing.T) { }, }), Query: ` - subscription onHelloSaid { + subscription { sub_with_resolver } `, @@ -148,7 +170,7 @@ func TestSchemaSubscribe(t *testing.T) { }, }), Query: ` - subscription onHelloSaid { + subscription { sub_with_object { field } From 44a282841b7f0e4f66299649086b5aaf7912dd35 Mon Sep 17 00:00:00 2001 From: remorses Date: Tue, 19 May 2020 12:09:12 +0200 Subject: [PATCH 22/25] subscription: added a comment --- subscription.go | 1 + 1 file changed, 1 insertion(+) diff --git a/subscription.go b/subscription.go index fa4b47bd..62d28170 100644 --- a/subscription.go +++ b/subscription.go @@ -57,6 +57,7 @@ func (self *SubscriptableSchema) Subscribe(ctx context.Context, queryString stri } // Subscribe performs a subscribe operation on the given query and schema +// To finish a subscription you can simply close the channel from inside the `Subscribe` function // currently does not support extensions hooks func Subscribe(p Params) chan *Result { From eb0fdaee105853bfc77b438dc21b8a5ecbf68ef7 Mon Sep 17 00:00:00 2001 From: remorses Date: Tue, 19 May 2020 12:11:33 +0200 Subject: [PATCH 23/25] subscription: removed SubscriptableSchema --- subscription.go | 35 ----------------------------------- 1 file changed, 35 deletions(-) diff --git a/subscription.go b/subscription.go index 62d28170..98d503b3 100644 --- a/subscription.go +++ b/subscription.go @@ -21,41 +21,6 @@ type SubscribeParams struct { FieldSubscriber FieldResolveFn } -// SubscriptableSchema implements `graphql-transport-ws` `GraphQLService` interface: https://github.com/graph-gophers/graphql-transport-ws/blob/40c0484322990a129cac2f2d2763c3315230280c/graphqlws/internal/connection/connection.go#L53 -// you can pass `SubscriptableSchema` to `graphql-transport-ws` `NewHandlerFunc` -type SubscriptableSchema struct { - Schema Schema - RootObject map[string]interface{} -} - -// Subscribe method let you use SubscriptableSchema with graphql-transport-ws https://github.com/graph-gophers/graphql-transport-ws -func (self *SubscriptableSchema) Subscribe(ctx context.Context, queryString string, operationName string, variables map[string]interface{}) (<-chan interface{}, error) { - c := Subscribe(Params{ - Schema: self.Schema, - Context: ctx, - OperationName: operationName, - RequestString: queryString, - RootObject: self.RootObject, - VariableValues: variables, - }) - to := make(chan interface{}) - go func() { - defer close(to) - for { - select { - case <-ctx.Done(): - return - case res, more := <-c: - if !more { - return - } - to <- res - } - } - }() - return to, nil -} - // Subscribe performs a subscribe operation on the given query and schema // To finish a subscription you can simply close the channel from inside the `Subscribe` function // currently does not support extensions hooks From d0a91ba24a5e8e12b8318c788ef876d7d5d26fbf Mon Sep 17 00:00:00 2001 From: remorses Date: Tue, 19 May 2020 12:16:20 +0200 Subject: [PATCH 24/25] subscription: removed a print --- subscription.go | 1 - 1 file changed, 1 deletion(-) diff --git a/subscription.go b/subscription.go index 98d503b3..ef5d73ef 100644 --- a/subscription.go +++ b/subscription.go @@ -95,7 +95,6 @@ func ExecuteSubscription(p ExecuteParams) chan *Result { if err := recover(); err != nil { e, ok := err.(error) if !ok { - fmt.Println("strange program path") return } resultChannel <- &Result{ From d54fb02f70601cc149c9cb057555a6564c4fad64 Mon Sep 17 00:00:00 2001 From: Branden Horiuchi Date: Tue, 19 May 2020 06:46:59 -0700 Subject: [PATCH 25/25] adding context cancel back to pass CI tests --- testutil/subscription.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/testutil/subscription.go b/testutil/subscription.go index 6dc92b5a..b17c4b65 100644 --- a/testutil/subscription.go +++ b/testutil/subscription.go @@ -42,8 +42,8 @@ func RunSubscribes(t *testing.T, tests []*TestSubscription) { // RunSubscribe runs a single GraphQL subscription test case. func RunSubscribe(t *testing.T, test *TestSubscription) { - ctx, _ := context.WithCancel(context.Background()) - // defer cancel() // TODO add defer cancel + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() c := graphql.Subscribe(graphql.Params{ Context: ctx,