Help us learn about your current experience with the documentation. Take the survey.

树形结构中的批量迭代(概念验证)

GitLab 中的群组层级结构以树形表示,其中根元素是顶级命名空间(namespace),子元素是子群组或新引入的 Namespaces::ProjectNamespace 记录。

该树结构通过 namespaces 表中的 parent_id 列实现。该列指向父级命名空间记录,顶级命名空间没有 parent_id

gitlab-org 的部分层级结构示例:

flowchart TD
    A("gitlab-org (9979)") --- B("quality (2750817)")
    B --- C("engineering-productivity (16947798)")
    B --- D("performance-testing (9453799)")
    A --- F("charts (5032027)")
    A --- E("ruby (14018648)")

高效遍历群组层级结构有多种潜在用途,尤其是在后台任务中,这些任务需要对群组层级执行查询,此时稳定安全的执行比快速运行更重要。批量迭代需要更多网络往返,但每个批次的性能特征相似。

几个典型场景:

  • 对每个子群组执行操作。
  • 对层级结构中的每个项目执行操作。
  • 对层级结构中的每个议题(issue)执行操作。

问题陈述

群组层级结构可能变得非常庞大,导致单次查询无法及时加载全部数据,查询会因语句超时(statement timeout)而失败。

解决超大群组相关的可扩展性问题需要我们以不同格式(反规范化)存储相同数据。但如果无法加载群组层级结构,反规范化就无法实现。

一种反规范化技术是存储给定群组的所有后代群组 ID。这将加速需要加载群组及其子群组的查询。例如:

flowchart TD
    A(1) --- B(2)
    A --- C(3)
    C --- D(4)
GROUP_ID DESCENDANT_GROUP_IDS
1 [2,3,4]
2 []
3 [4]
4 []

通过此结构,确定所有子群组只需读取数据库中的一行,而非四行。对于包含 1000 个群组的层级结构,这可能带来巨大性能提升。

这种反规范化解决了层级结构读取问题,但我们仍需找到在表中持久化这些数据的方法。由于群组及其层级可能非常大,无法期望单次查询能完成工作。

SELECT id FROM namespaces WHERE traversal_ids && ARRAY[9970]

上述查询对大型群组可能超时,因此我们需要分批处理数据。

在树形结构中实现批量逻辑是之前未探索过的领域,实现起来相当复杂。基于 EachBatchfind_in_batches 的方案无法工作,因为:

  • 数据(群组 ID)在层级结构中未排序。
  • 子群组中的群组不知道顶级群组 ID。

算法

批量查询通过递归 CTE SQL 查询实现,每个批次最多读取 N 行。由于树形结构,读取 N 行不一定意味着读取了 N 个群组 ID。如果树结构非最优,一个批次可能返回更少(但绝不会更多)的群组 ID。

该查询实现了深度优先树遍历逻辑,数据库扫描树的第一分支直到叶子节点。我们选择深度优先算法,因为当批次完成时,查询必须返回足够信息供下一批次(游标)使用。在 GitLab 中,我们限制树深度为 20,这意味着在最坏情况下,游标最多包含 19 个元素。

实现广度优先树遍历算法不切实际,因为一个群组可能有无限多的后代,导致游标变得巨大。

  1. 创建一个包含以下内容的初始行:
    1. 当前处理的群组 ID(顶级群组 ID)
    2. 两个数组(树深度和收集的 ID)
    3. 用于跟踪查询中行读取次数的计数器
  2. 递归处理该行,根据条件执行以下操作之一:
    • 加载第一个子命名空间,如果不在叶子节点则更新当前处理的命名空间 ID(向下遍历分支)
    • 如果当前深度还有剩余行,则加载下一个命名空间记录
    • 向上跳一个节点,处理更高层级的行
  3. 继续处理直到读取数量达到 LIMIT(批次大小)
  4. 找到最后处理的行(包含游标数据)和所有收集的记录 ID
