Skip to content

Commit 7fd415d

Browse files
710leolwb0214Reditiny
authored
feat: support notify rule (#2500)
Co-authored-by: flashbo <[email protected]> Co-authored-by: Xu Bin <[email protected]>
1 parent f7401b7 commit 7fd415d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+5031
-122
lines changed

alert/alert.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
6464
userGroupCache := memsto.NewUserGroupCache(ctx, syncStats)
6565
taskTplsCache := memsto.NewTaskTplCache(ctx)
6666
configCvalCache := memsto.NewCvalCache(ctx, syncStats)
67+
notifyRuleCache := memsto.NewNotifyRuleCache(ctx, syncStats)
68+
notifyChannelCache := memsto.NewNotifyChannelCache(ctx, syncStats)
69+
messageTemplateCache := memsto.NewMessageTemplateCache(ctx, syncStats)
6770

6871
promClients := prom.NewPromClient(ctx)
6972
dispatch.InitRegisterQueryFunc(promClients)
@@ -72,7 +75,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
7275

7376
macros.RegisterMacro(macros.MacroInVain)
7477
dscache.Init(ctx, false)
75-
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, taskTplsCache, dsCache, ctx, promClients, userCache, userGroupCache)
78+
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, taskTplsCache, dsCache, ctx, promClients, userCache, userGroupCache, notifyRuleCache, notifyChannelCache, messageTemplateCache)
7679

7780
r := httpx.GinEngine(config.Global.RunMode, config.HTTP,
7881
configCvalCache.PrintBodyPaths, configCvalCache.PrintAccessLog)
@@ -95,12 +98,14 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
9598

9699
func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, alertStats *astats.Stats, externalProcessors *process.ExternalProcessorsType, targetCache *memsto.TargetCacheType, busiGroupCache *memsto.BusiGroupCacheType,
97100
alertMuteCache *memsto.AlertMuteCacheType, alertRuleCache *memsto.AlertRuleCacheType, notifyConfigCache *memsto.NotifyConfigCacheType, taskTplsCache *memsto.TaskTplCache, datasourceCache *memsto.DatasourceCacheType, ctx *ctx.Context,
98-
promClients *prom.PromClientMap, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType) {
101+
promClients *prom.PromClientMap, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType, notifyRuleCache *memsto.NotifyRuleCacheType, notifyChannelCache *memsto.NotifyChannelCacheType, messageTemplateCache *memsto.MessageTemplateCacheType) {
99102
alertSubscribeCache := memsto.NewAlertSubscribeCache(ctx, syncStats)
100103
recordingRuleCache := memsto.NewRecordingRuleCache(ctx, syncStats)
101104
targetsOfAlertRulesCache := memsto.NewTargetOfAlertRuleCache(ctx, alertc.Heartbeat.EngineName, syncStats)
102105

103106
go models.InitNotifyConfig(ctx, alertc.Alerting.TemplatesDir)
107+
go models.InitNotifyChannel(ctx)
108+
go models.InitMessageTemplate(ctx)
104109

105110
naming := naming.NewNaming(ctx, alertc.Heartbeat, alertStats)
106111

@@ -110,7 +115,7 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
110115
eval.NewScheduler(alertc, externalProcessors, alertRuleCache, targetCache, targetsOfAlertRulesCache,
111116
busiGroupCache, alertMuteCache, datasourceCache, promClients, naming, ctx, alertStats)
112117

113-
dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, notifyConfigCache, taskTplsCache, alertc.Alerting, ctx, alertStats)
118+
dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, notifyConfigCache, taskTplsCache, notifyRuleCache, notifyChannelCache, messageTemplateCache, alertc.Alerting, ctx, alertStats)
114119
consumer := dispatch.NewConsumer(alertc.Alerting, ctx, dp, promClients)
115120

116121
notifyRecordComsumer := sender.NewNotifyRecordConsumer(ctx)

alert/dispatch/dispatch.go

