网站首页 > 开源技术 正文
- 为什么写这个库
- 应用场景有哪些
- 如何使用
- 总结
为什么要写这个库?
在开始自研 go-queue
之前,针对以下我们调研目前的开源队列方案:
beanstalkd
beanstalkd
有一些特殊好用功能:支持任务priority、延时(delay)、超时重发(time-to-run)和预留(buried),能够很好的支持分布式的后台任务和定时任务处理。如下是 beanstalkd
基本部分:
job
:任务单元;tube
:任务队列,存储统一类型job
。producer 和 consumer 操作对象;producer
:job
生产者,通过 put 将 job 加入一个 tube;consumer
:job
消费者,通过 reserve/release/bury/delete 来获取job或改变job的状态;
很幸运的是官方提供了 go client:https://github.com/beanstalkd/go-beanstalk。
但是这对不熟悉 beanstalkd
操作的 go 开发者而言,需要学习成本。
kafka
类似基于 kafka
消息队列作为存储的方案,存储单元是消息,如果要实现延时执行,可以想到的方案是以延时执行的时间作为 topic
,这样在大型的消息系统中,充斥大量一次性的 topic
(dq_1616324404788, dq_1616324417622
),当时间分散,会容易造成磁盘随机写的情况。
而且在 go 生态中,
同时考虑以下因素:
- 支持延时任务
- 高可用,保证数据不丢失
- 可扩展资源和性能
所以我们自己基于以上两个基础组件开发了 go-queue
:
- 基于
beanstalkd
开发了dq
,支持定时和延时操作。同时加入redis
保证消费唯一性。 - 基于
kafka
开发了kq
,简化生产者和消费者的开发API,同时在写入kafka使用批量写,节省IO。
整体设计如下:
应用场景
首先在消费场景来说,一个是针对任务队列,一个是消息队列。而两者最大的区别:
- 任务是没有顺序约束;消息需要;
- 任务在加入中,或者是等待中,可能存在状态更新(或是取消);消息则是单一的存储即可;
所以在背后的基础设施选型上,也是基于这种消费场景。
dq
:依赖于beanstalkd
,适合延时、定时任务执行;kq
:依赖于kafka
,适用于异步、批量任务执行;
而从其中 dq
的 API 中也可以看出:
// 延迟任务执行
- dq.Delay(msg, delayTime);
// 定时任务执行
- dq.At(msg, atTime);
而在我们内部:
- 如果是 异步消息消费/推送 ,则会选择使用
kq
:kq.Push(msg)
; - 如果是 15分钟提醒/ 明天中午发送短信 等,则使用
dq
;
如何使用
分别介绍 dq
和 kq
的使用方式:
dq
// [Producer]
producer := dq.NewProducer([]dq.Beanstalk{
{
Endpoint: "localhost:11300",
Tube: "tube",
},
{
Endpoint: "localhost:11301",
Tube: "tube",
},
})
for i := 1000; i < 1005; i++ {
_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
if err != nil {
fmt.Println(err)
}
}
// [Consumer]
consumer := dq.NewConsumer(dq.DqConf{
Beanstalks: []dq.Beanstalk{
{
Endpoint: "localhost:11300",
Tube: "tube",
},
{
Endpoint: "localhost:11301",
Tube: "tube",
},
},
Redis: redis.RedisConf{
Host: "localhost:6379",
Type: redis.NodeType,
},
})
consumer.Consume(func(body []byte) {
// your consume logic
fmt.Println(string(body))
})
和普通的 生产者-消费者 模型类似,开发者也只需要关注以下:
- 开发者只需要关注自己的任务类型「延时/定时」
- 消费端的消费逻辑
kq
producer.go
:
// message structure
type message struct {
Key string `json:"key"`
Value string `json:"value"`
Payload string `json:"message"`
}
pusher := kq.NewPusher([]string{
"127.0.0.1:19092",
"127.0.0.1:19093",
"127.0.0.1:19094",
}, "kq")
ticker := time.NewTicker(time.Millisecond)
for round := 0; round < 3; round++ {
select {
case <-ticker.C:
count := rand.Intn(100)
// 准备消息
m := message{
Key: strconv.FormatInt(time.Now().UnixNano(), 10),
Value: fmt.Sprintf("%d,%d", round, count),
Payload: fmt.Sprintf("%d,%d", round, count),
}
body, err := json.Marshal(m)
if err != nil {
log.Fatal(err)
}
fmt.Println(string(body))
// push to kafka broker
if err := pusher.Push(string(body)); err != nil {
log.Fatal(err)
}
}
}
config.yaml
:
Name: kq
Brokers:
- 127.0.0.1:19092
- 127.0.0.1:19092
- 127.0.0.1:19092
Group: adhoc
Topic: kq
Offset: first
Consumers: 1
consumer.go
:
var c kq.KqConf
conf.MustLoad("config.yaml", &c)
// WithHandle: 具体的处理msg的logic
// 这也是开发者需要根据自己的业务定制化
q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {
fmt.Printf("=> %s\n", v)
return nil
}))
defer q.Stop()
q.Start()
和 dq
不同的是:开发者不需要关注任务类型(在这里也没有任务的概念,传递的都是 message data
)。
其他操作和 dq
类似,只是将 业务处理函数 当成配置直接传入消费者中。
总结
在我们目前的场景中,kq
大量使用在我们的异步消息服务;而延时任务,我们除了 dq
,还可以使用内存版的 TimingWheel「go-zero
生态组件」。
关于 go-queue
更多的设计和实现文章,可以持续关注我们。欢迎大家去关注和使用。
https://github.com/tal-tech/go-queue
https://github.com/tal-tech/go-zero
欢迎使用 go-zero 并 star 支持我们!
go-zero 系列文章见『微服务实践』公众号
猜你喜欢
- 2024-12-19 Java开发人员必知的常用类库,这些你都知道吗?
- 2024-12-19 Sips and scripts: Starbucks China embraces the micro drama trend
- 2024-12-19 任性AWS 大范围升级进行时 aws d 1.6预热要求
- 2024-12-19 Mac平台好用的php集成开发环境 mac php开发环境搭建
- 2024-12-19 写PHP框架需要具备那些知识? 写php框架需要具备那些知识和技能
- 2024-12-19 现代软件开发流程(12-factors) 现代软件工程模式的基本要点
- 2024-12-19 黑客的天堂?Tornado Cash混币后的资产如何快速追踪?
- 2024-12-19 移动端建站平台 PredictSpring:帮助品牌拥抱移动电商
- 2024-12-19 Amazon Elastic Beanstalk 是什么(1)
- 2024-12-19 比呀比: BeanStalk 雪印 孕妇 雪印牛奶中毒事件
你 发表评论:
欢迎- 05-16东契奇:DFS训练时喷了我很多垃圾话 我不懂他为什么比赛不这么干
- 05-16这两球很伤!詹姆斯空篮拉杆不中 DFS接里夫斯传球空接也没放进
- 05-16湖人自媒体调查:89%球迷希望DFS回归79%希望詹姆斯回归
- 05-16Shams:湖人得到全能球员DFS 节省了1500万奢侈税&薪金空间更灵活
- 05-16G5湖人胜率更高!詹姆斯不满判罚,DFS谈5人打满下半场:这很艰难
- 05-16DFS:当东契奇进入状态 所有防守者在他面前都像个圆锥桶
- 05-16上一场9中6!DFS:不能让纳兹-里德这样的球员那么轻松地投三分
- 05-16WIDER FACE评测结果出炉:滴滴人脸检测DFS算法获世界第一
- 最近发表
-
- 东契奇:DFS训练时喷了我很多垃圾话 我不懂他为什么比赛不这么干
- 这两球很伤!詹姆斯空篮拉杆不中 DFS接里夫斯传球空接也没放进
- 湖人自媒体调查:89%球迷希望DFS回归79%希望詹姆斯回归
- Shams:湖人得到全能球员DFS 节省了1500万奢侈税&薪金空间更灵活
- G5湖人胜率更高!詹姆斯不满判罚,DFS谈5人打满下半场:这很艰难
- DFS:当东契奇进入状态 所有防守者在他面前都像个圆锥桶
- 上一场9中6!DFS:不能让纳兹-里德这样的球员那么轻松地投三分
- WIDER FACE评测结果出炉:滴滴人脸检测DFS算法获世界第一
- 湖人自媒体调查:89%球迷希望DFS回归 79%希望詹姆斯回归
- 一觉醒来湖人苦盼的纯3D终于到位 DFS能带给紫金军多少帮助
- 标签列表
-
- jdk (81)
- putty (66)
- rufus (78)
- 内网穿透 (89)
- okhttp (70)
- powertoys (74)
- windowsterminal (81)
- netcat (65)
- ghostscript (65)
- veracrypt (65)
- asp.netcore (70)
- wrk (67)
- aspose.words (80)
- itk (80)
- ajaxfileupload.js (66)
- sqlhelper (67)
- express.js (67)
- phpmailer (67)
- xjar (70)
- redisclient (78)
- wakeonlan (66)
- tinygo (85)
- startbbs (72)
- webftp (82)
- vsvim (79)
本文暂时没有评论,来添加一个吧(●'◡'●)