Help us learn about your current experience with the documentation. Take the survey.

批处理最佳实践

本文档概述了我们在 GitLab 中使用的可用批处理策略。我们列出了每种策略的优缺点,以便工程师能够为他们的用例选择理想的方法。

为什么需要批处理

在处理大量记录时,通过单个数据库查询读取、更新或删除记录可能会很困难;操作很容易超时。为了避免这个问题,我们应该分批处理记录。批处理通常在后台作业中进行,因为运行时限制比 Web 请求期间更宽松。

在后台作业中使用批处理,而不是在 Web 请求中

在少数情况下(旧功能),批处理也会在 Web 请求中进行。但是,对于新功能,由于 Web 请求超时时间较短(默认为 60 秒),不建议这样做。作为指导原则,当需要处理大量记录的功能时,应优先考虑使用后台作业(Sidekiq workers)。

性能考虑

批处理性能与分页性能密切相关,因为底层库和数据库查询本质上是相同的。在实现批处理时,熟悉 分页性能指南 和我们 批处理工具 相关的文档非常重要。

后台作业中的批处理

在后台作业中实现批处理时,需要考虑两个主要方面:总运行时间和数据修改量。

后台作业不应运行很长时间。Sidekiq 进程可能会崩溃或被强制停止(例如,在重启或部署时)。此外,根据我们的 错误预算 规则,运行 5 分钟后,错误预算违规将被添加到注册该功能的组中。在后台作业中实现批处理时,请确保您熟悉我们关于 幂等作业 的指导原则。

更新或删除大量记录会增加数据库复制延迟,并给主数据库带来额外压力。建议限制我们在后台作业中处理(或批处理)的记录总数。

为解决上述潜在问题,应考虑以下措施:

  • 限制作业的总运行时间。
  • 限制记录修改。
  • 批处理之间的休息时间。(几毫秒)

应用限制时,重要的是要提到长时间运行的后台作业应实现"稍后继续"机制,即在达到限制后安排一个新作业继续批处理停止时的工作。这在作业时间过长,很可能无法在 5 分钟运行时间内完成的情况下非常重要。

使用 Gitlab::Metrics::RuntimeLimiter 类实现运行时间限制的示例:

def perform(project_id)
  runtime_limiter = Gitlab::Metrics::RuntimeLimiter.new(3.minutes)

  project = Project.find(1)
  project.issues.each_batch(of: :iid) do |scope|
    scope.update_all(updated_at: Time.current)
    break if runtime_limiter.over_time?
  end
end

当代码片段中的批处理运行时间达到 3 分钟时停止。现在的问题是,我们无法继续处理。要做到这一点,我们需要安排一个新的后台作业,并提供足够的信息来继续处理。在片段中,我们按 iid 列对项目中的问题进行批处理。对于下一个作业,我们需要提供项目 ID 和最后处理的 iid 值。我们通常将这些信息称为游标。

def perform(project_id, iid = nil)
  runtime_limiter = Gitlab::Metrics::RuntimeLimiter.new(3.minutes)

  project = Project.find(project_id)
  # 如果存在,恢复之前的 iid
  project.issues.where('iid > ?', iid || 0).each_batch(of: :iid) do |scope|
    max_iid = scope.maximum(:iid)
    scope.update_all(updated_at: Time.current)

    if runtime_limiter.over_time?
      MyJob.perform_in(2.minutes, project_id, iid)

      break
    end
  end
end

实现"稍后继续"机制可能会给实现增加显著的复杂性。因此,在承诺这项工作之前,请分析生产数据库中的现有数据并尝试推断数据增长。几个示例:

  • 将给定用户的所有 pending 待办事项标记为 done 不需要"稍后继续"机制。
    • 推理:即使是最繁忙的用户,待办事项的数量很可能不会超过几千个数据库行。更新这些行在 99.9% 的情况下会在 1 分钟内完成。
  • 在给定项目中将 CI 构建记录存储在 CSV 文件中可能需要"稍后继续"机制。
    • 推理:对于非常活跃的项目,CI 作业数量可以以极高的速度增长到数百万行。