Lines changed: 244 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ type Dispatch struct {
3030
notifyConfigCache *memsto.NotifyConfigCacheType
3131
taskTplsCache *memsto.TaskTplCache
3232

33+
notifyRuleCache *memsto.NotifyRuleCacheType
34+
notifyChannelCache *memsto.NotifyChannelCacheType
35+
messageTemplateCache *memsto.MessageTemplateCacheType
36+
3337
alerting aconf.Alerting
3438

3539
Senders map[string]sender.Sender
@@ -47,15 +51,19 @@ type Dispatch struct {
4751
// 创建一个 Notify 实例
4852
func NewDispatch(alertRuleCache *memsto.AlertRuleCacheType, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType,
4953
alertSubscribeCache *memsto.AlertSubscribeCacheType, targetCache *memsto.TargetCacheType, notifyConfigCache *memsto.NotifyConfigCacheType,
50-
taskTplsCache *memsto.TaskTplCache, alerting aconf.Alerting, ctx *ctx.Context, astats *astats.Stats) *Dispatch {
54+
taskTplsCache *memsto.TaskTplCache, notifyRuleCache *memsto.NotifyRuleCacheType, notifyChannelCache *memsto.NotifyChannelCacheType,
55+
messageTemplateCache *memsto.MessageTemplateCacheType, alerting aconf.Alerting, ctx *ctx.Context, astats *astats.Stats) *Dispatch {
5156
notify := &Dispatch{
52-
alertRuleCache: alertRuleCache,
53-
userCache: userCache,
54-
userGroupCache: userGroupCache,
55-
alertSubscribeCache: alertSubscribeCache,
56-
targetCache: targetCache,
57-
notifyConfigCache: notifyConfigCache,
58-
taskTplsCache: taskTplsCache,
57+
alertRuleCache: alertRuleCache,
58+
userCache: userCache,
59+
userGroupCache: userGroupCache,
60+
alertSubscribeCache: alertSubscribeCache,
61+
targetCache: targetCache,
62+
notifyConfigCache: notifyConfigCache,
63+
taskTplsCache: taskTplsCache,
64+
notifyRuleCache: notifyRuleCache,
65+
notifyChannelCache: notifyChannelCache,
66+
messageTemplateCache: messageTemplateCache,
5967

6068
alerting: alerting,
6169

@@ -131,6 +139,233 @@ func (e *Dispatch) relaodTpls() error {
131139
return nil
132140
}
133141

142+
func (e *Dispatch) HandleEventNotifyV2(event *models.AlertCurEvent, isSubscribe bool) {
143+
144+
if len(event.NotifyRuleIDs) > 0 {
145+
for _, notifyRuleId := range event.NotifyRuleIDs {
146+
logger.Infof("notify rule ids: %v, event: %+v", notifyRuleId, event)
147+
notifyRule := e.notifyRuleCache.Get(notifyRuleId)
148+
if notifyRule == nil {
149+
continue
150+
}
151+
152+
for i := range notifyRule.NotifyConfigs {
153+
if !NotifyRuleApplicable(&notifyRule.NotifyConfigs[i], event) {
154+
continue
155+
}
156+
notifyChannel := e.notifyChannelCache.Get(notifyRule.NotifyConfigs[i].ChannelID)
157+
messageTemplate := e.messageTemplateCache.Get(notifyRule.NotifyConfigs[i].TemplateID)
158+
if notifyChannel == nil {
159+
logger.Warningf("notify_id: %d, event:%+v, channel_id:%d, template_id: %d, notify_channel not found", notifyRuleId, event, notifyRule.NotifyConfigs[i].ChannelID, notifyRule.NotifyConfigs[i].TemplateID)
160+
continue
161+
}
162+
163+
if notifyChannel.RequestType != "flashduty" && messageTemplate == nil {
164+
logger.Warningf("notify_id: %d, channel_name: %v, event:%+v, template_id: %d, message_template not found", notifyRuleId, notifyChannel.Ident, event, notifyRule.NotifyConfigs[i].TemplateID)
165+
continue
166+
}
167+
168+
// todo go send
169+
// todo 聚合 event
170+
go e.sendV2([]*models.AlertCurEvent{event}, notifyRuleId, &notifyRule.NotifyConfigs[i], notifyChannel, messageTemplate)
171+
}
172+
}
173+
}
174+
}
175+
176+
func NotifyRuleApplicable(notifyConfig *models.NotifyConfig, event *models.AlertCurEvent) bool {
177+
tm := time.Unix(event.TriggerTime, 0)
178+
triggerTime := tm.Format("15:04")
179+
triggerWeek := int(tm.Weekday())
180+
181+
timeMatch := false
182+
183+
if len(notifyConfig.TimeRanges) == 0 {
184+
timeMatch = true
185+
}
186+
for j := range notifyConfig.TimeRanges {
187+
if timeMatch {
188+
break
189+
}
190+
enableStime := notifyConfig.TimeRanges[j].Start
191+
enableEtime := notifyConfig.TimeRanges[j].End
192+
enableDaysOfWeek := notifyConfig.TimeRanges[j].Week
193+
length := len(enableDaysOfWeek)
194+
// enableStime,enableEtime,enableDaysOfWeek三者长度肯定相同,这里循环一个即可
195+
for i := 0; i < length; i++ {
196+
if enableDaysOfWeek[i] != triggerWeek {
197+
continue
198+
}
199+
200+
if enableStime < enableEtime {
201+
if enableEtime == "23:59" {
202+
// 02:00-23:59,这种情况做个特殊处理,相当于左闭右闭区间了
203+
if triggerTime < enableStime {
204+
// mute, 即没生效
205+
continue
206+
}
207+
} else {
208+
// 02:00-04:00 或者 02:00-24:00
209+
if triggerTime < enableStime || triggerTime >= enableEtime {
210+
// mute, 即没生效
211+
continue
212+
}
213+
}
214+
} else if enableStime > enableEtime {
215+
// 21:00-09:00
216+
if triggerTime < enableStime && triggerTime >= enableEtime {
217+
// mute, 即没生效
218+
continue
219+
}
220+
}
221+
222+
// 到这里说明当前时刻在告警规则的某组生效时间范围内,即没有 mute,直接返回 false
223+
timeMatch = true
224+
break
225+
}
226+
}
227+
228+
severityMatch := false
229+
for i := range notifyConfig.Severities {
230+
if notifyConfig.Severities[i] == event.Severity {
231+
severityMatch = true
232+
}
233+
}
234+
235+
tagMatch := true
236+
if len(notifyConfig.LabelKeys) > 0 {
237+
tagFilters, err := models.ParseTagFilter(notifyConfig.LabelKeys)
238+
if err != nil {
239+
logger.Errorf("failed to parse tag filter: %v", err)
240+
return false
241+
}
242+
tagMatch = common.MatchTags(event.TagsMap, tagFilters)
243+
}
244+
245+
attributesMatch := true
246+
if len(notifyConfig.Attributes) > 0 {
247+
tagFilters, err := models.ParseTagFilter(notifyConfig.Attributes)
248+
if err != nil {
249+
logger.Errorf("failed to parse tag filter: %v", err)
250+
return false
251+
}
252+
253+
attributesMatch = common.MatchTags(event.JsonTagsAndValue(), tagFilters)
254+
}
255+
256+
return timeMatch && severityMatch && tagMatch && attributesMatch
257+
}
258+
259+
func GetNotifyConfigParams(notifyConfig *models.NotifyConfig, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType) ([]*models.User, []int64, map[string]string) {
260+
customParams := make(map[string]string)
261+
var userInfos []*models.User
262+
var flashDutyChannelIDs []int64
263+
var userInfoParams models.CustomParams
264+
265+
for key, value := range notifyConfig.Params {
266+
switch key {
267+
case "user_ids", "user_group_ids", "ids":
268+
if data, err := json.Marshal(value); err == nil {
269+
var ids []int64
270+
if json.Unmarshal(data, &ids) == nil {
271+
if key == "user_ids" {
272+
userInfoParams.UserIDs = ids
273+
} else if key == "user_group_ids" {
274+
userInfoParams.UserGroupIDs = ids
275+
} else if key == "ids" {
276+
flashDutyChannelIDs = ids
277+
}
278+
}
279+
}
280+
default:
281+
customParams[key] = value.(string)
282+
}
283+
}
284+
285+
users := userCache.GetByUserIds(userInfoParams.UserIDs)
286+
visited := make(map[int64]bool)
287+
for _, user := range users {
288+
if visited[user.Id] {
289+
continue
290+
}
291+
visited[user.Id] = true
292+
userInfos = append(userInfos, user)
293+
}
294+
userGroups := userGroupCache.GetByUserGroupIds(userInfoParams.UserGroupIDs)
295+
for _, userGroup := range userGroups {
296+
for _, user := range userGroup.Users {
297+
if visited[user.Id] {
298+
continue
299+
}
300+
visited[user.Id] = true
301+
userInfos = append(userInfos, &user)
302+
}
303+
}
304+
305+
return userInfos, flashDutyChannelIDs, customParams
306+
}
307+
308+
func (e *Dispatch) sendV2(events []*models.AlertCurEvent, notifyRuleId int64, notifyConfig *models.NotifyConfig, notifyChannel *models.NotifyChannelConfig, messageTemplate *models.MessageTemplate) {
309+
if len(events) == 0 {
310+
logger.Errorf("notify_id: %d events is empty", notifyRuleId)
311+
return
312+
}
313+
314+
tplContent := messageTemplate.RenderEvent(events)
315+
316+
userInfos, flashDutyChannelIDs, customParams := GetNotifyConfigParams(notifyConfig, e.userCache, e.userGroupCache)
317+
318+
e.Astats.GaugeNotifyRecordQueueSize.Inc()
319+
defer e.Astats.GaugeNotifyRecordQueueSize.Dec()
320+
321+
switch notifyChannel.RequestType {
322+
case "flashduty":
323+
for i := range flashDutyChannelIDs {
324+
respBody, err := notifyChannel.SendFlashDuty(events, flashDutyChannelIDs[i], e.notifyChannelCache.GetHttpClient(notifyChannel.ID))
325+
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, IntegrationUrl: %v, respBody: %v, err: %v", notifyRuleId, notifyChannel.Name, events[0], notifyChannel.RequestConfig.FlashDutyRequestConfig.IntegrationUrl, respBody, err)
326+
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, notifyChannel.RequestConfig.FlashDutyRequestConfig.IntegrationUrl, respBody, err)
327+
}
328+
return
329+
case "http":
330+
if e.notifyChannelCache.HttpConcurrencyAdd(notifyChannel.ID) {
331+
defer e.notifyChannelCache.HttpConcurrencyDone(notifyChannel.ID)
332+
}
333+
334+
if notifyChannel.ParamConfig.UserInfo != nil && len(userInfos) > 0 {
335+
for i := range userInfos {
336+
respBody, err := notifyChannel.SendHTTP(events, tplContent, customParams, userInfos[i], e.notifyChannelCache.GetHttpClient(notifyChannel.ID))
337+
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%s, customParams:%v, userInfo:%+v, respBody: %v, err: %v", notifyRuleId, notifyChannel.Name, events[0], tplContent, customParams, userInfos[i], respBody, err)
338+
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, notifyChannel.RequestConfig.HTTPRequestConfig.URL, respBody, err)
339+
}
340+
} else {
341+
respBody, err := notifyChannel.SendHTTP(events, tplContent, customParams, nil, e.notifyChannelCache.GetHttpClient(notifyChannel.ID))
342+
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%s, customParams:%v, respBody: %v, err: %v", notifyRuleId, notifyChannel.Name, events[0], tplContent, customParams, respBody, err)
343+
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, notifyChannel.RequestConfig.HTTPRequestConfig.URL, respBody, err)
344+
}
345+
346+
case "email":
347+
err := notifyChannel.SendEmail(events, tplContent, userInfos, e.notifyChannelCache.GetSmtpClient(notifyChannel.ID))
348+
if err != nil {
349+
logger.Errorf("send email error: %v", err)
350+
}
351+
for i := range userInfos {
352+
msg := ""
353+
if err == nil {
354+
msg = "ok"
355+
}
356+
357+
// todo 这里的通知记录需要调整
358+
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, userInfos[i].Email, msg, err)
359+
}
360+
case "script":
361+
target, res, err := notifyChannel.SendScript(events, tplContent, customParams, userInfos)
362+
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%s, customParams:%v, target:%s, res:%s, err:%v", notifyRuleId, notifyChannel.Name, events[0], tplContent, customParams, target, res, err)
363+
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, target, res, err)
364+
default:
365+
logger.Warningf("notify_id: %d, channel_name: %v, event:%+v send type not found", notifyRuleId, notifyChannel.Name, events[0])
366+
}
367+
}
368+
134369
// HandleEventNotify 处理event事件的主逻辑
135370
// event: 告警/恢复事件
136371
// isSubscribe: 告警事件是否由subscribe的配置产生
@@ -173,6 +408,7 @@ func (e *Dispatch) HandleEventNotify(event *models.AlertCurEvent, isSubscribe bo
173408
}
174409

