Skip to content

Commit b3cf961

Browse files
Optimize bandwidth aggregation with out-of-order processing and dynamic allocation (#21)
1 parent 880a0d0 commit b3cf961

File tree

9 files changed

+331
-56
lines changed

9 files changed

+331
-56
lines changed

client/recv.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,14 @@ func (svc *Service) recvFile(id string, filePath string) error {
8585
}
8686

8787
recv := receiver.NewReceiver(0, fio.NewCallbackWriter(f, callback))
88+
89+
if svc.enableUnorderedProcessing {
90+
if svc.debugMode {
91+
fmt.Printf("Enabling unordered frame processing with buffer size: %d\n", svc.bufferSize)
92+
}
93+
recv.EnableUnorderedProcessing(svc.bufferSize)
94+
}
95+
8896
for _, worker := range m.Workers {
8997
wait.Add(1)
9098
go func(addr string) {

client/send.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,13 @@ func (svc *Service) sendFile(id string, filePath string) error {
8888
return err
8989
}
9090

91+
if svc.dynamicAllocation {
92+
if svc.debugMode {
93+
fmt.Println("Enabling dynamic frame allocation")
94+
}
95+
s.EnableDynamicAllocation()
96+
}
97+
9198
for _, worker := range m.Workers {
9299
wait.Add(1)
93100
go func(addr string) {

client/service.go

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@ import (
55
)
66

77
type Options struct {
8-
ServerAddr string
9-
ID string
10-
SendFile string
11-
FrameSize int
12-
CacheCount int
13-
RecvFile string
14-
DebugMode bool
8+
ServerAddr string
9+
ID string
10+
SendFile string
11+
FrameSize int
12+
CacheCount int
13+
RecvFile string
14+
DebugMode bool
15+
EnableUnorderedProcessing bool
16+
DynamicAllocation bool
17+
BufferSize int
1518
}
1619

1720
func (op *Options) Check() error {
@@ -32,10 +35,13 @@ func (op *Options) Check() error {
3235
}
3336

3437
type Service struct {
35-
debugMode bool
36-
serverAddr string
37-
frameSize int
38-
cacheCount int
38+
debugMode bool
39+
serverAddr string
40+
frameSize int
41+
cacheCount int
42+
enableUnorderedProcessing bool
43+
dynamicAllocation bool
44+
bufferSize int
3945

4046
runHandler func() error
4147
}
@@ -46,10 +52,13 @@ func NewService(options Options) (*Service, error) {
4652
}
4753

4854
svc := &Service{
49-
debugMode: options.DebugMode,
50-
serverAddr: options.ServerAddr,
51-
frameSize: options.FrameSize,
52-
cacheCount: options.CacheCount,
55+
debugMode: options.DebugMode,
56+
serverAddr: options.ServerAddr,
57+
frameSize: options.FrameSize,
58+
cacheCount: options.CacheCount,
59+
enableUnorderedProcessing: options.EnableUnorderedProcessing,
60+
dynamicAllocation: options.DynamicAllocation,
61+
bufferSize: options.BufferSize,
5362
}
5463

5564
if options.SendFile != "" {

cmd/bandwidth-test/root.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@ import (
1717
)
1818

1919
var (
20-
showVersion bool
21-
fileSize int64
22-
duration int
23-
tempDir string
24-
workers string
25-
verbose bool // Whether to show detailed output from external processes
20+
showVersion bool
21+
fileSize int64
22+
duration int
23+
tempDir string
24+
workers string
25+
verbose bool // Whether to show detailed output from external processes
26+
enableUnorderedProcessing bool // Enable out-of-order frame processing
27+
dynamicAllocation bool // Enable dynamic frame allocation
28+
bufferSize int // Maximum buffer size for out-of-order frames
2629
)
2730

2831
func init() {
@@ -32,6 +35,10 @@ func init() {
3235
rootCmd.PersistentFlags().StringVarP(&tempDir, "temp-dir", "t", os.TempDir(), "directory to store temporary files")
3336
rootCmd.PersistentFlags().StringVarP(&workers, "workers", "w", "100KB,500KB", "worker bandwidth configuration, comma-separated list of bandwidth limits (e.g., '200KB' for one worker, '200KB,200KB,300KB' for three workers)")
3437
rootCmd.PersistentFlags().BoolVarP(&verbose, "verbose", "", false, "show detailed output from external processes")
38+
39+
rootCmd.PersistentFlags().BoolVar(&enableUnorderedProcessing, "enable-unordered-processing", false, "enable out-of-order frame processing to improve efficiency with unbalanced workers")
40+
rootCmd.PersistentFlags().BoolVar(&dynamicAllocation, "dynamic-allocation", false, "enable dynamic frame allocation based on worker performance")
41+
rootCmd.PersistentFlags().IntVar(&bufferSize, "buffer-size", 1000, "maximum buffer size for out-of-order frames (only used with --enable-unordered-processing)")
3542
}
3643

3744
var rootCmd = &cobra.Command{
@@ -229,6 +236,11 @@ func startSender(serverAddr, transferID, testFilePath string) (*Process, chan er
229236
senderArgs = append(senderArgs, "--debug")
230237
}
231238

239+
if dynamicAllocation {
240+
senderArgs = append(senderArgs, "--dynamic-allocation")
241+
fmt.Println("Dynamic frame allocation enabled")
242+
}
243+
232244
senderProcess := NewProcess("Sender", fftPath, senderArgs, verbose)
233245
fmt.Println("Starting sender...")
234246

@@ -276,6 +288,14 @@ func startReceiver(serverAddr, transferID, recvDir string, fileSizeBytes int64)
276288
receiverArgs = append(receiverArgs, "--debug")
277289
}
278290

291+
if enableUnorderedProcessing {
292+
receiverArgs = append(receiverArgs, "--enable-unordered-processing")
293+
if bufferSize > 0 {
294+
receiverArgs = append(receiverArgs, "--buffer-size", fmt.Sprintf("%d", bufferSize))
295+
}
296+
fmt.Printf("Unordered frame processing enabled with buffer size: %d\n", bufferSize)
297+
}
298+
279299
receiverProcess := NewProcess("Receiver", fftPath, receiverArgs, verbose)
280300
fmt.Println("Starting receiver...")
281301

cmd/fft/root.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ func init() {
2424
rootCmd.PersistentFlags().IntVarP(&options.CacheCount, "cache-count", "c", 512, "how many frames be cached, it will be set to the min value between sender and receiver")
2525
rootCmd.PersistentFlags().StringVarP(&options.RecvFile, "recv-file", "t", "", "specify local file path to store received file")
2626
rootCmd.PersistentFlags().BoolVarP(&options.DebugMode, "debug", "g", false, "print more debug info")
27+
28+
rootCmd.PersistentFlags().BoolVar(&options.EnableUnorderedProcessing, "enable-unordered-processing", false, "enable out-of-order frame processing to improve efficiency with unbalanced workers")
29+
rootCmd.PersistentFlags().BoolVar(&options.DynamicAllocation, "dynamic-allocation", false, "enable dynamic frame allocation based on worker performance")
30+
rootCmd.PersistentFlags().IntVar(&options.BufferSize, "buffer-size", 1000, "maximum buffer size for out-of-order frames (only used with --enable-unordered-processing)")
2731
}
2832

2933
var rootCmd = &cobra.Command{

pkg/receiver/receiver.go

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,40 @@ type Receiver struct {
1717
framesIDMap map[uint32]struct{}
1818
notifyCh chan struct{}
1919

20+
unorderedEnabled bool
21+
orderedBuffer map[uint32]*stream.Frame // Buffer for out-of-order frames
22+
maxBufferSize int // Maximum number of frames to buffer
23+
2024
mu sync.RWMutex
2125
}
2226

2327
func NewReceiver(fileID uint32, dst io.Writer) *Receiver {
2428
return &Receiver{
25-
fileID: fileID,
26-
nextFrameID: 0,
27-
dst: dst,
28-
frames: make([]*stream.Frame, 0),
29-
framesIDMap: make(map[uint32]struct{}),
30-
notifyCh: make(chan struct{}, 1),
29+
fileID: fileID,
30+
nextFrameID: 0,
31+
dst: dst,
32+
frames: make([]*stream.Frame, 0),
33+
framesIDMap: make(map[uint32]struct{}),
34+
notifyCh: make(chan struct{}, 1),
35+
unorderedEnabled: false,
36+
orderedBuffer: make(map[uint32]*stream.Frame),
37+
maxBufferSize: 1000, // Default buffer size
38+
}
39+
}
40+
41+
func (r *Receiver) EnableUnorderedProcessing(maxBufferSize int) {
42+
r.mu.Lock()
43+
defer r.mu.Unlock()
44+
45+
r.unorderedEnabled = true
46+
if maxBufferSize > 0 {
47+
r.maxBufferSize = maxBufferSize
3148
}
3249
}
3350

3451
func (r *Receiver) RecvFrame(frame *stream.Frame) {
3552
r.mu.Lock()
53+
3654
if frame.FrameID < r.nextFrameID {
3755
r.mu.Unlock()
3856
return
@@ -43,11 +61,20 @@ func (r *Receiver) RecvFrame(frame *stream.Frame) {
4361
return
4462
}
4563

46-
r.frames = append(r.frames, frame)
4764
r.framesIDMap[frame.FrameID] = struct{}{}
48-
sort.Slice(r.frames, func(i, j int) bool {
49-
return r.frames[i].FrameID < r.frames[j].FrameID
50-
})
65+
66+
if r.unorderedEnabled {
67+
if frame.FrameID == r.nextFrameID {
68+
r.frames = append(r.frames, frame)
69+
} else {
70+
r.orderedBuffer[frame.FrameID] = frame
71+
}
72+
} else {
73+
r.frames = append(r.frames, frame)
74+
sort.Slice(r.frames, func(i, j int) bool {
75+
return r.frames[i].FrameID < r.frames[j].FrameID
76+
})
77+
}
5178
r.mu.Unlock()
5279

5380
select {
@@ -67,6 +94,7 @@ func (r *Receiver) Run() {
6794
ii := 0
6895
finished := false
6996
r.mu.Lock()
97+
7098
for i, frame := range r.frames {
7199
if r.nextFrameID == frame.FrameID {
72100
ii = i + 1
@@ -85,6 +113,27 @@ func (r *Receiver) Run() {
85113
}
86114
}
87115
r.frames = r.frames[ii:]
116+
117+
if r.unorderedEnabled {
118+
continueProcessing := true
119+
for continueProcessing {
120+
if frame, ok := r.orderedBuffer[r.nextFrameID]; ok {
121+
delete(r.orderedBuffer, r.nextFrameID)
122+
delete(r.framesIDMap, r.nextFrameID)
123+
124+
// Check if it's the last frame
125+
if len(frame.Buf) == 0 {
126+
finished = true
127+
break
128+
}
129+
130+
buffer.Write(frame.Buf)
131+
r.nextFrameID++
132+
} else {
133+
continueProcessing = false
134+
}
135+
}
136+
}
88137
r.mu.Unlock()
89138

90139
buf := buffer.Bytes()

pkg/sender/frame.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,15 @@ type SendFrame struct {
1313
sendTime time.Time
1414
retryTimes int
1515
hasAck bool
16+
transferID int // ID of the transfer this frame is assigned to
1617

1718
mu sync.Mutex
1819
}
1920

2021
func NewSendFrame(frame *stream.Frame) *SendFrame {
2122
return &SendFrame{
22-
frame: frame,
23+
frame: frame,
24+
transferID: -1, // -1 means not assigned to any specific transfer
2325
}
2426
}
2527

@@ -44,3 +46,11 @@ func (sf *SendFrame) HasAck() bool {
4446
func (sf *SendFrame) SetAck() {
4547
sf.hasAck = true
4648
}
49+
50+
func (sf *SendFrame) SetTransferID(id int) {
51+
sf.transferID = id
52+
}
53+
54+
func (sf *SendFrame) GetTransferID() int {
55+
return sf.transferID
56+
}

0 commit comments

Comments
 (0)