我接到一个任务,需要测量一些 REST API 的响应时间。要求是每隔 5 秒运行一次这些 API,在接下来的 30 分钟内重复执行,并最终计算平均响应时间。
这是一个相当简单的需求,使用一个负载测试工具可能过于复杂。
Linux crontab?不行,它没有秒级的精确度。最低要求是以 1 分钟为间隔。
让我们使用 Go 的简约但功能强大的调度库 go-quartz,以秒级精确度构建我们自己的 crontab。
自定义Job
Job的实际逻辑是通过go-quartz的Job接口实现的。该库提供了一些Job实现。然而,让我们实现这个接口,以拥有我们自己的Job。
type HttpRequestJob struct {
DB *sql.DB
Name string
HttpReq []HttpRequest
}
func NewHttpRequestJob(name string, db *sql.DB, req []HttpRequest) *HttpRequestJob {
return &HttpRequestJob{
DB: db,
Name: name,
HttpReq: req,
}
}
为了安全地保存数据并避免并发问题,让我们使用 SQLite 数据库。REST API的任务可以通过下面定义的 HttpRequest 列表来描述:
type HttpRequest struct {
Name string
Url string
Method string
Header map[string]string
Body map[string]any
Auth map[string]string
}
现在,让我们实现以下三个接口方法:
func (j *HttpRequestJob) Description() string {
return j.Name
}
func (j *HttpRequestJob) Key() int {
return quartz.HashCode(j.Name)
}
func (j *HttpRequestJob) Execute(ctx context.Context) {
for _, req := range j.HttpReq {
start := time.Now()
log.Printf("job %s, req %s started at %s", j.Description(), req.Name, start.Format("2006-01-02 15:04:05"))
client := resty.New()
// client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
if req.Auth != nil { // basic auth
client.SetBasicAuth(req.Auth["user"], req.Auth["password"])
}
var resp *resty.Response
var err error
switch req.Method {
case "GET":
resp, err = client.R().SetHeaders(req.Header).Get(req.Url)
case "POST":
resp, err = client.R().
SetHeaders(req.Header).
SetBody(req.Body).
Post(req.Url)
}
now := time.Now()
dur := time.Since(start)
// log.Printf("request: %v", resp.Request.Header)
reqBody, err := json.Marshal(req.Body)
if err != nil {
log.Printf("job %s, req %s failed to marshal request body. err=%v", j.Description(), req.Name, err)
reqBody = []byte("Failed to marshal request body")
}
_, err = j.DB.Exec(`
INSERT INTO httpTasks (name, startTime, endTime, request, response, statusCode, duration)
VALUES (?, ?, ?, ?, ?, ?,?);
`, req.Name, start, now,
reqBody, resp.Body(), resp.StatusCode(),
resp.Time().Milliseconds(),
)
if err != nil {
log.Printf("job %s, req %s failed to insert into db. err=%v", j.Description(), req.Name, err)
}
log.Printf("job %s, req %s finished. duration=%s", j.Description(), req.Name, dur)
}
}
Execute函数使用Go的resty库发送 REST 请求。记录开始时间、结束时间、请求体、响应体、状态码和响应时间,并保存到数据库中。
配置
让我们将配置以 YAML 格式存储,示例如下:
db: result.db
duration: "30m"
tasks:
- name: api1
cron: 0/5 * * * * *
type: http
requests:
- name: test case 1
url: https://reqres.in/api/users
method: POST
header:
Content-Type: application/json
Client-ID: 1234567
Client-Secret: abcdefghi
body:
name: morpheus
job: leader
- name: test case 2
url: https://abcdef.xyz/
method: GET
header:
Content-Type: application/json
Client-ID: 1234567
Client-Secret: abcdefghi
- name: api2
cron: 5/5 * * * * *
type: http
requests:
- name: test case 3
url: https://reqres.in/api/users
method: POST
header:
Content-Type: application/json
Client-ID: 1234567
Client-Secret: abcdefghi
body:
name: morpheus
job: leader
- name: test case 4
url: https://reqres.in/api/users?delay=3
method: GET
header:
Content-Type: application/json
Client-ID: 1234567
Client-Secret: abcdefghi
每个任务可能有多个需要按顺序执行的 REST 调用。任务使用 cron 格式进行调度,但以秒为单位开始。
初始化 SQLite 数据库
使用任务表模式初始化基于文件的 SQLite 数据库。
func init_db(filename string) (*sql.DB, error) {
db, err := sql.Open("sqlite", filename)
if err != nil {
return nil, err
}
_, err = db.Exec(`
CREATE TABLE IF NOT EXISTS httpTasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
startTime TEXT,
endTime TEXT,
request TEXT,
response TEXT,
statusCode INTEGER,
duration REAL
);
`)
return db, err
}
进行调度
将程序实现为 magefile 任务。
func T03_run() {
db := try.E1(init_db(config.String("db")))
defer db.Close()
var tasks []Task
config.BindStruct("tasks", &tasks)
now := time.Now()
d := time.Second * time.Duration(59-now.Second())
log.Printf("Wait for %s to start at the 59 seconds to catch the next minute", d)
time.Sleep(d)
log.Printf("Start the load test at %s", time.Now().Format("2006-01-02 15:04:05"))
var dur time.Duration
if config.String("duration") == "" {
dur = 30 * time.Minute
} else {
dur = try.E1(time.ParseDuration(config.String("duration")))
}
log.Printf("Stop the load test in %s as configured", dur)
ctx, cancel := context.WithTimeout(context.Background(), dur)
defer cancel()
go func() {
sigch := make(chan os.Signal, 1)
signal.Notify(sigch, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
<-sigch
cancel()
}()
sched := quartz.NewStdScheduler()
sched.Start(ctx)
for _, tk := range tasks {
log.Printf("schedule job %s with %s", tk.Name, tk.Cron)
job := NewHttpRequestJob(tk.Name, db, tk.Requests)
trigger := try.E1(quartz.NewCronTrigger(tk.Cron))
try.E(sched.ScheduleJob(ctx, job, trigger))
}
sched.Wait(ctx)
}
等待到 59 秒时休眠,以便从下一分钟的 00 秒开始执行我们的任务。
为超时和信号设置上下文。
调度器部分很简单,只需初始化 Quartz 调度器。对于配置中定义的每个任务,创建 HTTP 请求作业,根据配置中的 cron 字符串创建触发器,并进行调度。
调用 sched.Wait(ctx) 来阻塞,直到所有作业完成或超时。简单明了。
处理结果并报告
要处理结果,我们只需将结果保存到 CSV 文件中,并计算每个 API 调用的平均时间。
func T04_report() {
db := try.E1(init_db(config.String("db")))
defer db.Close()
var tasks []Task
config.BindStruct("tasks", &tasks)
var reports []ReportData
for _, tk := range tasks {
for _, req := range tk.Requests {
csvFile := try.E1(os.Create(fmt.Sprintf("%s.csv", req.Name)))
defer csvFile.Close()
csvwriter := csv.NewWriter(csvFile)
csvwriter.Write([]string{
"id",
"name",
"startTime",
"endTime",
"request",
"response",
"statusCode",
"duration",
})
csvwriter.Flush()
rows := try.E1(db.Query("SELECT * FROM httpTasks WHERE name = ?", req.Name))
defer rows.Close()
count := 0
totalResp := 0.0
uniqStatus := []int{}
for rows.Next() {
var row DataRow
try.E(rows.Scan(&row.id, &row.name, &row.startTime, &row.endTime, &row.request, &row.response, &row.statusCode, &row.duration))
csvwriter.Write([]string{
fmt.Sprintf("%d", row.id),
row.name,
row.startTime,
row.endTime,
row.request,
row.response,
fmt.Sprintf("%d", row.statusCode),
fmt.Sprintf("%f", row.duration),
})
csvwriter.Flush()
count++
totalResp += row.duration
if !slices.Contains(uniqStatus, row.statusCode) {
uniqStatus = append(uniqStatus, row.statusCode)
}
}
reports = append(reports, ReportData{
Name: req.Name,
TotalRuns: count,
AverageResp: totalResp / float64(count),
UniqStatus: uniqStatus,
})
}
}
csvFile := try.E1(os.Create("report.csv"))
defer csvFile.Close()
csvwriter := csv.NewWriter(csvFile)
csvwriter.Write([]string{
"Name",
"TotalRuns",
"AverageResp",
"UniqStatusCodes",
})
csvwriter.Flush()
for _, r := range reports {
statusList := []string{}
for _, i := range r.UniqStatus {
statusList = append(statusList, fmt.Sprintf("%d", i))
}
csvwriter.Write([]string{
r.Name,
fmt.Sprintf("%d", r.TotalRuns),
fmt.Sprintf("%f", r.AverageResp),
strings.Join(statusList, ","),
})
csvwriter.Flush()
}
}
我们已经实现了具有秒级精确度的自定义 cron 任务。
本文暂时没有评论,来添加一个吧(●'◡'●)