Help us learn about your current experience with the documentation. Take the survey.

---
stage: none
group: unassigned
info: 任何拥有 Maintainer 角色的用户均可合并此内容的更新。详情请见 https://docs.gitlab.com/development/development_processes/#development-guidelines-review.
title: Sidekiq 开发指南
---

我们使用 [Sidekiq](https://github.com/mperham/sidekiq) 作为后台任务处理器。这些指南旨在编写在 GitLab.com 上运行良好且与我们现有 Worker 类保持一致的任务。关于 GitLab 管理的信息,请参见 [配置 Sidekiq](../../administration/sidekiq/_index.md)。

以下主题有更详细的页面说明:

1. [更新兼容性](compatibility_across_updates.md)
1. [任务幂等性与去重](idempotent_jobs.md)
1. [有限容量工作器:使用指定并发量持续执行任务](limited_capacity_worker.md)
1. [日志记录](logging.md)
1. [Worker 属性](worker_attributes.md)
   1. **任务紧急程度** 指定队列和执行 SLO
   1. **资源边界****外部依赖** 用于描述工作负载
   1. **功能分类**
   1. **数据库负载均衡**

## ApplicationWorker

所有 Worker 都应包含 `ApplicationWorker` 而非 `Sidekiq::Worker`,后者添加了便捷方法并基于 [路由规则](../../administration/sidekiq/processing_specific_job_classes.md#routing-rules) 自动设置队列。

## 分片(Sharding)

所有 Sidekiq API 调用必须考虑分片。为此,请在 `Sidekiq::Client.via` 块内使用 Sidekiq API,以确保使用正确的 `Sidekiq.redis` 连接池。通过调用 `Gitlab::SidekiqSharding::Router.get_shard_instance` 方法获取合适的 Redis 连接池。

```ruby
pool_name, pool = Gitlab::SidekiqSharding::Router.get_shard_instance(worker_class.sidekiq_options['store'])
Sidekiq::Client.via(pool) do
  ...
end

未路由的 Sidekiq 调用会在所有 API 请求、服务端任务和测试中被验证器捕获。我们建议使用 Gitlab::SidekiqSharding::Router 编写应用逻辑。但由于分片尚未发布,如果组件不影响 GitLab.com,可在 .allow_unrouted_sidekiq_calls 作用域内运行:

# 添加注释说明为何此处允许未路由的 Sidekiq 调用是安全的
Gitlab::SidekiqSharding::Validator.allow_unrouted_sidekiq_calls do
  # 未路由逻辑
end

Geo Rake 任务 曾使用 allow_unrouted_sidekiq_calls 的示例,因其不影响 GitLab.com。但开发者应尽可能编写分片感知的代码,因为这是分片作为功能向 GitLab 自托管用户发布 的前提条件。

重试机制

Sidekiq 默认使用 25 次重试,每次重试之间有退避间隔。25 次重试意味着最后一次重试将在首次尝试约三周后发生(假设前 24 次重试均失败)。

这意味着任务从调度到执行期间可能发生很多变化。因此必须保护 Worker,使其在调度后状态变化时不会失败 25 次。例如,当任务调度的项目被删除时,任务不应失败。

错误做法:

def perform(project_id)
  project = Project.find(project_id)
  # ...
end

正确做法:

def perform(project_id)
  project = Project.find_by_id(project_id)
  return unless project
  # ...
end

对于大多数 Worker(尤其是 幂等 Worker),25 次重试的默认值已足够。许多旧 Worker 声明 3 次重试,这是 GitLab 应用之前的默认值。3 次重试在几分钟内完成,因此任务极易完全失败。

若符合以下任一情况,可考虑降低重试次数:

  1. Worker 调用外部服务且不保证投递(例如 Webhooks)
  2. Worker 非幂等,多次运行可能导致系统状态不一致(例如发布系统备注后执行操作:若第二步失败且 Worker 重试,系统备注会再次发布)
  3. Worker 是频繁运行的定时任务(例如每小时运行的任务,无需重试超过一小时,避免同时运行相同任务)

每次 Worker 重试在我们的指标中计为一次失败。一个 Worker 总是失败 9 次在第 10 次成功,其错误率为 90%。

若要在不跟踪 Sentry 异常的情况下手动重试 Worker,使用继承自 Gitlab::SidekiqMiddleware::RetryError 的异常类:

ServiceUnavailable = Class.new(::Gitlab::SidekiqMiddleware::RetryError)

def perform
  ...

  raise ServiceUnavailable if external_service_unavailable?
end

失败处理

失败通常由 Sidekiq 自身处理,利用上述内置重试机制。应允许异常抛出以便 Sidekiq 重新调度任务。

若任务在所有重试尝试后仍需执行操作,将其添加到 sidekiq_retries_exhausted 方法中:

sidekiq_retries_exhausted do |msg, ex|
  project = Project.find_by_id(msg['args'].first)
  return unless project

  project.perform_a_rollback # 处理永久失败
end

def perform(project_id)
  project = Project.find_by_id(project_id)
  return unless project

  project.some_action # 抛出异常
end

并发限制

为防止系统过载并确保可靠运行,我们强烈建议为所有 Worker 设置 并发限制。限制每个 Worker 可调度的任务数量有助于降低系统过载风险,避免严重事故。

此指南适用于 .com 和自托管客户。单个 Worker 调度数千任务极易破坏 SM 实例的正常运行。

若 Sidekiq 只有 20 个线程,而特定任务的限制为 200,则其并发量永远无法达到 200,因此不会受到限制。

静态并发限制

静态限制示例:

class LimitedWorker
  include ApplicationWorker

  concurrency_limit -> { 100 if Feature.enabled?(:concurrency_limit_some_worker, Feature.current_request) }

  # ...
end

部署并发限制时仅使用布尔特性标志(完全开启/关闭)。使用 Feature.current_request 的百分比部署可能导致不一致行为。

或直接设置固定限制:

concurrency_limit -> { 250 }

注意:使用静态限制意味着任何更新或更改都需要合并 MR 并等待下次部署生效。

实例可配置并发限制

若允许实例管理员控制并发限制:

concurrency_limit -> { ApplicationSetting.current.some_feature_concurrent_sidekiq_jobs }

此方法允许为 .com 和 GitLab 自托管实例设置不同限制。实现方式:

  1. 创建迁移添加配置选项,默认值设为自托管限制
  2. 在同一 MR 中,部署仅更新 .com 限制的迁移

如何选择限制

确定合适限制时,可使用 Grafana 中的 sidekiq: Worker Concurrency Detail 仪表盘作为参考。

并发限制可能暂时超出,不应依赖为严格限制。

延迟 Sidekiq Worker

Sidekiq Worker 通过两种方式延迟:

  1. 手动:使用特性标志显式延迟特定 Worker,详情见 此处

  2. 自动:类似批量迁移的 限流机制,使用数据库健康指标延迟 Sidekiq Worker

    要使用自动延迟机制,Worker 必须通过 defer_on_database_health_signal 方法选择加入,参数包括 gitlab_schemadelay_by(延迟时间)和表名(用于自动清理数据库指标)。

    示例

     module Chaos
       class SleepWorker # rubocop:disable Scalability/IdempotentWorker
         include ApplicationWorker
    
         data_consistency :always
    
         sidekiq_options retry: 3
         include ChaosQueue
    
         defer_on_database_health_signal :gitlab_main, [:users], 1.minute
    
         def perform(duration_s)
           Gitlab::Chaos.sleep(duration_s)
         end
       end
     end

延迟任务的日志包含以下信息指示来源:

  • job_status: deferred
  • job_deferred_by: feature_flagdatabase_health_check

Sidekiq 队列

过去每个 Worker 有独立队列,基于 Worker 类名自动设置。名为 ProcessSomethingWorker 的 Worker 队列名为 process_something。现在可通过 队列路由规则 将 Worker 路由到特定队列。在 GDK 中,新 Worker 路由到名为 default 的队列。

若不确定 Worker 使用哪个队列,可通过 SomeWorker.queue 查找。几乎无需手动使用 sidekiq_options queue: :some_queue 覆盖队列名。

添加新 Worker 后,运行 bin/rake gitlab:sidekiq:all_queues_yml:generate 重新生成 app/workers/all_queues.ymlee/app/workers/all_queues.yml,以便不使用路由规则的安装能被 sidekiq-cluster 捕获。潜在变更详情见 epic 596

此外,运行 bin/rake gitlab:sidekiq:sidekiq_queues_yml:generate 重新生成 config/sidekiq_queues.yml

队列命名空间

不同 Worker 不能共享队列,但可共享队列命名空间。

为 Worker 定义队列命名空间后,可启动 Sidekiq 进程自动处理该命名空间中所有 Worker 的任务,无需显式列出所有队列名。例如,若所有由 sidekiq-cron 管理的 Worker 使用 cronjob 命名空间,我们可以专门为这类定时任务启动 Sidekiq 进程。若后续添加使用 cronjob 命名空间的新 Worker,Sidekiq 进程(重启后)也会处理该 Worker 的任务,无需修改配置。

队列命名空间可通过 queue_namespace DSL 类方法设置:

class SomeScheduledTaskWorker
  include ApplicationWorker

  queue_namespace :cronjob

  # ...
end

底层实现会将 SomeScheduledTaskWorker.queue 设置为 cronjob:some_scheduled_task。常用命名空间有自己的关注模块,可轻松包含到 Worker 类中,并可能设置队列命名空间外的其他 Sidekiq 选项。例如 CronjobQueue 设置命名空间并禁用重试。

bundle exec sidekiq 支持命名空间,当在 --queue (-q) 选项或 config/sidekiq_queues.yml:queues: 部分提供命名空间而非简单队列名时,会监听命名空间中的所有队列(技术上:所有以命名空间为前缀的队列)。

将 Worker 添加到现有命名空间需谨慎,因为若未适当调整处理命名空间的 Sidekiq 进程资源,额外任务会占用原有 Worker 的资源。

版本控制

可在每个 Sidekiq Worker 类上指定版本。创建任务时会发送此版本信息。

class FooWorker
  include ApplicationWorker

  version 2

  def perform(*args)
    if job_version == 2
      foo = args.first['foo']
    else
      foo = args.first
    end
  end
end

在此模式下,任何 Worker 都应能处理由该 Worker 旧版本排队的任务。这意味着更改 Worker 参数时,必须递增 version(若参数首次更改则设为 version 1),同时确保 Worker 仍能处理使用任何早期版本参数排队的任务。在 Worker 的 perform 方法中,可通过 self.job_version 读取版本信息,或根据提供的参数数量或类型进行分支处理。

任务大小

GitLab 将 Sidekiq 任务及其参数存储在 Redis 中。为避免内存过度使用,若原始大小超过 100 KB,我们会压缩任务参数。

压缩后若大小仍超过 5 MB,调度任务时会抛出 ExceedLimitError 错误。

若发生此情况,需通过其他方式使数据在 Sidekiq 中可用。可能的解决方案包括:

  • 在 Sidekiq 中重建数据,从数据库或其他位置加载数据
  • 调度任务前将数据存储在 对象存储 中,在任务内部检索

任务权重

部分任务声明了权重。此权重仅在 Sidekiq 默认执行模式下使用——使用 sidekiq-cluster 不考虑权重。

由于我们正 逐步在 Free 版中使用 sidekiq-cluster,新添加的 Worker 无需指定权重,可使用默认权重 1。

任务参数

基于 Sidekiq 推荐的最佳实践,参数应小而简单。

作为 Worker 参数传递的哈希,键应为字符串,值应为原生 JSON 类型。若在 Sidekiq 7.0 及更高版本中未满足此要求,会抛出异常。我们已在开发和测试模式中禁用这些异常仅显示警告,以便升级到此版本。

开发者应确保 Worker 参数的键和值为原生 JSON 类型。

建议为生成 Worker 参数的代码添加测试。例如,自定义 RSpec 匹配器 param_containing_valid_native_json_types(在 SidekiqJSONMatcher 中定义)测试预期为哈希数组的参数:

 it 'passes a valid JSON parameter to MyWorker#perform_async' do
  expect(MyWorker).to receive(:perform_async).with(param_containing_valid_native_json_types)

  method_calling_worker_perform_sync
end

测试

每个 Sidekiq Worker 必须像其他类一样使用 RSpec 测试,测试文件放在 spec/workers 目录下。

与 Sidekiq Redis 和 API 交互

应用应最小化与任何 Sidekiq.redis 和 Sidekiq API 的交互。通用应用逻辑中的此类交互应抽象为 Sidekiq 中间件 以跨团队复用。通过将应用逻辑与 Sidekiq 数据存储解耦,可在水平扩展 GitLab 后台处理设置时获得更大灵活性。

此规则的例外包括迁移相关逻辑或管理操作。

任务执行时长限制

通常 Sidekiq 任务应短时间运行。

虽然任务时长无硬性限制,但长任务有两个特殊注意事项:

  1. 超过我们 紧急程度 阈值的任务时长会负面影响 Sidekiq Apdex 并影响错误预算
  2. 部署会中断长任务。在 GitLab.com 上,部署可能每天发生多次,这会有效限制任务可运行时长

部署对任务时长的影响

部署期间,Sidekiq 会收到 TERM 信号。任务有 25 秒完成时间,之后被中断并强制停止。25 秒宽限期是 Sidekiq 默认值,但可通过 图表配置

若任务被强制停止次数达到一定数量(默认 3 次,可通过 max_retries_after_interruption 配置),它们会被永久终止。此功能通过我们的 sidekiq-reliable-fetch gem 实现。

这实际上将任务可运行时长限制在 max_retries_after_interruption 次部署内,默认为 3 次部署。

处理长任务的建议

与其使用单个大任务,不如拆分为多个小任务。

判断是否需要拆分和并行化 Worker 时,可查看日志中的任务运行时间。若任务持续时间的第 99 百分位数低于基于配置的 紧急程度 目标值,则无需拆分任务。

将长任务拆分为多个小任务时,需考虑下游依赖。例如,若调度数千个任务都需写入主数据库,可能导致主数据库连接争用,使分片上的其他 Sidekiq 任务等待获取连接。为避免此问题,可考虑指定 并发限制