当后台作业中发生大量更新时,建议(不是严格要求)在代码中添加一些休眠时间,并限制我们更新的记录总数。这减少了主数据库的压力,并为潜在的数据库迁移获取更重的锁提供了小窗口。

def perform(project_id, iid = nil)
  max_updates = 100_000 # 允许最多 N 次更新
  updates = 0
  status = :completed
  runtime_limiter = Gitlab::Metrics::RuntimeLimiter.new(3.minutes)

  project = Project.find(project_id)
  project.issues.where('iid > ?', iid || 0).each_batch(of: :iid) do |scope|
    max_iid = scope.maximum(:iid)
    updates += scope.update_all(updated_at: Time.current)

    if runtime_limiter.over_time? || updates >= max_updates
      MyJob.perform_in(2.minutes, project_id, iid)
      status = :limit_reached

      break
    end

    # 当我们预期长时间运行且修改大量数据的批处理时添加休眠
    sleep 0.01
  end
end

可追溯性

为了可追溯性,暴露指标以便我们可以在 Kibana 中查看批处理的执行情况是个好习惯:

log_extra_metadata_on_done(:result, {
  status: :limit_reached, # 或 :completed
  updated_rows: updates
})

下一个作业的调度

上面示例中调度下一个作业不是崩溃安全的(作业可能会丢失),对于非常重要的任务,这种方法不适合。一种安全和常见的模式是使用计划工作器,它根据游标执行工作。游标可以持久化在数据库或 Redis 中,具体取决于一致性要求。这意味着游标不再通过作业参数传递。

计划工作器的频率可以根据任务的紧急程度进行调整。我们有示例,计划工作器每分钟入队一次以处理紧急项目。

基于 Redis 的游标

示例:处理项目中的所有问题。

def perform
  project_id, iid = load_cursor # 从 Redis 加载游标

  return unless project_id # 没有入队任何内容

  project = Project.find(project_id)
  project.issues.where('iid > ?', iid || 0).each_batch(of: :iid) do |scope|
    # 对问题执行某些操作。
    # 在这里中断,如果时间限制到了则设置中断标志。
    # 将 iid 设置为最后处理的值。
  end

  # 稍后继续工作
  push_cursor(project_id, iid) if interrupted?
end

private

def load_cursor
  # 取 1 个元素,不是崩溃安全的。
  raw_cursor = Gitlab::Redis::SharedState.with do |redis|
    redis.lpop('my_cursor')
  end

  return unless raw_cursor

  cursor = Gitlab::Json.parse(raw_cursor)
  [cursor['project_id'], cursor['iid']]
end

def push_cursor(project_id, iid)
  # 工作未完成,将游标放在列表开头,以便下一个作业可以拾取它。
  Gitlab::Redis::SharedState.with do |redis|
    redis.lpush('my_cursor', Gitlab::Json.dump({ project_id: project_id, iid: iid }))
  end
end

在应用程序代码中,您可以在数据库事务提交后将项目放入队列(有关更多详细信息,请参阅 事务指南):

def execute
  ApplicationRecord.transaction do
    user.save!
    Event.create!(user: user, issue: issue)
  end

  # 应用程序可能在这里崩溃

  MyRedieQueue.add(user: user, issue: issue)
end

这种方法不是崩溃安全的,如果应用程序在事务提交后立即崩溃,项目将不会被入队。

优点:

  • 更容易实现,不需要额外的数据库表来跟踪作业。
  • 适用于低吞吐量、内部调用的作业。(示例:全表定期一致性检查、后台聚合)

缺点:

  • 调度工作(将游标放入队列)不是崩溃安全的。
  • 当游标被读取时可能出现序列化问题(多版本兼容性)。
  • 需要特别注意数据库事务。

基于 PostgreSQL 的游标

另一种方法是将队列存储在 PostgreSQL 数据库中。在这种情况下,我们可以实现 事务性出站模式,这在应用程序(Web 或工作器)崩溃时确保一致性。

优点:

  • 调度工作可以与其他记录更改完全一致(示例:在问题创建事务中调度工作)。
  • 可以容纳队列中的大量项目。

缺点:

  • 根据卷量,实现可能相当复杂:
    • 分区数据库表:这应考虑用于高吞吐量工作器。
    • 考虑 滑动窗口分区策略
    • 复杂的跨分区查询。

