编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

open-falcon 报警alarm 代码分析(arm over time 报警)

wxchong 2024-07-19 06:07:49 开源技术 15 ℃ 0 评论

总结:alarm消费由judge产生的redis报警事件,根据优先级高低是否做合并,发往不同的报警通道


高优先级报警比如p0: judge产生报警事件-->写入redis event:p0队列 -->alarm消费-->获取发送对象并处理调用回调函数(如果有)-->根据策略不同生成不同通道的报警(im,sms,mail,phone)等-->写入redis各个通道的发送队列 /im /sms /mail /phone -->发送报警的worker取出报警发送


低优先级报警比如p4: judge产生报警事件-->写入redis event:p4队列 -->alarm消费-->获取发送对象并处理调用回调函数(如果有)-->根据策略不同生成不同通道的合并(im,sms,mail,phone)事件写入合并队列(来自于配置文件中的 /queue/user/im)等-->由不通通道的合并函数处理,合并报警生成dashboard链接调用dashboard的api写入falcon_portal.alert_link表中供用户日后查看原始信息-->写入redis各个通道的发送队列 /im /sms /mail /phone -->发送报警的worker取出报警发送

下面具体看下代码


1.main函数中的核心就是这几个goroutine了

        //消费报警事件
 	go cron.ReadHighEvent()
	go cron.ReadLowEvent()
	//合并低优先报警
	go cron.CombineSms()
	go cron.CombineMail()
	go cron.CombineIM()
	//发送真实报警
	go cron.ConsumeIM()
	go cron.ConsumeSms()
	go cron.ConsumeMail()
	go cron.ConsumePhone()
	go cron.CleanExpiredEvent()

2.ReadHighEvent 和 ReadLowEvent的区别就是consume时分优先级

func ReadHighEvent() {
	queues := g.Config().Redis.HighQueues
	if len(queues) == 0 {
		return
	}

	for {
		/*brpop 多个队列的1条返回event
		1.传入的是包含多个高优先级的队列的列表比如[p0,p1,p2]
		那么总是先pop完event:p0的队列,然后才是p1 ,p2(这里我进行过实测)
		2.单纯的popevent速度是很快的,但是每次循环里都有下面的consume,如果
		consume速度慢的话会直接影响整体的pop速度,我观察过再没加goroutine之前
		pop速度大概5条/s ,如果报警过多会有堆积现象,之前看到会有4个小时左右的延迟
		*/
		event, err := popEvent(queues)
		if err != nil {
			time.Sleep(time.Second)
			continue
		}
		//这里的consume其实和popevent已经没关系了,所以异步执行,但是可能会产生过多的goroutine
		go consume(event, true)
	}
}


3.消费报警事件函数 consume

func consume(event *cmodel.Event, isHigh bool) {
	actionId := event.ActionId()
	if actionId <= 0 {
		return
	}
        /*这里通过 event中的actionid 拿到 action
        就是拿到这个 报警组的名字 是否有回调等信息
        */
	action := api.GetAction(actionId)
	if action == nil {
		return
	}
        //有回调的话处理下http get调用相应的回调函数,会把报警的信息作为参数带上
	if action.Callback == 1 {
		HandleCallback(event, action)
	}

	if isHigh {
		consumeHighEvents(event, action)
	} else {
		consumeLowEvents(event, action)
	}
}

4.下面分别看下高低优先级的consume函数


// 高优先级的不做报警合并
func consumeHighEvents(event *cmodel.Event, action *api.Action) {
	//如果报警没有接收组,那么直接返回了
	if action.Uic == "" {
		return
	}

	phones, mails, ims := api.ParseTeams(action.Uic)
        log.Infof("api.ParseTeams--phones, mails, ims,action.uic",phones, mails, ims,action.Uic)
  	//生成报警内容,这里可以为不同通道的报警做定制
	smsContent := GenerateSmsContent(event)
	mailContent := GenerateMailContent(event)
	//imContent := GenerateIMContent(event)
        phoneContent := GeneratePhoneContent(event)


	/* 这里根据报警的级别可以做通道的定制
	如<=P2 才发送短信 =p9 电话报警等等
	下面的redi.wirtesms等方法就是将报警内容lpush到不通通道的发送队列中
	*/
	if event.Priority() < 3 {
		redi.WriteSms(phones, smsContent)
	}
        //p9 电话报警
        if event.Priority() ==9 {
		redi.WriteSms(phones, smsContent)
    	redi.WritePhone(phones, phoneContent)
	}
	redi.WriteIM(mails, mailContent)
	redi.WriteMail(mails, smsContent, mailContent)

}


// 低优先级的做报警合并
func consumeLowEvents(event *cmodel.Event, action *api.Action) {
	if action.Uic == "" {
		return
	}

	// <=P2 才发送短信
        //parseuser函数将event转换为合并消息 写入中间队列
	if event.Priority() < 3 {
		ParseUserSms(event, action)
	}
        
	ParseUserIm(event, action)
	ParseUserMail(event, action)
}

