编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

使用go-quartz创建我们自己的Crontab

wxchong 2024-07-19 06:08:07 开源技术 13 ℃ 0 评论

我接到一个任务,需要测量一些 REST API 的响应时间。要求是每隔 5 秒运行一次这些 API,在接下来的 30 分钟内重复执行,并最终计算平均响应时间。

这是一个相当简单的需求,使用一个负载测试工具可能过于复杂。

Linux crontab?不行,它没有秒级精度,最小调度间隔是 1 分钟。

让我们使用 Go 的简约但功能强大的调度库 go-quartz,以秒级精确度构建我们自己的 crontab。

自定义Job

Job 的实际逻辑通过实现 go-quartz 的 Job 接口来提供。该库自带了几种 Job 实现,但这里我们自己实现该接口,编写专属的 Job。

// HttpRequestJob is the custom quartz job: on every trigger it sends the
// HttpReq requests one after another and stores each result in DB.
type HttpRequestJob struct {
 // DB receives one httpTasks row per executed request.
 DB      *sql.DB
 // Name identifies the job; Description returns it and Key is derived from it.
 Name    string
 // HttpReq lists the requests to send, in order.
 HttpReq []HttpRequest
}

// NewHttpRequestJob returns a job called name that sends the requests in req
// and records their results in db.
func NewHttpRequestJob(name string, db *sql.DB, req []HttpRequest) *HttpRequestJob {
	job := new(HttpRequestJob)
	job.Name = name
	job.DB = db
	job.HttpReq = req
	return job
}

为了可靠地保存结果,我们使用 SQLite 数据库。注意 SQLite 同一时刻只允许一个写入者,多个 Job 并发写入时需要串行化,否则可能遇到 “database is locked” 错误。每个 REST API 任务可以用下面定义的 HttpRequest 列表来描述:

// HttpRequest describes one HTTP call of a task, as bound from the "tasks"
// section of the YAML config.
type HttpRequest struct {
 // Name identifies the request; results are stored and reported under it.
 Name   string
 // Url is the target URL of the request.
 Url    string
 // Method is the HTTP verb, e.g. "GET" or "POST" as in the example config.
 Method string
 // Header holds the headers sent with the request.
 Header map[string]string
 // Body is the request payload; it is also JSON-marshalled into the stored result.
 Body   map[string]any
 // Auth, when non-nil, supplies basic-auth credentials under the keys "user" and "password".
 Auth   map[string]string
}

现在,让我们实现以下三个接口方法:

// Description reports the job's configured name, which doubles as its
// human-readable description for the scheduler.
func (j *HttpRequestJob) Description() string {
	name := j.Name
	return name
}

// Key identifies the job to the scheduler. It is derived solely from the job
// name, so jobs with the same name yield the same key.
func (j *HttpRequestJob) Key() int {
	name := j.Name
	return quartz.HashCode(name)
}

// Execute runs every configured request in order and records each result in
// the httpTasks table. Each row holds the request name, start/end time, request
// body, response body, status code and response time in milliseconds.
//
// A request that fails before a response arrives (DNS, connect, TLS, ...) is
// recorded with status code 0 and the error text as its response. Failures
// therefore show up in the report instead of crashing the job with a nil
// response dereference.
//
// Once ctx is done (end of the load-test window, or a signal), no further
// requests are started. A request interrupted by that cancellation is
// discarded rather than recorded as a failure.
func (j *HttpRequestJob) Execute(ctx context.Context) {
	for _, req := range j.HttpReq {
		if err := ctx.Err(); err != nil {
			log.Printf("job %s stopped before req %s. err=%v", j.Description(), req.Name, err)
			return
		}

		start := time.Now()
		log.Printf("job %s, req %s started at %s", j.Description(), req.Name, start.Format("2006-01-02 15:04:05"))

		// A fresh client per request keeps connection setup inside every
		// measurement. Its idle connections are closed once the call is done
		// so transports don't accumulate over the test.
		client := resty.New()
		// client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
		if req.Auth != nil { // basic auth
			client.SetBasicAuth(req.Auth["user"], req.Auth["password"])
		}

		// Execute accepts any verb, so methods other than GET/POST (or
		// lower-case spellings) no longer leave resp nil.
		method := strings.ToUpper(req.Method)
		r := client.R().SetContext(ctx).SetHeaders(req.Header)
		// GET never carries a body and POST always gets one, as before. Other
		// methods send a body only when one is configured.
		if method == "POST" || (method != "GET" && req.Body != nil) {
			r.SetBody(req.Body)
		}
		resp, reqErr := r.Execute(method, req.Url)
		now := time.Now()
		dur := now.Sub(start)
		client.GetClient().CloseIdleConnections()

		if reqErr != nil && ctx.Err() != nil {
			log.Printf("job %s, req %s cancelled, result discarded. err=%v", j.Description(), req.Name, reqErr)
			return
		}

		reqBody, err := json.Marshal(req.Body)
		if err != nil {
			log.Printf("job %s, req %s failed to marshal request body. err=%v", j.Description(), req.Name, err)
			reqBody = []byte("Failed to marshal request body")
		}

		// The defaults describe a request that never got a response:
		// no body, status 0, and wall-clock elapsed time.
		var respBody []byte
		statusCode := 0
		elapsedMs := dur.Milliseconds()
		if reqErr != nil || resp == nil {
			log.Printf("job %s, req %s request failed. err=%v", j.Description(), req.Name, reqErr)
			if reqErr != nil {
				respBody = []byte(reqErr.Error())
			}
		} else {
			respBody = resp.Body()
			statusCode = resp.StatusCode()
			elapsedMs = resp.Time().Milliseconds()
		}

		_, err = j.DB.Exec(`
   INSERT INTO httpTasks (name, startTime, endTime, request, response, statusCode, duration)
   VALUES (?, ?, ?, ?, ?, ?,?);
  `, req.Name, start, now,
			reqBody, respBody, statusCode,
			elapsedMs,
		)
		if err != nil {
			log.Printf("job %s, req %s failed to insert into db. err=%v", j.Description(), req.Name, err)
		}

		log.Printf("job %s, req %s finished. duration=%s", j.Description(), req.Name, dur)
	}
}

