Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DS: refactor memory control module #993

Merged
merged 2 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ func (m *DispatcherOrchestrator) handleBootstrapRequest(
log.Error("failed to create new dispatcher manager",
zap.Any("changefeedID", cfId.Name()), zap.Error(err))

appcontext.GetService[*dispatchermanager.HeartBeatCollector](appcontext.HeartbeatCollector).RemoveEventDispatcherManager(manager)

response := &heartbeatpb.MaintainerBootstrapResponse{
ChangefeedID: req.ChangefeedID,
Err: &heartbeatpb.RunningError{
Expand Down
29 changes: 19 additions & 10 deletions downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,17 +259,14 @@ func (c *EventCollector) processFeedback(ctx context.Context) {
case <-ctx.Done():
return
case feedback := <-c.ds.Feedback():
if feedback.IsAreaFeedback() {
if feedback.IsPauseArea() {
feedback.Dest.pauseChangefeed(c)
} else {
feedback.Dest.resumeChangefeed(c)
}
}

if feedback.IsPausePath() {
switch feedback.FeedbackType {
case dynstream.PauseArea:
feedback.Dest.pauseChangefeed(c)
case dynstream.ResumeArea:
feedback.Dest.resumeChangefeed(c)
case dynstream.PausePath:
feedback.Dest.pauseDispatcher(c)
} else {
case dynstream.ResumePath:
feedback.Dest.resumeDispatcher(c)
}
}
Expand Down Expand Up @@ -665,6 +662,9 @@ func (d *dispatcherStat) pauseChangefeed(eventCollector *EventCollector) {
defer d.eventServiceInfo.RUnlock()

if d.eventServiceInfo.serverID == "" || !d.eventServiceInfo.readyEventReceived {
log.Info("ignore pause changefeed request because the eventService is not ready",
zap.String("changefeedID", d.target.GetChangefeedID().ID().String()),
zap.Any("eventServiceID", d.eventServiceInfo.serverID))
// Just ignore the request if the dispatcher is not ready.
return
}
Expand All @@ -685,6 +685,9 @@ func (d *dispatcherStat) resumeChangefeed(eventCollector *EventCollector) {
defer d.eventServiceInfo.RUnlock()

if d.eventServiceInfo.serverID == "" || !d.eventServiceInfo.readyEventReceived {
log.Info("ignore resume changefeed request because the eventService is not ready",
zap.String("changefeedID", d.target.GetChangefeedID().ID().String()),
zap.Any("eventServiceID", d.eventServiceInfo.serverID))
// Just ignore the request if the dispatcher is not ready.
return
}
Expand All @@ -704,6 +707,9 @@ func (d *dispatcherStat) pauseDispatcher(eventCollector *EventCollector) {
defer d.eventServiceInfo.RUnlock()

if d.eventServiceInfo.serverID == "" || !d.eventServiceInfo.readyEventReceived {
log.Info("ignore pause dispatcher request because the eventService is not ready",
zap.String("changefeedID", d.target.GetChangefeedID().ID().String()),
zap.Any("eventServiceID", d.eventServiceInfo.serverID))
// Just ignore the request if the dispatcher is not ready.
return
}
Expand All @@ -719,6 +725,9 @@ func (d *dispatcherStat) resumeDispatcher(eventCollector *EventCollector) {
defer d.eventServiceInfo.RUnlock()

if d.eventServiceInfo.serverID == "" || !d.eventServiceInfo.readyEventReceived {
log.Info("ignore resume dispatcher request because the eventService is not ready",
zap.String("changefeedID", d.target.GetChangefeedID().ID().String()),
zap.Any("eventServiceID", d.eventServiceInfo.serverID))
// Just ignore the request if the dispatcher is not ready.
return
}
Expand Down
43 changes: 26 additions & 17 deletions utils/dynstream/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,31 +260,40 @@ func NewAreaSettingsWithMaxPendingSize(size int) AreaSettings {
}
}

type Feedback[A Area, P Path, D Dest] struct {
Area A
Path P
Dest D
type FeedbackType int

FeedbackType int // 0: path feedback, 1: area feedback

PausePath bool // Pause or resume the path.
PauseArea bool // Pause or resume the area.
}
const (
PausePath FeedbackType = iota
ResumePath
PauseArea
ResumeArea
)

func (f *Feedback[A, P, D]) IsAreaFeedback() bool {
return f.FeedbackType == 1
func (f FeedbackType) String() string {
switch f {
case PausePath:
return "PausePath"
case ResumePath:
return "ResumePath"
case PauseArea:
return "PauseArea"
case ResumeArea:
return "ResumeArea"
default:
return fmt.Sprintf("Unknown FeedbackType: %d", f)
}
}

func (f *Feedback[A, P, D]) IsPausePath() bool {
return f.PausePath
}
type Feedback[A Area, P Path, D Dest] struct {
Area A
Path P
Dest D

func (f *Feedback[A, P, D]) IsPauseArea() bool {
return f.PauseArea
FeedbackType FeedbackType
}

func (f *Feedback[A, P, D]) String() string {
return fmt.Sprintf("DynamicStream Feedback{Area: %v, Path: %v, Pause: %v, PauseArea: %v}", f.Area, f.Path, f.PausePath, f.PauseArea)
return fmt.Sprintf("DynamicStream Feedback{Area: %v, Path: %v, FeedbackType: %s}", f.Area, f.Path, f.FeedbackType.String())
}

func NewDynamicStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]](handler H, option ...Option) DynamicStream[A, P, T, D, H] {
Expand Down
97 changes: 65 additions & 32 deletions utils/dynstream/memory_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func newAreaMemStat[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]](
func (as *areaMemStat[A, P, T, D, H]) appendEvent(
path *pathInfo[A, P, T, D, H],
event eventWrap[A, P, T, D, H],
handler H,
_ H,
) bool {
defer as.updatePathPauseState(path)
defer as.updateAreaPauseState(path)
Expand Down Expand Up @@ -91,77 +91,110 @@ func (as *areaMemStat[A, P, T, D, H]) appendEvent(
// It needs to be called after a event is appended.
// Note: Our gaol is to fast pause, and lazy resume.
func (as *areaMemStat[A, P, T, D, H]) updatePathPauseState(path *pathInfo[A, P, T, D, H]) {
shouldPause := as.shouldPausePath(path)
pause, resume := as.shouldPausePath(path)

sendFeedback := func(pause bool) {
if !(time.Since(path.lastSendFeedbackTime.Load().(time.Time)) >= as.settings.Load().FeedbackInterval) {
return
}

feedbackType := PausePath
if !pause {
feedbackType = ResumePath
}

as.feedbackChan <- Feedback[A, P, D]{
Area: path.area,
Path: path.path,
Dest: path.dest,
FeedbackType: 0,
PausePath: pause,
FeedbackType: feedbackType,
}
path.lastSendFeedbackTime.Store(time.Now())
path.paused.Store(pause)
}

// If the path is not paused previously but should be paused, we need to pause it.
// And send pause feedback.
if path.paused.Load() != shouldPause &&
time.Since(path.lastSendFeedbackTime.Load().(time.Time)) >= as.settings.Load().FeedbackInterval {
path.paused.Store(shouldPause)
sendFeedback(shouldPause)
switch {
case pause:
sendFeedback(true)
case resume:
sendFeedback(false)
}
}

func (as *areaMemStat[A, P, T, D, H]) updateAreaPauseState(path *pathInfo[A, P, T, D, H]) {
shouldPause := as.shouldPauseArea()
pause, resume := as.shouldPauseArea()

sendFeedback := func(pause bool) {
feedbackType := PauseArea
if !pause {
feedbackType = ResumeArea
}

if !(time.Since(as.lastSendFeedbackTime.Load().(time.Time)) >= as.settings.Load().FeedbackInterval) {
return
}
as.feedbackChan <- Feedback[A, P, D]{
Area: as.area,
Path: path.path,
Dest: path.dest,
PauseArea: pause,
FeedbackType: 1,
FeedbackType: feedbackType,
}

as.lastSendFeedbackTime.Store(time.Now())
as.paused.Store(pause)
}

prevPaused := as.paused.Load()
if prevPaused != shouldPause &&
time.Since(as.lastSendFeedbackTime.Load().(time.Time)) >= as.settings.Load().FeedbackInterval {
as.paused.Store(shouldPause)
sendFeedback(shouldPause)
return
switch {
case pause:
sendFeedback(true)
case resume:
sendFeedback(false)
}
}

// shouldPausePath determines if a path should be paused based on memory usage.
// If the memory usage is greater than the 20% of max pending size, the path should be paused.
func (as *areaMemStat[A, P, T, D, H]) shouldPausePath(path *pathInfo[A, P, T, D, H]) bool {
func (as *areaMemStat[A, P, T, D, H]) shouldPausePath(path *pathInfo[A, P, T, D, H]) (pause bool, resume bool) {
memoryUsageRatio := float64(path.pendingSize.Load()) / float64(as.settings.Load().MaxPendingSize)

// If the path is paused, we only need to resume it when the memory usage is less than 10%.
if path.paused.Load() {
return memoryUsageRatio >= 0.1
switch {
case path.paused.Load():
// If the path is paused, we only need to resume it when the memory usage is less than 10%.
if memoryUsageRatio < 0.1 {
log.Info("resume path", zap.Any("area", as.area), zap.Any("path", path.path), zap.Float64("memoryUsageRatio", memoryUsageRatio))
resume = true
}
default:
// If the path is not paused, we need to pause it when the memory usage is greater than 20% of max pending size.
if memoryUsageRatio >= 0.2 {
log.Info("pause path", zap.Any("area", as.area), zap.Any("path", path.path), zap.Float64("memoryUsageRatio", memoryUsageRatio))
pause = true
}
}

// If the path is not paused, we need to pause it when the memory usage is greater than 20% of max pending size.
return memoryUsageRatio >= 0.2
return
}

// shouldPauseArea determines if the area should be paused based on memory usage.
// If the memory usage is greater than the 80% of max pending size, the area should be paused.
func (as *areaMemStat[A, P, T, D, H]) shouldPauseArea() bool {
func (as *areaMemStat[A, P, T, D, H]) shouldPauseArea() (pause bool, resume bool) {
memoryUsageRatio := float64(as.totalPendingSize.Load()) / float64(as.settings.Load().MaxPendingSize)

// If the area is paused, we only need to resume it when the memory usage is less than 50%.
if as.paused.Load() {
return memoryUsageRatio >= 0.5
switch {
case as.paused.Load():
// If the area is already paused, we need to resume it when the memory usage is less than 50%.
if memoryUsageRatio < 0.5 {
resume = true
log.Info("resume area", zap.Any("area", as.area), zap.Float64("memoryUsageRatio", memoryUsageRatio))
}
default:
// If the area is not paused, we need to pause it when the memory usage is greater than 80% of max pending size.
if memoryUsageRatio >= 0.8 {
pause = true
log.Info("pause area", zap.Any("area", as.area), zap.Float64("memoryUsageRatio", memoryUsageRatio))
}
}

// If the area is not paused, we need to pause it when the memory usage is greater than 80% of max pending size.
return memoryUsageRatio >= 0.8
return
}

func (as *areaMemStat[A, P, T, D, H]) decPendingSize(size int64) {
Expand Down
Loading
Loading