下面以ParseUserMail为例

func ParseUserMail(event *cmodel.Event, action *api.Action) {
	//api根据报警组获取组里人
	userMap := api.GetUsers(action.Uic)

	metric := event.Metric()
	subject := GenerateSmsContent(event)
	content := GenerateMailContent(event)
	status := event.Status
	priority := event.Priority()

	queue := g.Config().Redis.UserMailQueue

	rc := g.RedisConnPool.Get()
	defer rc.Close()
       //遍历usermap 生成报警中间态消息并LPUSH写入中间队列
	for _, user := range userMap {
		dto := MailDto{
			Priority: priority,
			Metric:   metric,
			Subject:  subject,
			Content:  content,
			Email:    user.Email,
			Status:   status,
		}
		bs, err := json.Marshal(dto)
		if err != nil {
			log.Error("json marshal MailDto fail:", err)
			continue
		}

		_, err = rc.Do("LPUSH", queue, string(bs))
		if err != nil {
			log.Error("LPUSH redis", queue, "fail:", err, "dto:", string(bs))
		}
	}
}

此时低优先级的报警存在于配置文件中的中间队列名称的redis队列中 /queue/user/mail

5.报警合并函数

func CombineSms() {
	for {
		// 每分钟读取处理一次
		time.Sleep(time.Minute)
		combineSms()
	}
}


func combineIM() {
	//从中间队列中pop出要合并的报警
	dtos := popAllImDto()
	count := len(dtos)
	if count == 0 {
		return
	}

	dtoMap := make(map[string][]*ImDto)
	for i := 0; i < count; i++ {
		//根据报警的metirc priority status 和接收人作为key合并报警为列表
		key := fmt.Sprintf("%d%s%s%s", dtos[i].Priority, dtos[i].Status, dtos[i].IM, dtos[i].Metric)
		if _, ok := dtoMap[key]; ok {
			dtoMap[key] = append(dtoMap[key], dtos[i])
		} else {
			dtoMap[key] = []*ImDto{dtos[i]}
		}
	}

	for _, arr := range dtoMap {
		size := len(arr)
		//如果合并后的报警只有一条直接写入redis发送队列
		if size == 1 {
			redi.WriteIM([]string{arr[0].IM}, arr[0].Content)
			continue
		}

		// 把多个im内容写入数据库,只给用户提供一个链接
		contentArr := make([]string, size)
		for i := 0; i < size; i++ {
			contentArr[i] = arr[i].Content
		}
		content := strings.Join(contentArr, ",,")

		first := arr[0].Content
		t := strings.Split(first, "][")
		eg := ""
		if len(t) >= 3 {
			eg = t[2]
		}
                //调用dashboard的api将合并后的信息写入falcon_portal.alert_link表
		path, err := api.LinkToSMS(content)
		chat := ""
		if err != nil || path == "" {
			chat = fmt.Sprintf("[P%d][%s] %d %s.  e.g. %s. detail in email", arr[0].Priority, arr[0].Status, size, arr[0].Metric, eg)
			log.Error("create short link fail", err)
		} else {
			//生成一个汇总信息 展示:metric status link的url
			chat = fmt.Sprintf("[P%d][%s] %d %s e.g. %s %s/portal/links/%s ",
				arr[0].Priority, arr[0].Status, size, arr[0].Metric, eg, g.Config().Api.Dashboard, path)
			log.Debugf("combined im is:%s", chat)
		}
        if  arr[0].IM==""{
        	email:= fmt.Sprintf("%s@bytedance.com",arr[0].Name)
			redi.WriteIM([]string{email}, chat)
		}else{
			redi.WriteIM([]string{arr[0].IM}, chat)
		}

	}
}

6.最后看下报警发送函数

func ConsumeIM() {
	for {
		//rpop出所有的报警信息到一个slice中
		L := redi.PopAllIM()
		if len(L) == 0 {
			time.Sleep(time.Millisecond * 200)
			continue
		}
		SendIMList(L)
	}
}

func SendIMList(L []*model.IM) {
	for _, im := range L {
		/*
		1.IMWorkerChan是带缓冲的chan,chan的长度意思就是同时可以多少个send作业
		2.向im发送workerchan中写入1说明可以发送一条
		3.如果队列没满,是不会阻塞在这里的,否则会阻塞
		*/
		IMWorkerChan <- 1
		go SendIM(im)
	}
}

func SendIM(im *model.IM) {
	/*
	1.这里使用defer的逻辑是先发送后读取chan
	2.因为如果先读取意味着又有一个work可以开始和逻辑相反
	3.下面就是自己定制的发送方式了
	*/
	defer func() {
		<-IMWorkerChan
	}()
    if im.Tos==""{
    	log.Errorf("content_tos_empty_error %s ",im.Content)
    	return
	}
}


Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表