Skip to content

Commit d326656

Browse files
authored
reduce PutRetentionPolicy calls by checking existing policy (#1545)
1 parent 5353c10 commit d326656

File tree

5 files changed

+270
-39
lines changed

5 files changed

+270
-39
lines changed

plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go

+8
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type stubLogsService struct {
3030
clg func(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error)
3131
cls func(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
3232
prp func(input *cloudwatchlogs.PutRetentionPolicyInput) (*cloudwatchlogs.PutRetentionPolicyOutput, error)
33+
dlg func(input *cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error)
3334
}
3435

3536
func (s *stubLogsService) PutLogEvents(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
@@ -60,6 +61,13 @@ func (s *stubLogsService) PutRetentionPolicy(in *cloudwatchlogs.PutRetentionPoli
6061
return nil, nil
6162
}
6263

64+
func (s *stubLogsService) DescribeLogGroups(in *cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error) {
65+
if s.dlg != nil {
66+
return s.dlg(in)
67+
}
68+
return nil, nil
69+
}
70+
6371
func TestAddSingleEvent_WithAccountId(t *testing.T) {
6472
t.Parallel()
6573
var wg sync.WaitGroup

plugins/outputs/cloudwatchlogs/internal/pusher/sender.go

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type cloudWatchLogsService interface {
1919
CreateLogStream(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
2020
CreateLogGroup(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error)
2121
PutRetentionPolicy(input *cloudwatchlogs.PutRetentionPolicyInput) (*cloudwatchlogs.PutRetentionPolicyOutput, error)
22+
DescribeLogGroups(input *cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error)
2223
}
2324

2425
type Sender interface {

plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go

+5
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ func (m *mockLogsService) PutRetentionPolicy(input *cloudwatchlogs.PutRetentionP
4040
return args.Get(0).(*cloudwatchlogs.PutRetentionPolicyOutput), args.Error(1)
4141
}
4242

43+
func (m *mockLogsService) DescribeLogGroups(input *cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error) {
44+
args := m.Called(input)
45+
return args.Get(0).(*cloudwatchlogs.DescribeLogGroupsOutput), args.Error(1)
46+
}
47+
4348
// mockTargetManager is a test double that embeds mock.Mock.
// NOTE(review): presumably stands in for TargetManager; its methods are not
// visible in this hunk — confirm.
type mockTargetManager struct {
4449
mock.Mock
4550
}

plugins/outputs/cloudwatchlogs/internal/pusher/target.go

+136-32
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
package pusher
55

66
import (
7+
"fmt"
78
"sync"
9+
"time"
810

911
"github.com/aws/aws-sdk-go/aws"
1012
"github.com/aws/aws-sdk-go/aws/awserr"
@@ -13,6 +15,15 @@ import (
1315
"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
1416
)
1517

18+
// Tuning knobs for the asynchronous retention-policy workers.
const (
19+
// retentionChannelSize is the buffer capacity used for both the dlg and
// prp channels created in NewTargetManager.
retentionChannelSize = 100
20+
// Backoff schedule used by calculateBackoff: the delay cap doubles from
// baseRetryDelay per attempt and is clamped to maxRetryDelayTarget, giving
21+
// caps of 1, 2, 4, 8, 10 sec for attempts 0..4. Each sleep is jittered into
// the upper half of its cap (assuming seededRand.Int63n follows math/rand
// semantics), so numBackoffRetries attempts sleep for less than
// 1 + 2 + 4 + 8 + 10 = 25 sec in total.
22+
baseRetryDelay = 1 * time.Second
23+
maxRetryDelayTarget = 10 * time.Second
24+
// numBackoffRetries is the number of attempts each worker loop makes per
// target before giving up.
numBackoffRetries = 5
25+
)
26+
1627
type Target struct {
1728
Group, Stream, Class string
1829
Retention int
@@ -29,69 +40,100 @@ type targetManager struct {
2940
// cache of initialized targets
3041
cache map[Target]struct{}
3142
mu sync.Mutex
43+
dlg chan Target
44+
prp chan Target
3245
}
3346

3447
// NewTargetManager builds a targetManager with an empty target cache and two
// buffered channels (dlg and prp, each of capacity retentionChannelSize), then
// starts the processDescribeLogGroup and processPutRetentionPolicy goroutines
// before returning the manager.
//
// NOTE(review): nothing visible here closes dlg or prp, so both goroutines
// appear to run for the lifetime of the process — confirm that is intended.
func NewTargetManager(logger telegraf.Logger, service cloudWatchLogsService) TargetManager {
35-
return &targetManager{
48+
tm := &targetManager{
3649
logger: logger,
3750
service: service,
3851
cache: make(map[Target]struct{}),
52+
dlg: make(chan Target, retentionChannelSize),
53+
prp: make(chan Target, retentionChannelSize),
3954
}
55+
56+
// Background workers: dlg checks the existing retention, prp applies it.
go tm.processDescribeLogGroup()
57+
go tm.processPutRetentionPolicy()
58+
return tm
4059
}
4160

4261
// InitTarget initializes a Target if it is not already in the cache.
//
// For an uncached target it creates the log stream (and the log group when
// needed) and, when target.Retention > 0, queues the target for asynchronous
// retention handling: straight to the prp channel if the log group was just
// created, otherwise to the dlg channel so the existing retention is checked
// first. The target is cached only after creation succeeds; a creation error
// is returned and the target stays uncached.
//
// NOTE(review): the channel sends below happen while m.mu is held. If the
// channel buffer is full the send blocks, and every other InitTarget caller
// then waits on the mutex — confirm this back-pressure is acceptable.
4362
func (m *targetManager) InitTarget(target Target) error {
4463
m.mu.Lock()
4564
defer m.mu.Unlock()
4665
if _, ok := m.cache[target]; !ok {
47-
err := m.createLogGroupAndStream(target)
66+
newGroup, err := m.createLogGroupAndStream(target)
4867
if err != nil {
4968
return err
5069
}
51-
m.PutRetentionPolicy(target)
70+
if target.Retention > 0 {
71+
if newGroup {
72+
m.logger.Debugf("sending new log group %v to prp channel", target.Group)
73+
m.prp <- target
74+
} else {
75+
m.logger.Debugf("sending existing log group %v to dlg channel", target.Group)
76+
m.dlg <- target
77+
}
78+
}
5279
m.cache[target] = struct{}{}
5380
}
5481
return nil
5582
}
5683

57-
func (m *targetManager) createLogGroupAndStream(t Target) error {
84+
func (m *targetManager) PutRetentionPolicy(target Target) {
85+
// new pusher will call this so start with dlg
86+
if target.Retention > 0 {
87+
m.logger.Debugf("sending log group %v to dlg channel by pusher", target.Group)
88+
m.dlg <- target
89+
}
90+
}
91+
92+
// createLogGroupAndStream creates the log stream for t, creating the log group
// first when the stream creation fails with ResourceNotFoundException.
//
// The bool result reports whether this call went through the log-group
// creation path and ended without a ResourceAlreadyExistsException; the error
// is the last error seen. A ResourceAlreadyExistsException from either call is
// treated as success and reported as (false, nil).
func (m *targetManager) createLogGroupAndStream(t Target) (bool, error) {
5893
err := m.createLogStream(t)
5994
if err == nil {
60-
return nil
95+
return false, nil
6196
}
6297

6398
m.logger.Debugf("creating stream fail due to : %v", err)
99+
newGroup := false
64100
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == cloudwatchlogs.ErrCodeResourceNotFoundException {
65101
err = m.createLogGroup(t)
102+
// Set before err is inspected: on a createLogGroup failure this is
// returned as true together with the error (InitTarget returns on the
// error before reading it).
newGroup = true
66103

67104
// attempt to create stream again if group created successfully.
68105
if err == nil {
69-
m.logger.Debugf("successfully created log group %v. Retrying log stream %v", t.Group, t.Stream)
106+
m.logger.Debugf("retrying log stream %v", t.Stream)
70107
err = m.createLogStream(t)
71108
} else {
72109
m.logger.Debugf("creating group fail due to : %v", err)
73110
}
74111
}
75112

76113
// NOTE(review): this also covers the case where the group was created above
// but the retried stream creation reports "already exists"; the result is
// then (false, nil) even though the group is new.
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == cloudwatchlogs.ErrCodeResourceAlreadyExistsException {
77-
m.logger.Debugf("Resource was already created. %v\n", err)
78-
return nil // if the log group or log stream already exist, this is not worth returning an error for
114+
m.logger.Debugf("resource was already created. %v\n", err)
115+
return false, nil
79116
}
80117

81-
return err
118+
return newGroup, err
82119
}
83120

84121
// createLogGroup creates the log group named t.Group. LogGroupClass is set on
// the request only when t.Class is non-empty. On success it logs at debug
// level and returns nil; otherwise the service error is returned unchanged.
func (m *targetManager) createLogGroup(t Target) error {
85-
var err error
122+
var input *cloudwatchlogs.CreateLogGroupInput
86123
if t.Class != "" {
87-
_, err = m.service.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
124+
input = &cloudwatchlogs.CreateLogGroupInput{
88125
LogGroupName: &t.Group,
89126
LogGroupClass: &t.Class,
90-
})
127+
}
91128
} else {
92-
_, err = m.service.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
129+
input = &cloudwatchlogs.CreateLogGroupInput{
93130
LogGroupName: &t.Group,
94-
})
131+
}
132+
}
133+
_, err := m.service.CreateLogGroup(input)
134+
if err == nil {
135+
m.logger.Debugf("successfully created log group %v", t.Group)
136+
return nil
95137
}
96138
return err
97139
}
@@ -109,26 +151,88 @@ func (m *targetManager) createLogStream(t Target) error {
109151
return err
110152
}
111153

112-
// PutRetentionPolicy tries to set the retention policy for a log group. Does not retry on failure.
113-
func (m *targetManager) PutRetentionPolicy(t Target) {
114-
if t.Retention > 0 {
115-
i := aws.Int64(int64(t.Retention))
116-
putRetentionInput := &cloudwatchlogs.PutRetentionPolicyInput{
117-
LogGroupName: &t.Group,
118-
RetentionInDays: i,
154+
// processDescribeLogGroup drains the dlg channel until it is closed. For each
// target it calls getRetention up to numBackoffRetries times, sleeping
// calculateBackoff(attempt) after every failure. Once a lookup succeeds, the
// target is forwarded to the prp channel if its current retention differs
// from the positive target.Retention; either way retrying stops.
//
// NOTE(review): the backoff sleep also runs after the final failed attempt,
// and the send to prp blocks while that channel's buffer is full.
func (m *targetManager) processDescribeLogGroup() {
155+
for target := range m.dlg {
156+
for attempt := 0; attempt < numBackoffRetries; attempt++ {
157+
currentRetention, err := m.getRetention(target)
158+
if err != nil {
159+
m.logger.Errorf("failed to describe log group retention for target %v: %v", target, err)
160+
time.Sleep(m.calculateBackoff(attempt))
161+
continue
162+
}
163+
164+
if currentRetention != target.Retention && target.Retention > 0 {
165+
m.logger.Debugf("queueing log group %v to update retention policy", target.Group)
166+
m.prp <- target
167+
}
168+
break // lookup succeeded (update queued above if retention differed); stop retrying
119169
}
120-
_, err := m.service.PutRetentionPolicy(putRetentionInput)
121-
if err != nil {
122-
// since this gets called both before we start pushing logs, and after we first attempt
123-
// to push a log to a non-existent log group, we don't want to dirty the log with an error
124-
// if the error is that the log group doesn't exist (yet).
125-
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == cloudwatchlogs.ErrCodeResourceNotFoundException {
126-
m.logger.Debugf("Log group %v not created yet: %v", t.Group, err)
127-
} else {
128-
m.logger.Errorf("Unable to put retention policy for log group %v: %v ", t.Group, err)
170+
}
171+
}
172+
173+
// getRetention returns the retention, in days, currently set on target.Group.
//
// It calls DescribeLogGroups with the group name as a prefix and scans the
// returned groups for an exact name match. A match with no RetentionInDays
// yields (0, nil). An error is returned when the call fails or when no group
// in the response has exactly that name.
//
// NOTE(review): output and group.LogGroupName are dereferenced without nil
// checks, so a (nil, nil) response from the service would panic here. Only
// the one response is inspected (no NextToken handling) — confirm whether a
// prefix shared by many groups can push the exact match onto a later page.
func (m *targetManager) getRetention(target Target) (int, error) {
174+
input := &cloudwatchlogs.DescribeLogGroupsInput{
175+
LogGroupNamePrefix: aws.String(target.Group),
176+
}
177+
178+
output, err := m.service.DescribeLogGroups(input)
179+
if err != nil {
180+
return 0, fmt.Errorf("describe log groups failed: %w", err)
181+
}
182+
183+
for _, group := range output.LogGroups {
184+
// The prefix filter can return other groups; require an exact name match.
if *group.LogGroupName == target.Group {
185+
if group.RetentionInDays == nil {
186+
return 0, nil
129187
}
130-
} else {
131-
m.logger.Debugf("successfully updated log retention policy for log group %v", t.Group)
188+
return int(*group.RetentionInDays), nil
189+
}
190+
}
191+
192+
return 0, fmt.Errorf("log group %v not found", target.Group)
193+
}
194+
195+
func (m *targetManager) processPutRetentionPolicy() {
196+
for target := range m.prp {
197+
var updated bool
198+
for attempt := 0; attempt < numBackoffRetries; attempt++ {
199+
err := m.updateRetentionPolicy(target)
200+
if err == nil {
201+
updated = true
202+
break
203+
}
204+
205+
m.logger.Debugf("retrying to update retention policy for target (%v) %v: %v", attempt, target, err)
206+
time.Sleep(m.calculateBackoff(attempt))
132207
}
208+
209+
if !updated {
210+
m.logger.Errorf("failed to update retention policy for target %v after %d attempts", target, numBackoffRetries)
211+
}
212+
}
213+
}
214+
215+
func (m *targetManager) updateRetentionPolicy(target Target) error {
216+
input := &cloudwatchlogs.PutRetentionPolicyInput{
217+
LogGroupName: aws.String(target.Group),
218+
RetentionInDays: aws.Int64(int64(target.Retention)),
219+
}
220+
221+
_, err := m.service.PutRetentionPolicy(input)
222+
if err != nil {
223+
return fmt.Errorf("put retention policy failed: %w", err)
224+
}
225+
m.logger.Debugf("successfully updated retention policy for log group %v", target.Group)
226+
return nil
227+
}
228+
229+
func (m *targetManager) calculateBackoff(retryCount int) time.Duration {
230+
delay := baseRetryDelay
231+
if retryCount < numBackoffRetries {
232+
delay = baseRetryDelay * time.Duration(1<<int64(retryCount))
233+
}
234+
if delay > maxRetryDelayTarget {
235+
delay = maxRetryDelayTarget
133236
}
237+
return time.Duration(seededRand.Int63n(int64(delay/2)) + int64(delay/2))
134238
}

0 commit comments

Comments
 (0)