Skip to content

Commit dfebdb1

Browse files
move upload strategy to settings
1 parent 5161285 commit dfebdb1

File tree

11 files changed

+52
-33
lines changed

11 files changed

+52
-33
lines changed

packages/destination-actions/src/destinations/sftp/__tests__/client.test.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { DEFAULT_REQUEST_TIMEOUT } from '@segment/actions-core'
22
import { normalizeSSHKey, testSFTPConnection, uploadSFTP } from '../client'
3-
import { SFTP_DEFAULT_PORT } from '../constants'
3+
import { SFTP_DEFAULT_PORT, UploadStrategy } from '../constants'
44
import { Settings } from '../generated-types'
55
import { SFTPWrapper } from '../sftp-wrapper'
66

@@ -77,8 +77,13 @@ describe('SFTP Client', () => {
7777
expect(mockSftpInstance.end).toHaveBeenCalled()
7878
})
7979

80-
it('uploads using fastPutFromBuffer when useConcurrency is true', async () => {
81-
await uploadSFTP(sshKeySettings, 'sftp_folder_path', 'filename', Buffer.from('test content'), true)
80+
it('uploads using fastPutFromBuffer when upload strategy is concurrent', async () => {
81+
await uploadSFTP(
82+
{ ...sshKeySettings, uploadStrategy: UploadStrategy.CONCURRENT },
83+
'sftp_folder_path',
84+
'filename',
85+
Buffer.from('test content')
86+
)
8287

8388
expect(SFTPWrapper).toHaveBeenCalled()
8489
expect(mockSftpInstance.connect).toHaveBeenCalledWith({

packages/destination-actions/src/destinations/sftp/__tests__/functions.test.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { MultiStatusResponse } from '@segment/actions-core'
22
import { uploadSFTP } from '../client'
3-
import { SFTP_DEFAULT_PORT } from '../constants'
3+
import { SFTP_DEFAULT_PORT, UploadStrategy } from '../constants'
44
import {
55
clean,
66
createFilename,
@@ -416,7 +416,6 @@ describe('send', () => {
416416
enable_batching: true,
417417
file_extension: 'csv',
418418
batch_size: 100000,
419-
useConcurrentWrites: true,
420419
columns: {
421420
422421
name: 'John Doe'
@@ -425,15 +424,15 @@ describe('send', () => {
425424
]
426425

427426
const signal = AbortSignal.timeout(0)
427+
const settings = { ...mockSettings, uploadStrategy: UploadStrategy.CONCURRENT }
428428

429-
await send(payloads, mockSettings, mockRawMapping, undefined, signal)
429+
await send(payloads, settings, mockRawMapping, undefined, signal)
430430

431431
expect(mockUploadSFTP).toHaveBeenCalledWith(
432-
mockSettings,
432+
settings,
433433
'/uploads',
434434
expect.any(String), // filename
435435
expect.any(Buffer), // file content,
436-
true, // useConcurrentWrites
437436
undefined, // logger
438437
signal
439438
)

packages/destination-actions/src/destinations/sftp/client.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import {
77
} from '@segment/actions-core'
88
import path from 'path'
99
import Client from 'ssh2-sftp-client'
10-
import { SFTP_DEFAULT_PORT } from './constants'
10+
import { SFTP_DEFAULT_PORT, UploadStrategy } from './constants'
1111
import { Settings } from './generated-types'
1212
import { sftpConnectionConfig } from './types'
1313
import { SFTPWrapper } from './sftp-wrapper'
@@ -27,18 +27,21 @@ interface SFTPError extends Error {
2727
* @param sftpFolderPath - The target folder path on the SFTP server.
2828
* @param filename - The name of the file to upload.
2929
* @param fileContent - The content of the file to upload as a Buffer.
30+
* @param uploadStrategy - The upload strategy to use (standard or concurrent).
31+
* @param logger - Optional logger for logging messages.
32+
* @param signal - Optional AbortSignal to handle request cancellation.
3033
* @returns A promise that resolves when the file is successfully uploaded.
3134
*/
3235
async function uploadSFTP(
3336
settings: Settings,
3437
sftpFolderPath: string,
3538
filename: string,
3639
fileContent: Buffer,
37-
useConcurrentWrites?: boolean,
3840
logger?: Logger,
3941
signal?: AbortSignal
4042
) {
4143
const sftp = new SFTPWrapper('uploadSFTP', logger)
44+
4245
signal?.throwIfAborted() // exit early if already aborted
4346
// Set up abort listener to clean up SFTP connection on abort
4447
const abortListener = () => {
@@ -48,10 +51,11 @@ async function uploadSFTP(
4851
throw new RequestTimeoutError()
4952
}
5053
signal?.addEventListener('abort', abortListener, { once: true })
54+
5155
try {
5256
await sftp.connect(createConnectionConfig(settings))
5357
const remoteFilePath = path.posix.join(sftpFolderPath, filename)
54-
if (useConcurrentWrites) {
58+
if (settings.uploadStrategy === UploadStrategy.CONCURRENT) {
5559
return await sftp.fastPutFromBuffer(fileContent, remoteFilePath)
5660
} else {
5761
return await sftp.put(fileContent, remoteFilePath)
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,6 @@
11
export const SFTP_DEFAULT_PORT = 22
2+
3+
export const UploadStrategy = {
4+
STANDARD: 'standard',
5+
CONCURRENT: 'concurrent'
6+
}

packages/destination-actions/src/destinations/sftp/fields.ts

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -245,14 +245,7 @@ const audienceFields: Record<string, InputField> = {
245245
traits_or_props,
246246
computation_key
247247
}
248-
const useConcurrentWrites: InputField = {
249-
label: 'Use Concurrent Writes',
250-
description:
251-
'Enable concurrent writes when uploading files to SFTP. This can improve upload performance for large files. However, it may not be supported by all SFTP servers. Test with your server for compatibility.',
252-
type: 'boolean',
253-
required: false,
254-
default: false
255-
}
248+
256249
export const baseFields: Record<string, InputField> = {
257250
columns: columnsNoDefaultMappings,
258251
filename_prefix,
@@ -261,8 +254,7 @@ export const baseFields: Record<string, InputField> = {
261254
sftp_folder_path,
262255
enable_batching,
263256
batch_size,
264-
batch_size_column_name,
265-
useConcurrentWrites
257+
batch_size_column_name
266258
}
267259
export const commonFields: Record<string, InputField> = {
268260
...baseFields,

packages/destination-actions/src/destinations/sftp/functions.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ async function send(
1818
batch_size_column_name,
1919
filename_prefix,
2020
file_extension,
21-
sftp_folder_path,
22-
useConcurrentWrites
21+
sftp_folder_path
2322
} = payloads[0]
2423

2524
const headers: ColumnHeader[] = createHeaders(
@@ -41,7 +40,7 @@ async function send(
4140

4241
const msResponse = new MultiStatusResponse()
4342
try {
44-
await uploadSFTP(settings, sftp_folder_path, filename, fileContent, useConcurrentWrites, logger, signal)
43+
await uploadSFTP(settings, sftp_folder_path, filename, fileContent, logger, signal)
4544
payloads.forEach((payload, index) => {
4645
const row = rowsObservabilityArray[index] ?? ''
4746
msResponse.setSuccessResponseAtIndex(index, {

packages/destination-actions/src/destinations/sftp/generated-types.ts

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/destination-actions/src/destinations/sftp/index.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import type { DestinationDefinition } from '@segment/actions-core'
22
import type { Settings } from './generated-types'
33

44
import { testSFTPConnection } from './client'
5-
import { SFTP_DEFAULT_PORT } from './constants'
5+
import { SFTP_DEFAULT_PORT, UploadStrategy } from './constants'
66
import syncEvents from './syncEvents'
77
import syncModelToSFTP from './syncModelToSFTP'
88

@@ -89,6 +89,21 @@ const destination: DestinationDefinition<Settings> = {
8989
}
9090
]
9191
}
92+
},
93+
uploadStrategy: {
94+
label: 'Upload Strategy',
95+
description: 'Select the upload strategy to use when uploading files to the SFTP server.',
96+
type: 'string',
97+
choices: [
98+
{ label: 'Standard [Supported by all SFTP servers]', value: UploadStrategy.STANDARD },
99+
{
100+
label:
101+
'Concurrent [Improves upload time but may not be supported by all SFTP servers. Test and enable if supported.]',
102+
value: UploadStrategy.CONCURRENT
103+
}
104+
],
105+
default: UploadStrategy.STANDARD,
106+
required: false
92107
}
93108
},
94109
testAuthentication: async (_, { settings }) => await testSFTPConnection(settings)

packages/destination-actions/src/destinations/sftp/sftp-wrapper.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ export class SFTPWrapper {
6868
): Promise<void> {
6969
const fsize = input.length
7070
return new Promise<void>((resolve, reject) => {
71+
// Open the connection to the remote file
7172
this.client?.open(remoteFilePath, 'w', (err, handle) => {
7273
if (err) {
7374
return reject(new Error(`Error opening remote file: ${err.message}`))
@@ -78,6 +79,7 @@ export class SFTPWrapper {
7879
let position = 0
7980
let writeRequests: Promise<void>[] = []
8081

82+
// function that writes a chunk to the remote file
8183
const writeChunk = (chunkPos: number): Promise<void> => {
8284
return new Promise((chunkResolve, chunkReject) => {
8385
const bytesToWrite = Math.min(chunkSize, fsize - chunkPos)
@@ -93,6 +95,7 @@ export class SFTPWrapper {
9395
})
9496
}
9597

98+
// function that splits the writes into concurrent requests and processes them
9699
const processWrites = async () => {
97100
while (position < fsize) {
98101
writeRequests.push(writeChunk(position))
@@ -121,6 +124,7 @@ export class SFTPWrapper {
121124
}
122125

123126
async end() {
127+
// Close the SFTP connection
124128
return this.sftp.end()
125129
}
126130
}

packages/destination-actions/src/destinations/sftp/syncEvents/generated-types.ts

Lines changed: 0 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)