Execute函数使用Go的resty库发送 REST 请求。记录开始时间、结束时间、请求体、响应体、状态码和响应时间,并保存到数据库中。

配置

让我们将配置以 YAML 格式存储,示例如下:

# SQLite file the results are stored in (opened by both the run and the report step).
db: result.db
# Length of the load test in Go time.ParseDuration syntax; an empty value means 30m.
duration: "30m"

# Each task becomes one scheduled job; on every trigger its requests run in order.
tasks:
  - name: api1
    # Cron expression whose first field is seconds: every 5 seconds from second 0.
    cron: 0/5 * * * * *
    # NOTE(review): the visible run code always builds an HTTP job; confirm whether "type" is read anywhere.
    type: http
    requests:
      # The request name is the key results are stored and reported under, and the name of its CSV file.
      - name: test case 1
        url: https://reqres.in/api/users
        method: POST
        # NOTE(review): presumably placeholder credentials; don't commit real secrets here.
        header:
          Content-Type: application/json
          Client-ID: 1234567
          Client-Secret: abcdefghi
        body:
           name: morpheus
           job: leader
      - name: test case 2
        url: https://abcdef.xyz/
        method: GET
        header:
          Content-Type: application/json
          Client-ID: 1234567
          Client-Secret: abcdefghi
  - name: api2
    # Every 5 seconds starting at second 5.
    cron: 5/5 * * * * *
    type: http
    requests:
      - name: test case 3
        url: https://reqres.in/api/users
        method: POST
        header:
          Content-Type: application/json
          Client-ID: 1234567
          Client-Secret: abcdefghi
        body:
           name: morpheus
           job: leader
      - name: test case 4
        url: https://reqres.in/api/users?delay=3
        method: GET
        header:
          Content-Type: application/json
          Client-ID: 1234567
          Client-Secret: abcdefghi

每个任务可以包含多个按顺序执行的 REST 调用。任务使用 cron 表达式调度,但与 Linux crontab 不同,表达式的第一个字段表示秒。

初始化 SQLite 数据库

使用任务表模式初始化基于文件的 SQLite 数据库。

// init_db opens the SQLite database file filename via the "sqlite" driver
// (presumably modernc.org/sqlite, registered elsewhere; confirm). It creates
// the httpTasks table if it does not exist yet.
//
// The pool is limited to one connection. Jobs run concurrently and SQLite
// allows only one writer at a time; separate pooled connections would race
// and fail with "database is locked" (SQLITE_BUSY). A single connection also
// keeps ":memory:" databases consistent, since each new connection would get
// its own empty database.
//
// On error the returned *sql.DB is nil and any opened handle has been closed,
// so callers never receive a live handle together with an error.
func init_db(filename string) (*sql.DB, error) {
	db, err := sql.Open("sqlite", filename)
	if err != nil {
		return nil, err
	}
	db.SetMaxOpenConns(1)

	_, err = db.Exec(`
  CREATE TABLE IF NOT EXISTS httpTasks (
   id INTEGER PRIMARY KEY AUTOINCREMENT,
   name TEXT,
   startTime TEXT,
   endTime TEXT,
   request TEXT,
   response TEXT,
   statusCode INTEGER,
   duration REAL
  );
 `)
	if err != nil {
		// Best-effort cleanup; the schema error is the one worth reporting.
		db.Close()
		return nil, err
	}
	return db, nil
}

进行调度

将程序实现为 magefile 任务。