WITH RECURSIVE result AS (
  (
    SELECT
      9970 AS current_id, /* 当前处理的命名空间 ID */
      ARRAY[9970]::int[] AS depth, /* 游标 */
      ARRAY[9970]::int[] AS ids,  /* 收集的 ID */
      1::bigint AS reads,
      'initialize' AS action
  ) UNION ALL
  (
    WITH cte AS ( /* 用于多次引用 result CTE 的技巧 */
      select * FROM result
    )
    SELECT * FROM (
      (
        SELECT /* 向下遍历分支 */
          namespaces.id,
          cte.depth || namespaces.id,
          cte.ids || namespaces.id,
          cte.reads + 1,
          'walkdown'
        FROM namespaces, cte
        WHERE
        namespaces.parent_id = cte.current_id
        ORDER BY namespaces.id ASC
        LIMIT 1
      ) UNION ALL
      (
        SELECT /* 查找同一层级的下一个元素 */
          namespaces.id,
          cte.depth[:array_length(cte.depth, 1) - 1] || namespaces.id,
          cte.ids || namespaces.id,
          cte.reads + 1,
          'next'
        FROM namespaces, cte
        WHERE
        namespaces.parent_id = cte.depth[array_length(cte.depth, 1) - 1] AND
        namespaces.id > cte.depth[array_length(cte.depth, 1)]
        ORDER BY namespaces.id ASC
        LIMIT 1
      ) UNION ALL
      (
        SELECT /* 完成当前层级时向上跳一个节点 */
          cte.current_id,
          cte.depth[:array_length(cte.depth, 1) - 1],
          cte.ids,
          cte.reads + 1,
          'jump'
        FROM cte
        WHERE cte.depth <> ARRAY[]::int[]
        LIMIT 1
      )
    ) next_row LIMIT 1
  )
)
SELECT current_id, depth, ids, action
FROM result
 current_id |    depth     |          ids           |   action
------------+--------------+------------------------+------------
         24 | {24}         | {24}                   | initialize
         25 | {24,25}      | {24,25}                | walkdown
         26 | {24,26}      | {24,25,26}             | next
        112 | {24,112}     | {24,25,26,112}         | next
        113 | {24,113}     | {24,25,26,112,113}     | next
        114 | {24,113,114} | {24,25,26,112,113,114} | walkdown
        114 | {24,113}     | {24,25,26,112,113,114} | jump
        114 | {24}         | {24,25,26,112,113,114} | jump
        114 | {}           | {24,25,26,112,113,114} | jump

使用此查询查找群组层级中的所有命名空间 ID 可能比其他查询方法(如当前基于 traversal_ids 列的 self_and_descendants 实现)更慢。上述查询仅应在实现群组层级的批量迭代时使用。

Ruby 中的基础批量实现:

class NamespaceEachBatch
  def initialize(namespace_id:, cursor: nil)
    @namespace_id = namespace_id
    @cursor = cursor || { current_id: namespace_id, depth: [namespace_id] }
  end

  def each_batch(of: 500)
    current_cursor = cursor.dup

    first_iteration = true
    loop do
      new_cursor, ids = load_batch(cursor: current_cursor, of: of, first_iteration: first_iteration)
      first_iteration = false
      current_cursor = new_cursor

      yield ids

      break if new_cursor[:depth].empty?
    end
  end

  private

  # 返回命名空间 ID 数组
  def load_batch(cursor:, of:, first_iteration: false)
    recursive_cte = Gitlab::SQL::RecursiveCTE.new(:result,
      union_args: { remove_order: false, remove_duplicates: false })

    ids = first_iteration ? namespace_id.to_s : ""

    recursive_cte << Namespace.select(
      Arel.sql(Integer(cursor.fetch(:current_id)).to_s).as('current_id'),
      Arel.sql("ARRAY[#{cursor.fetch(:depth).join(',')}]::int[]").as('depth'),
      Arel.sql("ARRAY[#{ids}]::int[]").as('ids'),
      Arel.sql("1::bigint AS count")
    ).from('(VALUES (1)) AS does_not_matter').limit(1)

    cte = Gitlab::SQL::CTE.new(:cte, Namespace.select('*').from('result'))

    union_query = Namespace.with(cte.to_arel).from_union(
      walk_down,
      next_elements,
      up_one_level,
      remove_duplicates: false,
      remove_order: false
    ).select('current_id', 'depth', 'ids', 'count').limit(1)

    recursive_cte << union_query

    scope = Namespace.with
      .recursive(recursive_cte.to_arel)
      .from(recursive_cte.alias_to(Namespace.arel_table))
      .limit(of)
    row = Namespace.from(scope.arel.as('namespaces')).order(count: :desc).limit(1).first

    [
      { current_id: row[:current_id], depth: row[:depth] },
      row[:ids]
    ]
  end

  attr_reader :namespace_id, :cursor

  def walk_down
    Namespace.select(
      Arel.sql('namespaces.id').as('current_id'),
      Arel.sql('cte.depth || namespaces.id').as('depth'),
      Arel.sql('cte.ids || namespaces.id').as('ids'),
      Arel.sql('cte.count + 1').as('count')
    ).from('cte, LATERAL (SELECT id FROM namespaces WHERE parent_id = cte.current_id ORDER BY id LIMIT 1) namespaces')
  end

  def next_elements
    Namespace.select(
      Arel.sql('namespaces.id').as('current_id'),
      Arel.sql('cte.depth[:array_length(cte.depth, 1) - 1] || namespaces.id').as('depth'),
      Arel.sql('cte.ids || namespaces.id').as('ids'),
      Arel.sql('cte.count + 1').as('count')
    ).from('cte, LATERAL (SELECT id FROM namespaces WHERE namespaces.parent_id = cte.depth[array_length(cte.depth, 1) - 1] AND namespaces.id > cte.depth[array_length(cte.depth, 1)] ORDER BY id LIMIT 1) namespaces')
  end

  def up_one_level
    Namespace.select(
      Arel.sql('cte.current_id').as('current_id'),
      Arel.sql('cte.depth[:array_length(cte.depth, 1) - 1]').as('depth'),
      Arel.sql('cte.ids').as('ids'),
      Arel.sql('cte.count + 1').as('count')
    ).from('cte')
      .where('cte.depth <> ARRAY[]::int[]')
      .limit(1)
  end