175410
// 处理事件发送,这里用一个goroutine处理一个event的所有发送事件
411+
go e.HandleEventNotifyV2(event, isSubscribe)
176412
go e.Send(rule, event, notifyTarget, isSubscribe)
177413

178414
// 如果是不是订阅规则出现的event, 则需要处理订阅规则的event

alert/process/process.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ func (p *Processor) Handle(anomalyPoints []models.AnomalyPoint, from string, inh
154154
eventsMap := make(map[string][]*models.AlertCurEvent)
155155
for _, anomalyPoint := range anomalyPoints {
156156
event := p.BuildEvent(anomalyPoint, from, now, ruleHash)
157+
event.NotifyRuleIDs = cachedRule.NotifyRuleIds
157158
// 如果 event 被 mute 了,本质也是 fire 的状态,这里无论如何都添加到 alertingKeys 中,防止 fire 的事件自动恢复了
158159
hash := event.Hash
159160
alertingKeys[hash] = struct{}{}

alert/sender/callback.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,14 +135,14 @@ func (c *DefaultCallBacker) CallBack(ctx CallBackContext) {
135135
func doSendAndRecord(ctx *ctx.Context, url, token string, body interface{}, channel string,
136136
stats *astats.Stats, events []*models.AlertCurEvent) {
137137
res, err := doSend(url, body, channel, stats)
138-
NotifyRecord(ctx, events, channel, token, res, err)
138+
NotifyRecord(ctx, events, 0, channel, token, res, err)
139139
}
140140

141-
func NotifyRecord(ctx *ctx.Context, evts []*models.AlertCurEvent, channel, target, res string, err error) {
141+
func NotifyRecord(ctx *ctx.Context, evts []*models.AlertCurEvent, notifyRuleID int64, channel, target, res string, err error) {
142142
// 一个通知可能对应多个 event,都需要记录
143143
notis := make([]*models.NotificaitonRecord, 0, len(evts))
144144
for _, evt := range evts {
145-
noti := models.NewNotificationRecord(evt, channel, target)
145+
noti := models.NewNotificationRecord(evt, notifyRuleID, channel, target)
146146
if err != nil {
147147
noti.SetStatus(models.NotiStatusFailure)
148148
noti.SetDetails(err.Error())

alert/sender/email.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ func startEmailSender(ctx *ctx.Context, smtp aconf.SMTPConfig) {
205205
if err == nil {
206206
msg = "ok"
207207
}
208-
NotifyRecord(ctx, m.events, models.Email, to, msg, err)
208+
NotifyRecord(ctx, m.events, 0, models.Email, to, msg, err)
209209
}
210210

211211
size++

alert/sender/mm.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func SendMM(ctx *ctx.Context, message MatterMostMessage, events []*models.AlertC
7474
u, err := url.Parse(message.Tokens[i])
7575
if err != nil {
7676
logger.Errorf("mm_sender: failed to parse error=%v", err)
77-
NotifyRecord(ctx, events, channel, message.Tokens[i], "", err)
77+
NotifyRecord(ctx, events, 0, channel, message.Tokens[i], "", err)
7878
continue
7979
}
8080

alert/sender/plugin.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func alertingCallScript(ctx *ctx.Context, stdinBytes []byte, notifyScript models
104104
res = res[:validLen] + "..."
105105
}
106106

107-
NotifyRecord(ctx, []*models.AlertCurEvent{event}, channel, cmd.String(), res, buildErr(err, isTimeout))
107+
NotifyRecord(ctx, []*models.AlertCurEvent{event}, 0, channel, cmd.String(), res, buildErr(err, isTimeout))
108108

109109
if isTimeout {
110110
if err == nil {

alert/sender/telegram.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func SendTelegram(ctx *ctx.Context, message TelegramMessage, events []*models.Al
7272
for i := 0; i < len(message.Tokens); i++ {
7373
if !strings.Contains(message.Tokens[i], "/") && !strings.HasPrefix(message.Tokens[i], "https://") {
7474
logger.Errorf("telegram_sender: result=fail invalid token=%s", message.Tokens[i])
75-
NotifyRecord(ctx, events, channel, message.Tokens[i], "", errors.New("invalid token"))
75+
NotifyRecord(ctx, events, 0, channel, message.Tokens[i], "", errors.New("invalid token"))
7676
continue
7777
}
7878
var url string

0 commit comments

Comments
 (0)