// T03_run is the mage target that runs the load test.
//
// Steps:
//   - Open the result database and load the tasks from the config.
//   - Wait until second 59 of the current minute so the cron triggers begin
//     on the next minute.
//   - Run every task on its cron schedule until the configured duration
//     (default 30m) elapses or SIGINT/SIGQUIT/SIGTERM arrives.
func T03_run() {
	db := try.E1(init_db(config.String("db")))
	defer db.Close()

	var tasks []Task
	try.E(config.BindStruct("tasks", &tasks))
	if len(tasks) == 0 {
		log.Printf("no tasks configured, nothing to run")
		return
	}

	// Parse the duration before the wait below so a bad value fails immediately.
	dur := 30 * time.Minute
	if s := config.String("duration"); s != "" {
		dur = try.E1(time.ParseDuration(s))
	}

	// sigCtx is cancelled by SIGINT/SIGQUIT/SIGTERM. stop() unregisters the
	// handler on return, so no goroutine or signal subscription outlives this call.
	sigCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
	defer stop()

	d := time.Second * time.Duration(59-time.Now().Second())
	log.Printf("Wait for %s to start at the 59 seconds to catch the next minute", d)
	select {
	case <-time.After(d):
	case <-sigCtx.Done():
		log.Printf("interrupted before the load test started")
		return
	}
	log.Printf("Start the load test at %s", time.Now().Format("2006-01-02 15:04:05"))
	log.Printf("Stop the load test in %s as configured", dur)

	ctx, cancel := context.WithTimeout(sigCtx, dur)
	defer cancel()

	sched := quartz.NewStdScheduler()
	sched.Start(ctx)

	for _, tk := range tasks {
		log.Printf("schedule job %s with %s", tk.Name, tk.Cron)
		job := NewHttpRequestJob(tk.Name, db, tk.Requests)
		trigger := try.E1(quartz.NewCronTrigger(tk.Cron))
		try.E(sched.ScheduleJob(ctx, job, trigger))
	}
	// Blocks until ctx ends (timeout or signal) or the scheduler stops.
	sched.Wait(ctx)
}

程序先休眠到当前分钟的第 59 秒,以便任务从下一分钟的第 00 秒开始执行。

为超时和信号设置上下文。

调度器部分很简单,只需初始化 Quartz 调度器。对于配置中定义的每个任务,创建 HTTP 请求作业,根据配置中的 cron 字符串创建触发器,并进行调度。

调用 sched.Wait(ctx) 阻塞主流程,直到上下文因超时或收到信号而被取消。cron 作业会周期性地重复执行,因此不存在“全部完成”的时刻。简单明了。

处理结果并报告

要处理结果,我们只需将结果保存到 CSV 文件中,并计算每个 API 调用的平均时间。

func T04_report() {
 db := try.E1(init_db(config.String("db")))
 defer db.Close()

 var tasks []Task
 config.BindStruct("tasks", &tasks)

 var reports []ReportData
 for _, tk := range tasks {
  for _, req := range tk.Requests {
   csvFile := try.E1(os.Create(fmt.Sprintf("%s.csv", req.Name)))
   defer csvFile.Close()
   csvwriter := csv.NewWriter(csvFile)
   csvwriter.Write([]string{
    "id",
    "name",
    "startTime",
    "endTime",
    "request",
    "response",
    "statusCode",
    "duration",
   })
   csvwriter.Flush()

   rows := try.E1(db.Query("SELECT * FROM httpTasks WHERE name = ?", req.Name))
   defer rows.Close()

   count := 0
   totalResp := 0.0
   uniqStatus := []int{}
   for rows.Next() {
    var row DataRow
    try.E(rows.Scan(&row.id, &row.name, &row.startTime, &row.endTime, &row.request, &row.response, &row.statusCode, &row.duration))
    csvwriter.Write([]string{
     fmt.Sprintf("%d", row.id),
     row.name,
     row.startTime,
     row.endTime,
     row.request,
     row.response,
     fmt.Sprintf("%d", row.statusCode),
     fmt.Sprintf("%f", row.duration),
    })
    csvwriter.Flush()
    count++
    totalResp += row.duration
    if !slices.Contains(uniqStatus, row.statusCode) {
     uniqStatus = append(uniqStatus, row.statusCode)
    }
   }
   reports = append(reports, ReportData{
    Name:        req.Name,
    TotalRuns:   count,
    AverageResp: totalResp / float64(count),
    UniqStatus:  uniqStatus,
   })
  }
 }

 csvFile := try.E1(os.Create("report.csv"))
 defer csvFile.Close()
 csvwriter := csv.NewWriter(csvFile)
 csvwriter.Write([]string{
  "Name",
  "TotalRuns",
  "AverageResp",
  "UniqStatusCodes",
 })
 csvwriter.Flush()
 for _, r := range reports {
  statusList := []string{}
  for _, i := range r.UniqStatus {
   statusList = append(statusList, fmt.Sprintf("%d", i))
  }
  csvwriter.Write([]string{
   r.Name,
   fmt.Sprintf("%d", r.TotalRuns),
   fmt.Sprintf("%f", r.AverageResp),
   strings.Join(statusList, ","),
  })
  csvwriter.Flush()
 }
}

我们已经实现了具有秒级精确度的自定义 cron 任务。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表