示例:设置可靠的发送邮件方式

# 在服务中
def execute
  ApplicationRecord.transaction do
    user.save!
    Event.create!(user: user, issue: issue)
    IssueEmailWorkerQueue.insert!(user: user, issue: issue)
  end
end

IssueEmailWorkerQueue 记录存储执行作业所需的所有信息。在计划的后台作业中,我们可以按特定顺序处理表。

def perform
  runtime_limiter = Gitlab::Metrics::RuntimeLimiter.new(3.minutes)
  items = EmailWorkerQueue.order(:id).take(25)

  items.each do |item|
    # 对项目执行某些操作
  end
end

为了避免记录的并行处理,您可能需要用分布式 Redis 锁包装执行。

Redis 锁使用示例:

class MyJob
  include ApplicationWorker
  include Gitlab::ExclusiveLeaseHelpers

  MAX_TTL = 2.5.minutes.to_i # 应与运行时间限制相似。

  def perform
    in_lock('my_lock_key', ttl: MAX_TTL, retries: 0) do
      # 在这里执行工作。
    end
  end
end

Sidekiq 作业的考虑因素

Sidekiq 作业可能会消耗大量数据库资源。如果您的作业仅批处理数据而不修改数据库中的任何内容,请考虑设置有利于数据库副本的属性。请参阅 Sidekiq 工作器属性 文档。

批处理策略

为了使示例易于理解,我们省略了限制运行时的代码。

一些示例包含对 cursor 变量的可选变量赋值。这是可选步骤,可用于实现"稍后继续"机制。

基于循环的批处理

该策略利用了在数据库中更新或删除记录后,完全相同的查询将返回不同记录这一事实。此策略仅在我们想要删除或更新某些记录时使用。

示例:

loop do
  # 需要 project_id 上的索引
  delete_count = project.issues.limit(1000).delete_all
  break if delete_count == 0 # 当没有要删除的记录时退出循环
end

优点:

  • 实现简单,不需要维护游标。
  • 单列数据库索引足以实现批处理,这通常是可用的(外键)。
  • 如果顺序不重要,只要索引覆盖,也可以使用复杂的过滤条件。

缺点:

  • 由于重复扫描过时的索引条目和可见性检查的 负面副作用,后续循环中的查询性能会下降。因此,此策略仅适用于影响相对少量数据的短期操作。安全限制通常最多为 10k 行,但这可能因表大小和索引结构等因素而异。
  • 必须彻底测试并手动验证底层 DELETEUPDATE 查询。在更新或删除记录时,CTE 存在一些问题。
  • 如果 break 逻辑有错误,我们可能会陷入无限循环。

可以使基于循环的方法按特定顺序处理记录:

loop do
  # 需要 (project_id, created_at) 上的复合索引
  delete_count = project.issues.limit(1000).order(created_at: :desc).delete_all
  break if delete_count == 0
end

使用前面示例中提到的索引,我们也可以使用 timestamp 条件:

loop do
  # 需要 (project_id, created_at) 上的复合索引
  delete_count = project
    .issues
    .where('created_at < ?', 1.month.ago)
    .limit(1000)
    .order(created_at: :desc)
    .delete_all

  break if delete_count == 0
end

单列批处理

我们可以使用单个唯一列(主键或具有唯一索引的列)与 EachBatch 模块进行批处理。这是 GitLab 中最常用的批处理策略之一。

# 需要 (project_id, id) 上的复合索引。
# EachBatch 默认使用主键进行批处理。
cursor = nil
project.issues.where('id > ?', cursor || 0).each_batch do |batch|
  issues = batch.to_a
  cursor = issues.last.id # 用于下一个作业

  # 对问题记录执行某些操作
end

优点:

  • GitLab 应用程序中最流行的批处理方式。
  • 实现简单,涵盖广泛的用例。

缺点:

  • ORDER BY 列(ID)在查询上下文中必须是唯一的。
  • 当存在 timestamp 列条件或其他复杂条件(INNOT EXISTS)时,效率不高。

基于不同值的批处理

