Parallel Query Execution

Parallel Query Execution speeds up large queries that access tables with many rows by splitting their processing into parts that run in parallel.

When parallel execution is enabled, Optimizer v3 looks for opportunities to parallelize a query. It starts by analyzing accesses to large tables that can be fragmented, and dynamically generates both a parallel and a single-threaded query plan. It then selects the plan with the lowest cost, which may parallelize the entire query or only parts of it.
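The following is a minimal sketch of the final selection step. It assumes each candidate plan can be reduced to a label and a single estimated cost. `CandidatePlan` and `choose_plan` are hypothetical names and the costs are invented for illustration. The real optimizer's cost model is far richer than one number per plan.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CandidatePlan:
    """Hypothetical stand-in for an optimizer plan: a label and its estimated cost."""

    name: str
    parallel: bool
    cost: float


def choose_plan(candidates: list[CandidatePlan]) -> CandidatePlan:
    """Return the candidate with the lowest estimated cost."""
    if not candidates:
        raise ValueError("at least one candidate plan is required")
    # min() keeps the first candidate on ties, so listing the serial plan
    # first makes it win when costs are equal.
    return min(candidates, key=lambda plan: plan.cost)


if __name__ == "__main__":
    # Illustrative costs only; not taken from a real optimizer run.
    plans = [
        CandidatePlan("single-threaded", parallel=False, cost=12000.0),
        CandidatePlan("parallel scan of LINEITEM only", parallel=True, cost=9000.0),
        CandidatePlan("fully parallel", parallel=True, cost=8200.0),
    ]
    print(choose_plan(plans).name)  # fully parallel
```

Listing a partially parallel candidate reflects the point that the cheapest plan may parallelize only part of a query.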

The query is logically partitioned and executed in parallel by workers, each of which produces intermediate results. The main thread, also called the query coordinator, then aggregates these results. A query can be processed by multiple workers within a single Transaction Engine (TE).
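The Python sketch below illustrates this data flow for a grouped SUM, which is the shape of the example plan later in this section. Each worker hash-groups its own partition and computes partial sums. The coordinator then adds the partial sums together, mirroring the SUM(SUM(...)) aggregate in that plan. The function names and the simplified (key, value) row format are hypothetical. Python threads are used only to show the structure, not to model TE performance.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

# Hypothetical simplified row: (group key, value to sum).
Row = tuple[int, float]


def partition(rows: list[Row], num_workers: int) -> list[list[Row]]:
    """Logically split rows into num_workers contiguous, near-equal slices."""
    size, extra = divmod(len(rows), num_workers)
    parts, start = [], 0
    for i in range(num_workers):
        end = start + size + (1 if i < extra else 0)
        parts.append(rows[start:end])
        start = end
    return parts


def worker_aggregate(rows: list[Row]) -> dict[int, float]:
    """Worker: hash-group one partition and compute a partial SUM per key."""
    partial: dict[int, float] = defaultdict(float)
    for key, value in rows:
        partial[key] += value
    return dict(partial)


def coordinator_merge(partials: list[dict[int, float]]) -> dict[int, float]:
    """Coordinator: SUM the workers' partial SUMs for each group key."""
    merged: dict[int, float] = defaultdict(float)
    for partial in partials:
        for key, subtotal in partial.items():
            merged[key] += subtotal
    return dict(merged)


def parallel_sum_by_key(rows: list[Row], num_workers: int = 4) -> dict[int, float]:
    """Partition rows, aggregate each partition in a worker, then merge."""
    if num_workers < 1:
        raise ValueError("num_workers must be at least 1")
    parts = partition(rows, num_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partials = list(pool.map(worker_aggregate, parts))
    return coordinator_merge(partials)


if __name__ == "__main__":
    rows = [(1, 1.0), (2, 2.0), (1, 3.0)]
    print(parallel_sum_by_key(rows, num_workers=2))  # {1: 4.0, 2: 2.0}
```

Splitting the work this way is valid because SUM is decomposable: summing per-partition subtotals gives the same result as summing all rows at once (up to floating-point rounding order).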

Configuration

Parallel query execution is enabled by default. You can configure it at TE startup in two ways:

  1. Adjust the degree of parallelism with the parallel-execution-max-threads database option. A value of 1 disables parallel execution.

  2. Adjust the minimum estimated number of scanned rows required before the optimizer considers parallel execution with the parallel-execution-min-rows database option. Increasing this value makes parallel execution less likely to be used.

We recommend keeping the default values. If you change the configuration, apply the same configuration to all TEs.
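The effect of the two options on whether a parallel plan is considered at all can be modeled roughly as follows. This sketch rests on several assumptions. `considers_parallel` is a hypothetical helper whose arguments stand in for the database options rather than reading them. Treating the row threshold as inclusive (>=) is also a guess. Even when it returns True, the optimizer still chooses between the parallel and single-threaded plans by cost.

```python
def considers_parallel(estimated_rows: int, max_threads: int, min_rows: int) -> bool:
    """Hypothetical gate modeling the two database options.

    max_threads stands in for parallel-execution-max-threads (1 disables
    parallel execution); min_rows stands in for parallel-execution-min-rows.
    The inclusive >= comparison is an assumption.
    """
    if max_threads < 1:
        raise ValueError("max_threads must be at least 1")
    if max_threads == 1:
        return False  # a value of 1 disables parallel execution
    return estimated_rows >= min_rows


if __name__ == "__main__":
    # Raising the row threshold makes the same scan stop qualifying.
    for threshold in (1_000, 100_000):
        print(threshold, considers_parallel(50_000, max_threads=4, min_rows=threshold))
    # 1000 True
    # 100000 False
```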

Example

Use the EXPLAIN statement to check whether a query will use parallel execution. Alternatively, query the SYSTEM.LASTSTATEMENT table and inspect the ISPARALLELQUERY column to check whether the last executed query used parallel execution. The SYSTEM.LASTPARALLELSTATEMENT table provides statistics for the last parallel statement executed by the current client. For more information, see LASTPARALLELSTATEMENT System Table Description and LASTSTATEMENT System Table Description.

The following example shows a query, its results, and its EXPLAIN output, which contains a parallel query plan:

select
  l_orderkey,
  sum(
    l_extendedprice * (1 - l_discount)
  ) as revenue,
  o_orderdate,
  o_shippriority
from
  customer,
  orders,
  lineitem
where
  c_mktsegment = 'BUILDING'
  and c_custkey = o_custkey
  and l_orderkey = o_orderkey
  and o_orderdate < '1995-03-15'
  and l_shipdate > '1995-03-15'
group by
  l_orderkey,
  o_orderdate,
  o_shippriority
order by
  revenue desc,
  o_orderdate;

 L_ORDERKEY    REVENUE   O_ORDERDATE  O_SHIPPRIORITY
 ----------- ----------- ------------ ---------------

    1637     164224.9253  1995-02-08         0
    5191      49378.3094  1994-12-11         0
     742      43728.0480  1994-12-23         0
    3492      43716.0724  1994-11-24         0
    2883      36666.9612  1995-01-23         0
     998      11785.5486  1994-11-26         0
    3430       4726.6775  1994-12-12         0
    4423       3055.9365  1995-02-17         0

Running the same query with EXPLAIN shows the plan the optimizer chose:

explain
select
  l_orderkey,
  sum(
    l_extendedprice * (1 - l_discount)
  ) as revenue,
  o_orderdate,
  o_shippriority
from
  customer,
  orders,
  lineitem
where
  c_mktsegment = 'BUILDING'
  and c_custkey = o_custkey
  and l_orderkey = o_orderkey
  and o_orderdate < '1995-03-15'
  and l_shipdate > '1995-03-15'
group by
  l_orderkey,
  o_orderdate,
  o_shippriority
order by
  revenue desc,
  o_orderdate;

Optimized using v3 optimizer:
Project LINEITEM.L_ORDERKEY, REVENUE, ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY
  Sort REVENUE DESC ORDERS.O_ORDERDATE ASC (cost: 8207.1, est. rows: 1802)
    Hash grouping keys: LINEITEM.L_ORDERKEY, ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY aggregates: SUM(SUM((LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT)))) (cost: 6405.6, est. rows: 1802)
      Parallelize Query (4 workers) (cost: 4604.1, est. rows: 1802)

        Parallel workers:

            Worker id 0 on node 2, logically partitioned on table LINEITEM
            Explain output:
              Project ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY, LINEITEM.L_ORDERKEY, SUM((LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT)))
                Hash grouping keys: LINEITEM.L_ORDERKEY, ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY aggregates: SUM((LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT))) (cost: 10217.8, est. rows: 141)
                  Evaluation (LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT))
                    Hash join on (ORDERS.O_ORDERKEY = LINEITEM.L_ORDERKEY) (cost: 10077.1, est. rows: 141)
                      Filter [batch] (LINEITEM.L_SHIPDATE > '1995-03-15') (est. rows: 450)
                        Table scan LINEITEM (cost: 4505.3, est. rows: 1501)
                      Hash
                        Hash join on (CUSTOMER.C_CUSTKEY = ORDERS.O_CUSTKEY) (cost: 5206.0, est. rows: 141)
                          Filter [batch] (ORDERS.O_ORDERDATE < '1995-03-15') (est. rows: 450)
                            Table scan ORDERS (cost: 4500.5, est. rows: 1500)
                          Hash
                            Filter (CUSTOMER.C_MKTSEGMENT = 'BUILDING') (est. rows: 30)
                              Table scan CUSTOMER (cost: 450.5, est. rows: 150)

            Worker id 1 on node 2, logically partitioned on table LINEITEM
            Explain output:
              Project ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY, LINEITEM.L_ORDERKEY, SUM((LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT)))
                Hash grouping keys: LINEITEM.L_ORDERKEY, ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY aggregates: SUM((LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT))) (cost: 10217.8, est. rows: 141)
                  Evaluation (LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT))
                    Hash join on (ORDERS.O_ORDERKEY = LINEITEM.L_ORDERKEY) (cost: 10077.1, est. rows: 141)
                      Filter [batch] (LINEITEM.L_SHIPDATE > '1995-03-15') (est. rows: 450)
                        Table scan LINEITEM (cost: 4505.3, est. rows: 1501)
                      Hash
                        Hash join on (CUSTOMER.C_CUSTKEY = ORDERS.O_CUSTKEY) (cost: 5206.0, est. rows: 141)
                          Filter [batch] (ORDERS.O_ORDERDATE < '1995-03-15') (est. rows: 450)
                            Table scan ORDERS (cost: 4500.5, est. rows: 1500)
                          Hash
                            Filter (CUSTOMER.C_MKTSEGMENT = 'BUILDING') (est. rows: 30)
                              Table scan CUSTOMER (cost: 450.5, est. rows: 150)

            Worker id 2 on node 2, logically partitioned on table LINEITEM
            Explain output:
              Project ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY, LINEITEM.L_ORDERKEY, SUM((LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT)))
                Hash grouping keys: LINEITEM.L_ORDERKEY, ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY aggregates: SUM((LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT))) (cost: 10217.8, est. rows: 141)
                  Evaluation (LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT))
                    Hash join on (ORDERS.O_ORDERKEY = LINEITEM.L_ORDERKEY) (cost: 10077.1, est. rows: 141)
                      Filter [batch] (LINEITEM.L_SHIPDATE > '1995-03-15') (est. rows: 450)
                        Table scan LINEITEM (cost: 4505.3, est. rows: 1501)
                      Hash
                        Hash join on (CUSTOMER.C_CUSTKEY = ORDERS.O_CUSTKEY) (cost: 5206.0, est. rows: 141)
                          Filter [batch] (ORDERS.O_ORDERDATE < '1995-03-15') (est. rows: 450)
                            Table scan ORDERS (cost: 4500.5, est. rows: 1500)
                          Hash
                            Filter (CUSTOMER.C_MKTSEGMENT = 'BUILDING') (est. rows: 30)
                              Table scan CUSTOMER (cost: 450.5, est. rows: 150)

            Worker id 3 on node 2, logically partitioned on table LINEITEM
            Explain output:
              Project ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY, LINEITEM.L_ORDERKEY, SUM((LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT)))
                Hash grouping keys: LINEITEM.L_ORDERKEY, ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY aggregates: SUM((LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT))) (cost: 10217.8, est. rows: 141)
                  Evaluation (LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT))
                    Hash join on (ORDERS.O_ORDERKEY = LINEITEM.L_ORDERKEY) (cost: 10077.1, est. rows: 141)
                      Filter [batch] (LINEITEM.L_SHIPDATE > '1995-03-15') (est. rows: 450)
                        Table scan LINEITEM (cost: 4505.3, est. rows: 1501)
                      Hash
                        Hash join on (CUSTOMER.C_CUSTKEY = ORDERS.O_CUSTKEY) (cost: 5206.0, est. rows: 141)
                          Filter [batch] (ORDERS.O_ORDERDATE < '1995-03-15') (est. rows: 450)
                            Table scan ORDERS (cost: 4500.5, est. rows: 1500)
                          Hash
                            Filter (CUSTOMER.C_MKTSEGMENT = 'BUILDING') (est. rows: 30)
                              Table scan CUSTOMER (cost: 450.5, est. rows: 150)

        VectorPushPullExchanger using 4 pushers, 40 vector groups
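If you capture EXPLAIN output as text, for example in a test script, a small helper can look for the Parallelize Query operator that appears in the plan above. This sketch assumes the operator line keeps the format shown here, "Parallelize Query (N workers)". The helper names are hypothetical, and the output format may differ between releases. The helper returns a list in case a plan contains more than one such operator.

```python
import re

# Matches operator lines such as
# "Parallelize Query (4 workers) (cost: 4604.1, est. rows: 1802)".
_PARALLELIZE_RE = re.compile(r"Parallelize Query \((\d+) workers?\)")


def parallel_worker_counts(explain_text: str) -> list[int]:
    """Return the worker count of every Parallelize Query operator found."""
    return [int(n) for n in _PARALLELIZE_RE.findall(explain_text)]


def uses_parallel_execution(explain_text: str) -> bool:
    """True if the EXPLAIN text contains at least one Parallelize Query operator."""
    return bool(parallel_worker_counts(explain_text))


if __name__ == "__main__":
    plan_line = "      Parallelize Query (4 workers) (cost: 4604.1, est. rows: 1802)"
    print(parallel_worker_counts(plan_line))  # [4]
```

For a definitive answer about a query that has already run, the ISPARALLELQUERY column of SYSTEM.LASTSTATEMENT is the documented check. Text matching like this is only a convenience.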