end

iterator = NamespaceEachBatch.new(namespace_id: 9970)
all_ids = []
iterator.each_batch do |ids|
  all_ids.concat(ids)
end

# 测试
puts all_ids.count
puts all_ids.sort == Namespace.where('traversal_ids && ARRAY[9970]').pluck(:id).sort

示例批量查询:

SELECT
    "namespaces".*
FROM ( WITH RECURSIVE "result" AS ((
            SELECT
                15847356 AS current_id,
                ARRAY[9970,
                12061481,
                12128714,
                12445111,
                15847356]::int[] AS depth,
                ARRAY[]::int[] AS ids,
                1::bigint AS count
            FROM (
                VALUES (1)) AS does_not_matter
            LIMIT 1)
    UNION ALL ( WITH "cte" AS MATERIALIZED (
            SELECT
                *
            FROM
                result
)
            SELECT
                current_id,
                depth,
                ids,
                count
            FROM ((
                    SELECT
                        namespaces.id AS current_id,
                        cte.depth || namespaces.id AS depth,
                        cte.ids || namespaces.id AS ids,
                        cte.count + 1 AS count
                    FROM
                        cte,
                        LATERAL (
                            SELECT
                                id
                            FROM
                                namespaces
                            WHERE
                                parent_id = cte.current_id
                            ORDER BY
                                id
                            LIMIT 1
) namespaces
)
                UNION ALL (
                    SELECT
                        namespaces.id AS current_id,
                        cte.depth[:array_length(
                            cte.depth, 1
) - 1] || namespaces.id AS depth,
                        cte.ids || namespaces.id AS ids,
                        cte.count + 1 AS count
                    FROM
                        cte,
                        LATERAL (
                            SELECT
                                id
                            FROM
                                namespaces
                            WHERE
                                namespaces.parent_id = cte.depth[array_length(
                                    cte.depth, 1
) - 1]
                                AND namespaces.id > cte.depth[array_length(
                                    cte.depth, 1
)]
                            ORDER BY
                                id
                            LIMIT 1
) namespaces
)
                UNION ALL (
                    SELECT
                        cte.current_id AS current_id,
                        cte.depth[:array_length(
                            cte.depth, 1
) - 1] AS depth,
                        cte.ids AS ids,
                        cte.count + 1 AS count
                    FROM
                        cte
                    WHERE (
                        cte.depth <> ARRAY[]::int[]
)
                LIMIT 1
)
) namespaces
    LIMIT 1
))
SELECT
    "namespaces".*
FROM
    "result" AS "namespaces"
LIMIT 500) namespaces
ORDER BY
    "count" DESC
LIMIT 1