EachBatch 需要一个唯一的数据库列(通常是 ID 列),但在极少数情况下,功能需要基于非唯一列进行批处理。示例:将所有项目 timestamp 值递增,这些项目至少有一个问题。

一种方法是对"父"表进行批处理,在这种情况下使用 Project 模型。

cursor = nil
# 使用主键索引
Project.where('id > ?', cursor || 0).each_batch do |batch|
  cursor = batch.maximum(:id) # 用于下一个作业

  project_ids = batch
    .where('EXISTS (SELECT 1 FROM issues WHERE projects.id=issues.project_id)')
    .pluck(:id)

  Project.where(id: project_ids).update_all(update_all: Time.current)
end

优点:

  • 当列是外键时,批处理父表的主键应该已经有索引覆盖。

缺点:

  • 当块内的额外条件只匹配少量行时,可能会很浪费。

批处理查询对 projects 表运行全表扫描,这可能是浪费的,或者我们可以使用 distinct_each_batch 辅助方法:

# 需要 project_id 上的索引
Issue.distinct_each_batch(column: :project_id) do |scope|
  project_ids = scope.pluck(:project_id)
  cursor = project_ids.last # 用于下一个作业

  Project.where(id: project_ids).update_all(update_all: Time.current)
end

优点:

  • 当列是外键列时,索引已经可用。
  • 它可以显著减少批处理逻辑需要扫描的数据量。

缺点:

  • 使用有限,不广泛使用。

基于 Keyset 的批处理

基于 Keyset 的批处理允许您按特定顺序迭代记录,多列排序也是可能的。最常见的用例是当我们需要通过 timestamp 列处理有序数据时。

示例:删除一年以上的问题记录。

def perform
  cursor = load_cursor || {}
  # 需要 (created_at, id) 列上的复合索引
  scope = Issue.where('created_at > ?', 1.year.ago).order(:created_at, :id)

  iterator = Gitlab::Pagination::Keyset::Iterator.new(scope: scope, cursor: cursor)

  iterator.each_batch(of: 100) do |records|
    loaded_records = records.to_a

    loaded_records.each { |record| record.destroy } # 调用 destroy 以便触发回调
  end

  cursor = iterator.send(:cursor) # 在此步骤后存储游标,用于下一个作业
end

使用基于 Keyset 的批处理,您可以调整 ORDER BY 子句以匹配现有索引的列配置。考虑以下索引:

CREATE INDEX issues_search_index ON issues (project_id, state, created_at, id)

由于 ORDER BY 列列表与索引定义中的列列表不完全匹配,因此上述片段无法使用此索引。但是,如果我们更改 ORDER BY 子句,查询规划器将选择该索引:

# 注意:这是不同的排序顺序,但至少我们可以使用现有索引
scope = Issue.where('created_at > ?', 1.year.ago).order(:project_id, :state, :created_at, :id)

优点:

  • 多列排序顺序和更复杂的过滤是可能的。
  • 您可能能够重用现有索引而无需引入新索引。

缺点:

  • 游标大小可能更大(每个 ORDER BY 列都将存储在游标中)。

偏移量批处理

此批处理技术在加载新记录时使用 偏移量分页。偏移量分页应仅作为最后手段使用,当给定查询无法通过 EachBatch 或 keyset 分页进行分页时。选择此技术的一个原因是,没有合适的索引可用于 SQL 查询使用不同的批处理技术。示例:在后台作业中加载太多记录而没有限制,并且开始超时。记录的顺序很重要。

def perform(project_id)
  # 我们有 (project_id, created_at) 列上的复合索引
  issues = Issue
    .where(project_id: project_id)
    .order(:created_at)
    .to_a

  # 对问题执行某些操作
end

随着项目内问题数量的增长,查询变慢并最终超时。使用不同的批处理技术(如 keyset 分页)是不可能的,因为 ORDER BY 子句依赖于不唯一的 timestamp 列(请参阅 决胜列 部分)。理想情况下,我们应该按 created_at, id 列排序,但是我们没有该索引可用。在时间敏感的场景中(例如事件),立即引入新索引可能不可行,因此作为最后手段,我们可以尝试偏移量分页。

def perform(project_id)
  page = 1

  loop do
    issues = Issue.where(project_id: project_id).order(:created_at).page(page).to_a
    page +=1
    break if issues.empty?

    # 对问题执行某些操作
  end
