diff --git a/.build-tools/go.mod b/.build-tools/go.mod index 7b5e7b9b55..787fd605c6 100644 --- a/.build-tools/go.mod +++ b/.build-tools/go.mod @@ -1,6 +1,8 @@ module github.com/dapr/components-contrib/build-tools -go 1.24.4 +go 1.24.6 + +toolchain go1.24.10 require ( github.com/dapr/components-contrib v0.0.0 diff --git a/bindings/sftp/client.go b/bindings/sftp/client.go new file mode 100644 index 0000000000..8e9b672ad0 --- /dev/null +++ b/bindings/sftp/client.go @@ -0,0 +1,245 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sftp + +import ( + "errors" + "fmt" + "os" + "sync" + "sync/atomic" + + sftpClient "github.com/pkg/sftp" + "golang.org/x/crypto/ssh" +) + +type Client struct { + sshClient *ssh.Client + sftpClient *sftpClient.Client + address string + config *ssh.ClientConfig + lock sync.RWMutex + needsReconnect atomic.Bool +} + +func newClient(address string, config *ssh.ClientConfig) (*Client, error) { + if address == "" || config == nil { + return nil, errors.New("sftp binding error: client not initialized") + } + + sshClient, err := newSSHClient(address, config) + if err != nil { + return nil, err + } + + newSftpClient, err := sftpClient.NewClient(sshClient) + if err != nil { + _ = sshClient.Close() + return nil, fmt.Errorf("sftp binding error: error create sftp client: %w", err) + } + + return &Client{ + sshClient: sshClient, + sftpClient: newSftpClient, + address: address, + config: config, + }, nil +} + +func (c *Client) Close() error { + c.lock.Lock() + defer c.lock.Unlock() + + // Close SFTP first, then SSH + var sftpErr, sshErr error + if c.sftpClient != nil { + sftpErr = c.sftpClient.Close() + } + if c.sshClient != nil { + sshErr = c.sshClient.Close() + } + + // Return the first error encountered + if sftpErr != nil { + return sftpErr + } + return sshErr +} + +func (c *Client) list(path string) ([]os.FileInfo, error) { + var fi []os.FileInfo + + fn := func() error { + var err error + fi, err = c.sftpClient.ReadDir(path) + return err + } + + err := withReconnection(c, fn) + if err != nil { + return nil, err + } + + return fi, nil +} + +func (c *Client) create(path string) (*sftpClient.File, string, error) { + dir, fileName := sftpClient.Split(path) + + var file *sftpClient.File + + createFn := func() error { + cErr := c.sftpClient.MkdirAll(dir) + if cErr != nil { + return cErr + } + + file, cErr = c.sftpClient.Create(path) + if cErr != nil { + return cErr + } + + return nil + } + + rErr := withReconnection(c, createFn) + if rErr != nil { + return nil, "", rErr + } + + return file, fileName, nil +} + +func (c *Client) get(path string) (*sftpClient.File, error) { + var f *sftpClient.File + + fn := func() error { + var err error + f, err = c.sftpClient.Open(path) + return err + } + + err := withReconnection(c, fn) + if err != nil { + return nil, err + } + + return f, nil +} + +func (c *Client) delete(path string) error { + fn := func() error { + return c.sftpClient.Remove(path) + } + + err := withReconnection(c, fn) + if err != nil { + return err + } + + return nil +} + +func (c *Client) ping() error { + _, err := c.sftpClient.Getwd() + if err != nil { + return err + } + return nil +} + +func withReconnection(c *Client, fn func() error) error { + c.lock.RLock() + err := fn() + if c.shouldReconnect(err) { + c.needsReconnect.Store(true) + } + c.lock.RUnlock() + + if c.shouldReconnect(err) { + rErr := doReconnect(c) + if rErr != nil { + return errors.Join(err, rErr) + } + } + + c.lock.RLock() + defer c.lock.RUnlock() + err = fn() + if err != nil { + return err + } + + return nil +} + +func doReconnect(c *Client) error { + if !c.needsReconnect.Load() { + return nil + } + c.lock.Lock() + defer c.lock.Unlock() + + pErr := c.ping() + if pErr != nil { + sshClient, err := newSSHClient(c.address, c.config) + if err != nil { + return err + } + + newSftpClient, err := sftpClient.NewClient(sshClient) + if err != nil { + _ = sshClient.Close() + return fmt.Errorf("sftp binding error: error create sftp client: %w", err) + } + + oldSftp := c.sftpClient + oldSSH := c.sshClient + c.sftpClient = newSftpClient + c.sshClient = sshClient + + if oldSftp != nil { + _ = oldSftp.Close() + } + if oldSSH != nil { + _ = oldSSH.Close() + } + } + c.needsReconnect.Store(false) + return nil +} + +func newSSHClient(address string, config *ssh.ClientConfig) (*ssh.Client, error) { + sshClient, err := ssh.Dial("tcp", address, config) + if err != nil { + return nil, fmt.Errorf("sftp binding error: error dialing ssh server: %w", err) + } + return sshClient, nil +} + +// shouldReconnect returns true if the error looks like a transport-level failure +func (c *Client) shouldReconnect(err error) bool { + if err == nil { + return false + } + + // SFTP status errors that are logical, not connectivity (avoid reconnect) + if errors.Is(err, sftpClient.ErrSSHFxPermissionDenied) || + errors.Is(err, sftpClient.ErrSSHFxNoSuchFile) || + errors.Is(err, sftpClient.ErrSSHFxOpUnsupported) { + return false + } + + return true +} diff --git a/bindings/sftp/docker-compose.yaml b/bindings/sftp/docker-compose.yaml new file mode 100644 index 0000000000..188dc6c952 --- /dev/null +++ b/bindings/sftp/docker-compose.yaml @@ -0,0 +1,11 @@ +services: + sftp: + image: + atmoz/sftp + environment: + - SFTP_USERS=foo:pass:1001:1001:upload + volumes: + - ./upload:/home/foo/upload + ports: + - "2222:22" + diff --git a/bindings/sftp/sftp.go b/bindings/sftp/sftp.go index 211cce85d3..536a224afb 100644 --- a/bindings/sftp/sftp.go +++ b/bindings/sftp/sftp.go @@ -1,3 +1,16 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package sftp import ( @@ -25,9 +38,9 @@ const ( // Sftp is a binding for file operations on sftp server. type Sftp struct { - metadata *sftpMetadata - logger logger.Logger - sftpClient *sftpClient.Client + metadata *sftpMetadata + logger logger.Logger + c *Client } // sftpMetadata defines the sftp metadata. @@ -115,19 +128,12 @@ func (sftp *Sftp) Init(_ context.Context, metadata bindings.Metadata) error { HostKeyCallback: hostKeyCallback, } - sshClient, err := ssh.Dial("tcp", m.Address, config) - if err != nil { - return fmt.Errorf("sftp binding error: error create ssh client: %w", err) - } - - newSftpClient, err := sftpClient.NewClient(sshClient) + sftp.metadata = m + sftp.c, err = newClient(m.Address, config) if err != nil { - return fmt.Errorf("sftp binding error: error create sftp client: %w", err) + return fmt.Errorf("sftp binding error: create sftp client error: %w", err) } - sftp.metadata = m - sftp.sftpClient = newSftpClient - return nil } @@ -161,14 +167,9 @@ func (sftp *Sftp) create(_ context.Context, req *bindings.InvokeRequest) (*bindi return nil, fmt.Errorf("sftp binding error: %w", err) } - dir, fileName := sftpClient.Split(path) + c := sftp.c - err = sftp.sftpClient.MkdirAll(dir) - if err != nil { - return nil, fmt.Errorf("sftp binding error: error create dir %s: %w", dir, err) - } - - file, err := sftp.sftpClient.Create(path) + file, fileName, err := c.create(path) if err != nil { return nil, fmt.Errorf("sftp binding error: error create file %s: %w", path, err) } @@ -211,7 +212,9 @@ func (sftp *Sftp) list(_ context.Context, req *bindings.InvokeRequest) (*binding return nil, fmt.Errorf("sftp binding error: %w", err) } - files, err := sftp.sftpClient.ReadDir(path) + c := sftp.c + + files, err := c.list(path) if err != nil { return nil, fmt.Errorf("sftp binding error: error read dir %s: %w", path, err) } @@ -246,7 +249,9 @@ func (sftp *Sftp) get(_ context.Context, req *bindings.InvokeRequest) (*bindings return nil, fmt.Errorf("sftp binding error: %w", err) } - file, err := sftp.sftpClient.Open(path) + c := sftp.c + + file, err := c.get(path) if err != nil { return nil, fmt.Errorf("sftp binding error: error open file %s: %w", path, err) } @@ -272,7 +277,9 @@ func (sftp *Sftp) delete(_ context.Context, req *bindings.InvokeRequest) (*bindi return nil, fmt.Errorf("sftp binding error: %w", err) } - err = sftp.sftpClient.Remove(path) + c := sftp.c + + err = c.delete(path) if err != nil { return nil, fmt.Errorf("sftp binding error: error remove file %s: %w", path, err) } @@ -296,7 +303,7 @@ func (sftp *Sftp) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bin } func (sftp *Sftp) Close() error { - return sftp.sftpClient.Close() + return sftp.c.Close() } func (metadata sftpMetadata) getPath(requestMetadata map[string]string) (path string, err error) { diff --git a/bindings/sftp/sftp_integration_test.go b/bindings/sftp/sftp_integration_test.go index 178c7535db..0af41dcea0 100644 --- a/bindings/sftp/sftp_integration_test.go +++ b/bindings/sftp/sftp_integration_test.go @@ -1,40 +1,89 @@ +//go:build integration_test + +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package sftp import ( + "context" "encoding/json" + "math/rand" "os" + "sync/atomic" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/dapr/components-contrib/bindings" + "github.com/dapr/components-contrib/tests/certification/flow" + "github.com/dapr/components-contrib/tests/certification/flow/dockercompose" + sftp "github.com/dapr/components-contrib/tests/utils/sftpproxy" ) -var connectionStringEnvKey = "DAPR_TEST_SFTP_CONNSTRING" +const ( + ProxySftp = "0.0.0.0:2223" + ConnectionString = "0.0.0.0:2222" +) -// Run docker from the file location as the upload folder is relative to the test -// docker run -v ./upload:/home/foo/upload -p 2222:22 -d atmoz/sftp foo:pass:1001 func TestIntegrationCases(t *testing.T) { - connectionString := os.Getenv(connectionStringEnvKey) - if connectionString == "" { - t.Skipf(`sftp binding integration tests skipped. To enable this test, define the connection string using environment variable '%[1]s' (example 'export %[1]s="localhost:2222")'`, connectionStringEnvKey) - } - + cleanUp := setupSftp(t) + defer cleanUp() + time.Sleep(1 * time.Second) t.Run("List operation", testListOperation) t.Run("Create operation", testCreateOperation) + t.Run("Reconnections", testReconnect) +} + +func setupSftp(t *testing.T) func() { + dc := dockercompose.New("sftp", "docker-compose.yaml") + ctx := flow.Context{ + T: t, + Context: t.Context(), + Flow: nil, + } + err := dc.Up(ctx) + + if err != nil { + t.Fatal(err) + } + + return func() { dc.Down(ctx) } } func testListOperation(t *testing.T) { + proxy := &sftp.Proxy{ + ListenAddr: ProxySftp, + UpstreamAddr: ConnectionString, + } + + defer proxy.Close() + go proxy.ListenAndServe() + c := Sftp{} + m := bindings.Metadata{} + m.Properties = map[string]string{ "rootPath": "/upload", - "address": os.Getenv(connectionStringEnvKey), + "address": ProxySftp, "username": "foo", "password": "pass", "insecureIgnoreHostKey": "true", } + err := c.Init(t.Context(), m) require.NoError(t, err) @@ -45,14 +94,22 @@ func testListOperation(t *testing.T) { var d []listResponse err = json.Unmarshal(r.Data, &d) require.NoError(t, err) + + assert.EqualValues(t, 1, proxy.ReconnectionCount.Load()) } func testCreateOperation(t *testing.T) { + proxy := &sftp.Proxy{ + ListenAddr: ProxySftp, + UpstreamAddr: ConnectionString, + } + defer proxy.Close() + go proxy.ListenAndServe() c := Sftp{} m := bindings.Metadata{} m.Properties = map[string]string{ "rootPath": "/upload", - "address": os.Getenv(connectionStringEnvKey), + "address": ProxySftp, "username": "foo", "password": "pass", "insecureIgnoreHostKey": "true", @@ -79,4 +136,161 @@ func testCreateOperation(t *testing.T) { file, err := os.Stat("./upload/test.txt") require.NoError(t, err) assert.Equal(t, "test.txt", file.Name()) + assert.EqualValues(t, 1, proxy.ReconnectionCount.Load()) +} + +func testReconnect(t *testing.T) { + proxy := &sftp.Proxy{ + ListenAddr: ProxySftp, + UpstreamAddr: ConnectionString, + } + defer proxy.Close() + go proxy.ListenAndServe() + + c := Sftp{} + + m := bindings.Metadata{} + + m.Properties = map[string]string{ + "rootPath": "/upload", + "address": ProxySftp, + "username": "foo", + "password": "pass", + "insecureIgnoreHostKey": "true", + } + + err := c.Init(t.Context(), m) + require.NoError(t, err) + + t.Run("List operation", func(t *testing.T) { + r, err := c.Invoke(t.Context(), &bindings.InvokeRequest{Operation: bindings.ListOperation}) + require.NoError(t, err) + assert.NotNil(t, r.Data) + + _ = proxy.KillServerConn() + + r, err = c.Invoke(t.Context(), &bindings.InvokeRequest{Operation: bindings.ListOperation}) + require.NoError(t, err) + assert.NotNil(t, r.Data) + + var d []listResponse + err = json.Unmarshal(r.Data, &d) + require.NoError(t, err) + + assert.EqualValues(t, 2, proxy.ReconnectionCount.Load()) + }) + + t.Run("List delete - no reconnection", func(t *testing.T) { + numReconnects := proxy.ReconnectionCount.Load() + _, err := c.Invoke(t.Context(), &bindings.InvokeRequest{ + Operation: bindings.DeleteOperation, + Metadata: map[string]string{ + "fileName": "file_does_not_exist.txt", + }, + }) + + require.Error(t, err) + + assert.EqualValues(t, numReconnects, proxy.ReconnectionCount.Load()) + }) + + t.Run("List delete - reconnection", func(t *testing.T) { + numReconnects := proxy.ReconnectionCount.Load() + _ = proxy.KillServerConn() + _, err := c.Invoke(t.Context(), &bindings.InvokeRequest{ + Operation: bindings.DeleteOperation, + Metadata: map[string]string{ + "fileName": "file_does_not_exist.txt", + }, + }) + + require.Error(t, err) + + assert.EqualValues(t, numReconnects+1, proxy.ReconnectionCount.Load()) + }) + + t.Run("List get - no reconnection", func(t *testing.T) { + numReconnects := proxy.ReconnectionCount.Load() + _, err := c.Invoke(t.Context(), &bindings.InvokeRequest{ + Operation: bindings.GetOperation, + Metadata: map[string]string{ + "fileName": "file_does_not_exist.txt", + }, + }) + + require.Error(t, err) + + assert.EqualValues(t, numReconnects, proxy.ReconnectionCount.Load()) + }) + + t.Run("List get - reconnection", func(t *testing.T) { + numReconnects := proxy.ReconnectionCount.Load() + _ = proxy.KillServerConn() + _, err := c.Invoke(t.Context(), &bindings.InvokeRequest{ + Operation: bindings.GetOperation, + Metadata: map[string]string{ + "fileName": "file_does_not_exist.txt", + }, + }) + + require.Error(t, err) + + assert.EqualValues(t, numReconnects+1, proxy.ReconnectionCount.Load()) + }) + + t.Run("Parallel ops - reconnection", func(t *testing.T) { + numReconnects := proxy.ReconnectionCount.Load() + ctx, cancelFn := context.WithCancel(t.Context()) + opCount := atomic.Int32{} + opFailed := atomic.Int32{} + for range 10 { + go func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Duration(100*rand.Float32()) * time.Millisecond): + opCount.Add(1) + r, err := c.Invoke(t.Context(), &bindings.InvokeRequest{Operation: bindings.ListOperation}) + if err != nil { + opFailed.Add(1) + break + } + + assert.NotNil(t, r.Data) + } + } + }(ctx) + } + + go func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Second): + _ = proxy.KillServerConn() + } + } + + }(ctx) + + time.Sleep(time.Second * 5) + cancelFn() + + totalOps := opCount.Load() + failedOps := opFailed.Load() + + // Calculate 5% tolerance + tolerance := float64(totalOps) * 0.05 + + // Assert that failed operations are within 1% of total operations + assert.InDelta(t, 0, failedOps, tolerance, + "Expected less than 1%% of operations to fail. Total: %d, Failed: %d (%.2f%%)", + totalOps, failedOps, (float64(failedOps)/float64(totalOps))*100) + + expectedReconnects := numReconnects + 5 + currentReconnects := proxy.ReconnectionCount.Load() + assert.InDelta(t, expectedReconnects, currentReconnects, 2.0, "Expected %d reconnections, got %d", expectedReconnects, currentReconnects) + }) } diff --git a/go.mod b/go.mod index 0d24d1cb13..0aa42a2d67 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/dapr/components-contrib -go 1.24.4 +go 1.24.6 + +toolchain go1.24.10 require ( cloud.google.com/go/datastore v1.20.0 @@ -58,12 +60,13 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 github.com/chebyrash/promise v0.0.0-20230709133807-42ec49ba1459 github.com/cinience/go_rocketmq v0.0.2 - github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0 + github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.15.2 github.com/cloudevents/sdk-go/v2 v2.15.2 github.com/cloudwego/kitex v0.5.0 github.com/cloudwego/kitex-examples v0.1.1 github.com/cyphar/filepath-securejoin v0.2.4 github.com/dancannon/gorethink v4.0.0+incompatible + github.com/dapr/components-contrib/tests/certification v0.0.0-20251104160704-920ad8a7b958 github.com/dapr/kit v0.16.1 github.com/didip/tollbooth/v7 v7.0.1 github.com/eclipse/paho.mqtt.golang v1.4.3 @@ -418,7 +421,6 @@ require ( go.opentelemetry.io/otel/sdk v1.35.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.35.0 // indirect go.opentelemetry.io/otel/trace v1.35.0 // indirect - go.opentelemetry.io/proto/otlp v1.6.0 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/arch v0.10.0 // indirect diff --git a/go.sum b/go.sum index 5a8bc06ef4..0725b38e4b 100644 --- a/go.sum +++ b/go.sum @@ -421,8 +421,8 @@ github.com/clbanning/mxj/v2 v2.5.6 h1:Jm4VaCI/+Ug5Q57IzEoZbwx4iQFA6wkXv72juUSeK+ github.com/clbanning/mxj/v2 v2.5.6/go.mod h1:hNiWqW14h+kc+MdF9C6/YoRfjEJoR3ou6tn/Qo+ve2s= github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0 h1:dEopBSOSjB5fM9r76ufM44AVj9Dnz2IOM0Xs6FVxZRM= -github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0/go.mod h1:qDSbb0fgIfFNjZrNTPtS5MOMScAGyQtn1KlSvoOdqYw= +github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.15.2 h1:FIvfKlS2mcuP0qYY6yzdIU9xdrRd/YMP0bNwFjXd0u8= +github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.15.2/go.mod h1:POsdVp/08Mki0WD9QvvgRRpg9CQ6zhjfRrBoEY8JFS8= github.com/cloudevents/sdk-go/v2 v2.15.2 h1:54+I5xQEnI73RBhWHxbI1XJcqOFOVJN85vb41+8mHUc= github.com/cloudevents/sdk-go/v2 v2.15.2/go.mod h1:lL7kSWAE/V8VI4Wh0jbL2v/jvqsm6tjmaQBSvxcv4uE= github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 h1:F1EaeKL/ta07PY/k9Os/UFtwERei2/XzGemhpGnBKNg= @@ -516,6 +516,8 @@ github.com/dancannon/gorethink v4.0.0+incompatible h1:KFV7Gha3AuqT+gr0B/eKvGhbjm github.com/dancannon/gorethink v4.0.0+incompatible/go.mod h1:BLvkat9KmZc1efyYwhz3WnybhRZtgF1K929FD8z1avU= github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0= github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0= +github.com/dapr/components-contrib/tests/certification v0.0.0-20251104160704-920ad8a7b958 h1:DSZgzdXlbF75fwvEkMQpPqn1jjxmWVoBNmI4Bc4dS40= +github.com/dapr/components-contrib/tests/certification v0.0.0-20251104160704-920ad8a7b958/go.mod h1:IUB5RJv0Gj5qxsHjjhvEBIlxPka7cD7KAn/Coa2y27M= github.com/dapr/kit v0.16.1 h1:MqLAhHVg8trPy2WJChMZFU7ToeondvxcNHYVvMDiVf4= github.com/dapr/kit v0.16.1/go.mod h1:40ZWs5P6xfYf7O59XgwqZkIyDldTIXlhTQhGop8QoSM= github.com/dave/jennifer v1.4.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg= diff --git a/tests/e2e/pubsub/jetstream/go.mod b/tests/e2e/pubsub/jetstream/go.mod index 78ccb74cb4..9d386ccaa0 100644 --- a/tests/e2e/pubsub/jetstream/go.mod +++ b/tests/e2e/pubsub/jetstream/go.mod @@ -1,6 +1,8 @@ module github.com/dapr/components-contrib/tests/e2e/pubsub/jetstream -go 1.24.4 +go 1.24.6 + +toolchain go1.24.10 require ( github.com/dapr/components-contrib v1.10.6-0.20230403162214-9ee9d56cb7ea @@ -9,11 +11,10 @@ require ( require ( github.com/cenkalti/backoff/v4 v4.3.0 // indirect - github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0 // indirect + github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.15.2 // indirect github.com/cloudevents/sdk-go/v2 v2.15.2 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/protobuf v1.5.4 // indirect github.com/google/uuid v1.6.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.18.0 // indirect diff --git a/tests/e2e/pubsub/jetstream/go.sum b/tests/e2e/pubsub/jetstream/go.sum index d5752cdb42..f859a00120 100644 --- a/tests/e2e/pubsub/jetstream/go.sum +++ b/tests/e2e/pubsub/jetstream/go.sum @@ -1,7 +1,7 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0 h1:dEopBSOSjB5fM9r76ufM44AVj9Dnz2IOM0Xs6FVxZRM= -github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0/go.mod h1:qDSbb0fgIfFNjZrNTPtS5MOMScAGyQtn1KlSvoOdqYw= +github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.15.2 h1:FIvfKlS2mcuP0qYY6yzdIU9xdrRd/YMP0bNwFjXd0u8= +github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.15.2/go.mod h1:POsdVp/08Mki0WD9QvvgRRpg9CQ6zhjfRrBoEY8JFS8= github.com/cloudevents/sdk-go/v2 v2.15.2 h1:54+I5xQEnI73RBhWHxbI1XJcqOFOVJN85vb41+8mHUc= github.com/cloudevents/sdk-go/v2 v2.15.2/go.mod h1:lL7kSWAE/V8VI4Wh0jbL2v/jvqsm6tjmaQBSvxcv4uE= github.com/dapr/kit v0.16.1 h1:MqLAhHVg8trPy2WJChMZFU7ToeondvxcNHYVvMDiVf4= @@ -16,8 +16,6 @@ github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= -github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= diff --git a/tests/utils/sftpproxy/proxy.go b/tests/utils/sftpproxy/proxy.go new file mode 100644 index 0000000000..8be38d38d6 --- /dev/null +++ b/tests/utils/sftpproxy/proxy.go @@ -0,0 +1,121 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sftpproxy + +import ( + "errors" + "io" + "log" + "net" + "sync/atomic" + "time" +) + +type Proxy struct { + ListenAddr string + UpstreamAddr string + Client net.Conn + Server net.Conn + ReconnectionCount atomic.Int32 + Listener net.Listener +} + +func (p *Proxy) ListenAndServe() error { + ln, err := net.Listen("tcp", p.ListenAddr) + if err != nil { + log.Fatalf("listen: %v", err) + } + log.Printf("Proxy listening on %s -> %s", p.ListenAddr, p.UpstreamAddr) + p.Listener = ln + + for { + client, err := ln.Accept() + if err != nil { + if errors.Is(err, net.ErrClosed) { + return nil + } + log.Printf("accept error: %v", err) + continue + } + go p.handle(client) + } +} + +func (p *Proxy) handle(client net.Conn) { + defer client.Close() + + // Connect to upstream SFTP server + var server net.Conn + var err error + for i := 0; i < 10 && server == nil; i++ { + server, err = net.Dial("tcp", p.UpstreamAddr) + if err != nil { + log.Printf("dial upstream: %v", err) + time.Sleep(200 * time.Millisecond) + } + } + + if server == nil { + log.Printf("failed to connect to upstream after 5 attempts") + return + } + defer server.Close() + + p.Client = client + p.Server = server + p.ReconnectionCount.Add(1) + errCh := make(chan error, 2) + + // client -> server + go func() { + _, cErr := io.Copy(server, client) + errCh <- cErr + }() + + // server -> client + go func() { + _, cErr := io.Copy(client, server) + errCh <- cErr + }() + + // When either direction ends, close both ends + if err := <-errCh; err != nil && !isUsefullyClosed(err) { + log.Printf("proxy stream ended with error: %v", err) + } +} + +func (p *Proxy) KillServerConn() error { + return p.Server.Close() +} + +func (p *Proxy) Close() { + if p.Client != nil { + _ = p.Client.Close() + } + + if p.Server != nil { + _ = p.Server.Close() + } + + if p.Listener != nil { + _ = p.Listener.Close() + } + + p.ReconnectionCount.Store(0) +} + +// isUsefullyClosed filters common close conditions from logging noise +func isUsefullyClosed(err error) bool { + return err == io.EOF || errors.Is(err, net.ErrClosed) +}