执行计划:

 Limit  (cost=16.36..16.36 rows=1 width=76) (actual time=436.963..436.970 rows=1 loops=1)
   Buffers: shared hit=3721 read=423 dirtied=8
   I/O Timings: read=412.590 write=0.000
   ->  Sort  (cost=16.36..16.39 rows=11 width=76) (actual time=436.961..436.968 rows=1 loops=1)
         Sort Key: namespaces.count DESC
         Sort Method: top-N heapsort  Memory: 27kB
         Buffers: shared hit=3721 read=423 dirtied=8
         I/O Timings: read=412.590 write=0.000
         ->  Limit  (cost=15.98..16.20 rows=11 width=76) (actual time=0.005..436.394 rows=500 loops=1)
               Buffers: shared hit=3718 read=423 dirtied=8
               I/O Timings: read=412.590 write=0.000
               CTE result
                 ->  Recursive Union  (cost=0.00..15.98 rows=11 width=76) (actual time=0.003..432.924 rows=500 loops=1)
                       Buffers: shared hit=3718 read=423 dirtied=8
                       I/O Timings: read=412.590 write=0.000
                       ->  Limit  (cost=0.00..0.01 rows=1 width=76) (actual time=0.002..0.003 rows=1 loops=1)
                             I/O Timings: read=0.000 write=0.000
                             ->  Result  (cost=0.00..0.01 rows=1 width=76) (actual time=0.001..0.002 rows=1 loops=1)
                                   I/O Timings: read=0.000 write=0.000
                       ->  Limit  (cost=0.76..1.57 rows=1 width=76) (actual time=0.862..0.862 rows=1 loops=499)
                             Buffers: shared hit=3718 read=423 dirtied=8
                             I/O Timings: read=412.590 write=0.000
                             CTE cte
                               ->  WorkTable Scan on result  (cost=0.00..0.20 rows=10 width=76) (actual time=0.000..0.000 rows=1 loops=499)
                                     I/O Timings: read=0.000 write=0.000
                             ->  Append  (cost=0.56..17.57 rows=21 width=76) (actual time=0.862..0.862 rows=1 loops=499)
                                   Buffers: shared hit=3718 read=423 dirtied=8
                                   I/O Timings: read=412.590 write=0.000
                                   ->  Nested Loop  (cost=0.56..7.77 rows=10 width=76) (actual time=0.675..0.675 rows=0 loops=499)
                                         Buffers: shared hit=1693 read=357 dirtied=1
                                         I/O Timings: read=327.812 write=0.000
                                         ->  CTE Scan on cte  (cost=0.00..0.20 rows=10 width=76) (actual time=0.001..0.001 rows=1 loops=499)
                                               I/O Timings: read=0.000 write=0.000
                                         ->  Limit  (cost=0.56..0.73 rows=1 width=4) (actual time=0.672..0.672 rows=0 loops=499)
                                               Buffers: shared hit=1693 read=357 dirtied=1
                                               I/O Timings: read=327.812 write=0.000
                                               ->  Index Only Scan using index_namespaces_on_parent_id_and_id on public.namespaces namespaces_1  (cost=0.56..5.33 rows=29 width=4) (actual time=0.671..0.671 rows=0 loops=499)
                                                     Index Cond: (namespaces_1.parent_id = cte.current_id)
                                                     Heap Fetches: 7
                                                     Buffers: shared hit=1693 read=357 dirtied=1
                                                     I/O Timings: read=327.812 write=0.000
                                   ->  Nested Loop  (cost=0.57..9.45 rows=10 width=76) (actual time=0.208..0.208 rows=1 loops=442)
                                         Buffers: shared hit=2025 read=66 dirtied=7
                                         I/O Timings: read=84.778 write=0.000
                                         ->  CTE Scan on cte cte_1  (cost=0.00..0.20 rows=10 width=72) (actual time=0.000..0.000 rows=1 loops=442)
                                               I/O Timings: read=0.000 write=0.000
                                         ->  Limit  (cost=0.57..0.89 rows=1 width=4) (actual time=0.203..0.203 rows=1 loops=442)
                                               Buffers: shared hit=2025 read=66 dirtied=7
                                               I/O Timings: read=84.778 write=0.000
                                               ->  Index Only Scan using index_namespaces_on_parent_id_and_id on public.namespaces namespaces_2  (cost=0.57..3.77 rows=10 width=4) (actual time=0.201..0.201 rows=1 loops=442)
                                                     Index Cond: ((namespaces_2.parent_id = (cte_1.depth)[(array_length(cte_1.depth, 1) - 1)]) AND (namespaces_2.id > (cte_1.depth)[array_length(cte_1.depth, 1)]))
                                                     Heap Fetches: 35
                                                     Buffers: shared hit=2025 read=66 dirtied=6
                                                     I/O Timings: read=84.778 write=0.000
                                   ->  Limit  (cost=0.00..0.03 rows=1 width=76) (actual time=0.003..0.003 rows=1 loops=59)
                                         I/O Timings: read=0.000 write=0.000
                                         ->  CTE Scan on cte cte_2  (cost=0.00..0.29 rows=9 width=76) (actual time=0.002..0.002 rows=1 loops=59)
                                               Filter: (cte_2.depth <> '{}'::integer[])
                                               Rows Removed by Filter: 0
                                               I/O Timings: read=0.000 write=0.000
               ->  CTE Scan on result namespaces  (cost=0.00..0.22 rows=11 width=76) (actual time=0.005..436.240 rows=500 loops=1)
                     Buffers: shared hit=3718 read=423 dirtied=8
                     I/O Timings: read=412.590 write=0.000