end

上面的片段可以作为临时修复,直到适当的解决方案到位。偏移量分页随着页码的增加而变慢,这意味着偏移量分页查询可能会像原始查询一样超时。数据库缓冲区缓存在一定程度上减少了这种可能性,它将先前加载的记录保存在内存中;因此,相同行的连续(短期)查找对性能不会有很大影响。

优点:

  • 实现简单。

缺点:

  • 性能随着页码的增加而线性下降。
  • 这只是权宜之计,不应用于新功能。
  • 您可以将页码存储为游标,但从前一点恢复处理可能不可靠。

在组层次结构中的批处理

我们有几个功能需要在顶级命名空间及其子组中查询数据。存在异常的组层次结构,包含数千个子组或项目。查询此类层次结构很容易在添加额外的子查询或连接时导致数据库语句超时。

示例:迭代组中的问题

group = Group.find(9970)

Issue.where(project_id: group.all_project_ids).each_batch do |scope|
  # 对问题执行某些操作
end
``

上面的示例将加载所有子组、所有项目和组层次结构中的所有问题,这很可能导致数据库语句超时。上述查询可以通过数据库索引作为短期解决方案稍作改进。

### 使用 in 运算符优化

当您需要按特定顺序处理组中的记录时,可以使用 [in 运算符优化](efficient_in_operator_queries.md),它比使用标准的基于 `each_batch` 的批处理策略提供更好的性能。

您可以在 [此处](efficient_in_operator_queries.md#batch-iteration) 查看在组层次结构中批处理记录的示例。

优点:

- 这是按特定顺序在组层次结构中高效批处理记录的唯一方法。

缺点:

- 需要更复杂的设置。
- 批处理非常大的层次结构(大量项目或子组)将需要更小的批处理大小。

### 始终从顶级组开始批处理

当您总是必须从顶级组(没有父组的组)开始批处理时,可以使用此技术。在这种情况下,我们可以利用 `namespaces` 表中的以下索引:

```sql
"index_on_namespaces_namespaces_by_top_level_namespace" btree ((traversal_ids[1]), type, id) -- traversal_ids[1] 是顶级组 id

批处理查询示例:

Namespace.where('traversal_ids[1] = ?', 9970).where(type: 'Project').each_batch do |project_namespaces|
  project_ids = Project.where(project_namespace_id: project_namespaces.select(:id)).pluck(:id)
  cursor = project_namespaces.last.id # 用于下一个作业

  project_ids.each do |project_id|
    Issue.where(project_id: project_id).each_batch(column: :iid) do |issues|
      # 对问题执行某些操作
    end
  end
end

优点:

  • 可以避免加载整个组层次结构。
  • 使用嵌套的 EachBatch 处理均匀分布的批处理。

缺点:

  • 由于双重批处理导致更多数据库查询。

从组层次结构中的任何节点开始批处理

使用 NamespaceEachBatch 类允许我们批处理组层次结构(树)的特定分支。

# current_id: 开始迭代的命名空间记录的 id
# depth: 树的深度,迭代之前停止。最初,它应与 current_id 相同
cursor = { current_id: 9970, depth: [9970] } # 这可以是任何命名空间 id
iterator = Gitlab::Database::NamespaceEachBatch.new(namespace_class: Namespace, cursor: cursor)

# 需要 (parent_id, id) 列上的复合索引
iterator.each_batch(of: 100) do |ids, new_cursor|
  namespace_ids = Namespaces::ProjectNamespace.where(id: ids)
  cursor = new_cursor # 用于下一个作业,包含新的 current_id 和 depth 值

  project_ids = Project.where(project_namespace_id: namespace_ids)
  project_ids.each do |project_id|
    Issue.where(project_id: project_id).each_batch(column: :iid) do |issues|
      # 对问题执行某些操作
    end
  end
end

优点:

  • 它可以从任何节点处理组层次结构。

缺点:

  • 很少使用,仅在非常罕见的用例中有用。

复杂查询的批处理

我们将复杂查询视为包含多个过滤器和连接的查询。大多数时候,这些查询不容易批处理。几个示例: