@@ -8,8 +8,10 @@ import (
8
8
"os"
9
9
"strconv"
10
10
"strings"
11
+ "time"
11
12
12
13
"github.com/Shopify/sarama"
14
+ "gopkg.in/retry.v1"
13
15
)
14
16
15
17
var ErrDisabled = fmt .Errorf ("kafka tests are disabled" )
@@ -35,6 +37,10 @@ var ErrDisabled = fmt.Errorf("kafka tests are disabled")
35
37
// A boolean as parsed by strconv.ParseBool. If this
36
38
// is true, a secure TLS connection will be used.
37
39
//
40
+ // - $KAFKA_TIMEOUT
41
+ // The maximum duration to wait when trying to connect
42
+ // to Kakfa. Defaults to "30s".
43
+ //
38
44
// The returned Kafka instance must be closed after use.
39
45
func New () (* Kafka , error ) {
40
46
disabled , err := boolVar ("KAFKA_DISABLE" )
@@ -59,11 +65,29 @@ func New() (*Kafka, error) {
59
65
saslUser : os .Getenv ("KAFKA_USERNAME" ),
60
66
saslPassword : os .Getenv ("KAFKA_PASSWORD" ),
61
67
}
62
- admin , err := sarama .NewClusterAdmin (addrs , client .Config ())
63
- if err != nil {
64
- return nil , fmt .Errorf ("cannot connect to Kafka cluster at %q: %v" , addrs , err )
68
+ // The cluster might not be available immediately, so try
69
+ // for a while before giving up.
70
+ retryLimit := 30 * time .Second
71
+ if limit := os .Getenv ("KAFKA_TIMEOUT" ); limit != "" {
72
+ retryLimit , err = time .ParseDuration (limit )
73
+ if err != nil {
74
+ return nil , fmt .Errorf ("bad value for KAFKA_TIMEOUT: %v" , err )
75
+ }
76
+ }
77
+ retryStrategy := retry .LimitTime (retryLimit , retry.Exponential {
78
+ Initial : time .Millisecond ,
79
+ MaxDelay : time .Second ,
80
+ })
81
+ for a := retry .Start (retryStrategy , nil ); a .Next (); {
82
+ admin , err := sarama .NewClusterAdmin (addrs , client .Config ())
83
+ if err == nil {
84
+ client .admin = admin
85
+ break
86
+ }
87
+ if ! a .Next () {
88
+ return nil , fmt .Errorf ("cannot connect to Kafka cluster at %q after %v: %v" , addrs , retryLimit , err )
89
+ }
65
90
}
66
- client .admin = admin
67
91
return client , nil
68
92
}
69
93
0 commit comments