Help us learn about your current experience with the documentation. Take the survey.

在数据库表中去重数据库记录

本指南介绍了一种为现有数据表引入数据库级别唯一性约束(唯一索引)的策略。

要求:

  • 与列相关的属性修改(INSERT, UPDATE)只能通过 ActiveRecord 进行(该技术依赖于 AR 回调)。
  • 重复数据很少见,大多数是由于并发记录创建导致的。可以通过 teleport 检查生产数据库表来验证这一点(请联系数据库管理员寻求帮助)。

总运行时间主要取决于数据库表中的记录数。迁移需要扫描所有记录;为了满足部署后迁移的运行时间限制(约 10 分钟),可以将少于 1000 万行的数据库表视为小表。

小表的去重策略

该策略需要 3 个里程碑。例如,我们将基于 title 列对 issues 表进行去重,其中对于给定的 project_id 列,title 必须是唯一的。

里程碑 1:

  1. 通过迁移后脚本(post-migration)向表中添加一个新的数据库索引(非唯一)(如果尚不存在)。
  2. 添加模型级别的唯一性验证,以减少重复数据的可能性(如果尚不存在)。
  3. 添加一个事务级别的 advisory lock 以防止创建重复记录。

仅第二步本身并不能防止重复记录,更多信息请参阅 Rails guides

创建索引的迁移后脚本:

def up
  add_concurrent_index :issues, [:project_id, :title], name: INDEX_NAME
end

def down
  remove_concurrent_index_by_name :issues, INDEX_NAME
end

Issue 模型的验证和 advisory lock:

class Issue < ApplicationRecord
  validates :title, uniqueness: { scope: :project_id }
  before_validation :prevent_concurrent_inserts

  private

  # 当另一个数据库事务尝试插入相同数据时,此方法将会阻塞。
  # 在其他事务释放锁之后,唯一性验证可能会因记录不唯一的验证错误而失败。

  # 没有这个代码块,唯一性验证将无法检测到重复记录,因为事务之间看不到彼此的更改。
  def prevent_concurrent_inserts
    return if project_id.nil? || title.nil?

    lock_key = ['issues', project_id, title].join('-')
    lock_expression = "hashtext(#{connection.quote(lock_key)})"
    connection.execute("SELECT pg_advisory_xact_lock(#{lock_expression})")
  end
end

里程碑 2:

  1. 在部署后迁移中实现去重逻辑。
  2. 将现有索引替换为唯一索引。

如何解决重复问题(例如,合并属性、保留最新记录)取决于构建在数据库表之上的功能。在本例中,我们保留最新的记录。

def up
  model = define_batchable_model('issues')

  # 对表进行单次遍历
  model.each_batch do |batch|
    # 查找重复的 (project_id, title) 对
    duplicates = model
      .where("(project_id, title) IN (#{batch.select(:project_id, :title).to_sql})")
      .group(:project_id, :title)
      .having('COUNT(*) > 1')
      .pluck(:project_id, :title)

    value_list = Arel::Nodes::ValuesList.new(duplicates).to_sql

    # 通过 (project_id, title) 对定位所有记录,并保留最新的记录。
    # 如果重复数据很少,查找速度应该足够快。
    cleanup_query = <<~SQL
    WITH duplicated_records AS MATERIALIZED (
      SELECT
        id,
        ROW_NUMBER() OVER (PARTITION BY project_id, title ORDER BY project_id, title, id DESC) AS row_number
      FROM issues
      WHERE (project_id, title) IN (#{value_list})
      ORDER BY project_id, title
    )
    DELETE FROM issues
    WHERE id IN (
      SELECT id FROM duplicated_records WHERE row_number > 1
    )
    SQL

    model.connection.execute(cleanup_query)
  end
end

def down
  # no-op
end

这是一个破坏性操作,无法回滚。请确保对去重逻辑进行了彻底的测试。

将旧索引替换为唯一索引:

def up
  add_concurrent_index :issues, [:project_id, :title], name: UNIQUE_INDEX_NAME, unique: true
  remove_concurrent_index_by_name :issues, INDEX_NAME
end

def down
  add_concurrent_index :issues, [:project_id, :title], name: INDEX_NAME
  remove_concurrent_index_by_name :issues, UNIQUE_INDEX_NAME
end

里程碑 3:

  1. 通过移除 prevent_concurrent_inserts ActiveRecord 回调方法来移除 advisory lock。

此里程碑必须在 required stop 之后。

大表的去重策略

在对大表进行去重时,我们可以将批处理和去重逻辑移至 batched background migration

里程碑 1:

  1. 通过迁移后脚本(post migration)向表中添加一个新的数据库索引(非唯一)。
  2. 添加模型级别的唯一性验证,以减少重复数据的可能性(如果尚不存在)。
  3. 添加一个事务级别的 advisory lock 以防止创建重复记录。

里程碑 2:

  1. 在批处理后台迁移中实现去重逻辑,并在部署后迁移中将其排队。

里程碑 3:

  1. 完成批处理后台迁移。
  2. 将现有索引替换为唯一索引。
  3. 通过移除 prevent_concurrent_inserts ActiveRecord 回调方法来移除 advisory lock。

此里程碑必须在 required stop 之后。