Parallel Query Execution
The Parallel Query Execution feature improves the performance of large queries that access tables with many rows by processing them in parallel.
When enabled, Optimizer v3 identifies opportunities to parallelize query execution.
The optimizer first analyzes accesses to large tables that can be fragmented, and dynamically generates both a parallel and a single-threaded query plan.
It then selects the plan with the lowest estimated cost. As a result, the entire query or only parts of it may be parallelized.
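The cost-based choice between the two plans can be sketched as follows. This is a simplified model, not the optimizer's actual code. The `Plan` class, `choose_plan` function, and cost figures are hypothetical. The sketch compares whole-query alternatives only; it does not model parallelizing a subset of the query.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Plan:
    """A candidate query plan and its estimated cost (illustrative model only)."""
    description: str
    cost: float
    parallel: bool


def choose_plan(candidates: list) -> Plan:
    """Return the candidate with the lowest estimated cost.

    min() keeps the first of several equally cheap candidates, so listing the
    single-threaded plan first makes it win ties.
    """
    if not candidates:
        raise ValueError("at least one candidate plan is required")
    return min(candidates, key=lambda plan: plan.cost)


# Hypothetical cost estimates; real values come from the optimizer.
serial = Plan("single-threaded", cost=9000.0, parallel=False)
parallel = Plan("parallel, 4 workers", cost=4600.0, parallel=True)

best = choose_plan([serial, parallel])
print(best.description)  # parallel, 4 workers
```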
The query is logically partitioned and executed in parallel by workers, each of which produces intermediate results. The main thread, also called the query coordinator, then aggregates these results. A query can be processed by multiple workers on a single Transaction Engine (TE).
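The worker/coordinator split resembles two-phase aggregation. Each worker computes partial sums over its own partition, and the coordinator adds those partial sums together. This mirrors the `SUM(SUM(...))` aggregate in the example plan later in this section.

The following Python sketch assumes round-robin partitioning by row position, which may not match how the database actually partitions a table. All names are hypothetical. Python threads illustrate the data flow rather than true CPU parallelism.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def partition(rows, num_workers):
    """Split rows into num_workers logical partitions (round-robin by position)."""
    return [rows[i::num_workers] for i in range(num_workers)]


def worker_aggregate(rows):
    """Worker step: partial SUM of price * (1 - discount) per order key."""
    partial = defaultdict(float)
    for order_key, price, discount in rows:
        partial[order_key] += price * (1 - discount)
    return dict(partial)


def coordinator_merge(partials):
    """Coordinator step: combine the workers' partial sums (a SUM of SUMs)."""
    total = defaultdict(float)
    for partial in partials:
        for order_key, subtotal in partial.items():
            total[order_key] += subtotal
    return dict(total)


def parallel_revenue(rows, num_workers=4):
    """Run worker_aggregate on each partition concurrently, then merge the results."""
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partials = list(pool.map(worker_aggregate, partition(rows, num_workers)))
    return coordinator_merge(partials)


# Hypothetical sample rows: (order_key, extended_price, discount).
rows = [(1, 100.0, 0.10), (2, 50.0, 0.0), (1, 200.0, 0.05), (3, 10.0, 0.5), (2, 25.0, 0.2)]
print(parallel_revenue(rows))
```

Because the merge step only adds partial sums, the result matches a single-threaded aggregation up to floating-point rounding, regardless of how the rows are split.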
Configuration
Parallel query execution is enabled by default. It can be configured at TE startup in two ways:
- The degree of parallelization can be adjusted by changing the parallel-execution-max-threads database option. A value of 1 disables parallel execution.
- The minimum number of rows that must be estimated to be scanned before the optimizer considers parallel execution can be adjusted by changing the parallel-execution-min-rows database option. Increasing the value reduces the use of parallel execution.
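The effect of the two options on whether a parallel plan is considered can be modeled roughly as follows. This is a sketch under stated assumptions, not the product's logic. The function name is hypothetical, and the sample settings are illustrative rather than the actual defaults. The sketch also assumes that an estimate equal to the threshold qualifies.

```python
def considers_parallel_plan(estimated_rows: int, max_threads: int, min_rows: int) -> bool:
    """Decide whether a parallel plan is worth costing (simplified model).

    max_threads stands in for parallel-execution-max-threads: a value of 1
    disables parallel execution. min_rows stands in for
    parallel-execution-min-rows: scans estimated below this threshold stay
    single-threaded. Passing this check only makes a parallel plan a candidate;
    the lowest-cost plan still wins.
    """
    if max_threads < 1:
        raise ValueError("max_threads must be at least 1")
    if max_threads == 1:
        return False
    return estimated_rows >= min_rows


# Hypothetical settings chosen for illustration; they are not product defaults.
print(considers_parallel_plan(estimated_rows=1_500_000, max_threads=4, min_rows=100_000))  # True
print(considers_parallel_plan(estimated_rows=50_000, max_threads=4, min_rows=100_000))     # False
print(considers_parallel_plan(estimated_rows=1_500_000, max_threads=1, min_rows=100_000))  # False
```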
We recommend using the default values. If you change the configuration, apply the same configuration to all TEs.
Example
Use the EXPLAIN statement to check whether a query will use parallel execution.
Alternatively, query the SYSTEM.LASTSTATEMENT table and inspect the ISPARALLELQUERY column to verify whether the last executed query used parallel execution.
The SYSTEM.LASTPARALLELSTATEMENT table provides statistics for the last parallel statement executed by the current client.
For more information, see LASTPARALLELSTATEMENT System Table Description and LASTSTATEMENT System Table Description.
Example of a parallel query plan. The query and its result are shown first, followed by the EXPLAIN output for the same query:
select
l_orderkey,
sum(
l_extendedprice * (1 - l_discount)
) as revenue,
o_orderdate,
o_shippriority
from
customer,
orders,
lineitem
where
c_mktsegment = 'BUILDING'
and c_custkey = o_custkey
and l_orderkey = o_orderkey
and o_orderdate < '1995-03-15'
and l_shipdate > '1995-03-15'
group by
l_orderkey,
o_orderdate,
o_shippriority
order by
revenue desc,
o_orderdate;
L_ORDERKEY REVENUE O_ORDERDATE O_SHIPPRIORITY
----------- ----------- ------------ ---------------
1637 164224.9253 1995-02-08 0
5191 49378.3094 1994-12-11 0
742 43728.0480 1994-12-23 0
3492 43716.0724 1994-11-24 0
2883 36666.9612 1995-01-23 0
998 11785.5486 1994-11-26 0
3430 4726.6775 1994-12-12 0
4423 3055.9365 1995-02-17 0
explain
select
l_orderkey,
sum(
l_extendedprice * (1 - l_discount)
) as revenue,
o_orderdate,
o_shippriority
from
customer,
orders,
lineitem
where
c_mktsegment = 'BUILDING'
and c_custkey = o_custkey
and l_orderkey = o_orderkey
and o_orderdate < '1995-03-15'
and l_shipdate > '1995-03-15'
group by
l_orderkey,
o_orderdate,
o_shippriority
order by
revenue desc,
o_orderdate;
Optimized using v3 optimizer:
Project LINEITEM.L_ORDERKEY, REVENUE, ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY
Sort REVENUE DESC ORDERS.O_ORDERDATE ASC (cost: 8207.1, est. rows: 1802)
Hash grouping keys: LINEITEM.L_ORDERKEY, ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY aggregates: SUM(SUM((LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT)))) (cost: 6405.6, est. rows: 1802)
Parallelize Query (4 workers) (cost: 4604.1, est. rows: 1802)
Parallel workers:
Worker id 0 on node 2, logically partitioned on table LINEITEM
Explain output:
Project ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY, LINEITEM.L_ORDERKEY, SUM((LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT)))
Hash grouping keys: LINEITEM.L_ORDERKEY, ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY aggregates: SUM((LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT))) (cost: 10217.8, est. rows: 141)
Evaluation (LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT))
Hash join on (ORDERS.O_ORDERKEY = LINEITEM.L_ORDERKEY) (cost: 10077.1, est. rows: 141)
Filter [batch] (LINEITEM.L_SHIPDATE > '1995-03-15') (est. rows: 450)
Table scan LINEITEM (cost: 4505.3, est. rows: 1501)
Hash
Hash join on (CUSTOMER.C_CUSTKEY = ORDERS.O_CUSTKEY) (cost: 5206.0, est. rows: 141)
Filter [batch] (ORDERS.O_ORDERDATE < '1995-03-15') (est. rows: 450)
Table scan ORDERS (cost: 4500.5, est. rows: 1500)
Hash
Filter (CUSTOMER.C_MKTSEGMENT = 'BUILDING') (est. rows: 30)
Table scan CUSTOMER (cost: 450.5, est. rows: 150)
Worker id 1 on node 2, logically partitioned on table LINEITEM
Explain output:
Project ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY, LINEITEM.L_ORDERKEY, SUM((LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT)))
Hash grouping keys: LINEITEM.L_ORDERKEY, ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY aggregates: SUM((LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT))) (cost: 10217.8, est. rows: 141)
Evaluation (LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT))
Hash join on (ORDERS.O_ORDERKEY = LINEITEM.L_ORDERKEY) (cost: 10077.1, est. rows: 141)
Filter [batch] (LINEITEM.L_SHIPDATE > '1995-03-15') (est. rows: 450)
Table scan LINEITEM (cost: 4505.3, est. rows: 1501)
Hash
Hash join on (CUSTOMER.C_CUSTKEY = ORDERS.O_CUSTKEY) (cost: 5206.0, est. rows: 141)
Filter [batch] (ORDERS.O_ORDERDATE < '1995-03-15') (est. rows: 450)
Table scan ORDERS (cost: 4500.5, est. rows: 1500)
Hash
Filter (CUSTOMER.C_MKTSEGMENT = 'BUILDING') (est. rows: 30)
Table scan CUSTOMER (cost: 450.5, est. rows: 150)
Worker id 2 on node 2, logically partitioned on table LINEITEM
Explain output:
Project ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY, LINEITEM.L_ORDERKEY, SUM((LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT)))
Hash grouping keys: LINEITEM.L_ORDERKEY, ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY aggregates: SUM((LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT))) (cost: 10217.8, est. rows: 141)
Evaluation (LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT))
Hash join on (ORDERS.O_ORDERKEY = LINEITEM.L_ORDERKEY) (cost: 10077.1, est. rows: 141)
Filter [batch] (LINEITEM.L_SHIPDATE > '1995-03-15') (est. rows: 450)
Table scan LINEITEM (cost: 4505.3, est. rows: 1501)
Hash
Hash join on (CUSTOMER.C_CUSTKEY = ORDERS.O_CUSTKEY) (cost: 5206.0, est. rows: 141)
Filter [batch] (ORDERS.O_ORDERDATE < '1995-03-15') (est. rows: 450)
Table scan ORDERS (cost: 4500.5, est. rows: 1500)
Hash
Filter (CUSTOMER.C_MKTSEGMENT = 'BUILDING') (est. rows: 30)
Table scan CUSTOMER (cost: 450.5, est. rows: 150)
Worker id 3 on node 2, logically partitioned on table LINEITEM
Explain output:
Project ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY, LINEITEM.L_ORDERKEY, SUM((LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT)))
Hash grouping keys: LINEITEM.L_ORDERKEY, ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY aggregates: SUM((LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT))) (cost: 10217.8, est. rows: 141)
Evaluation (LINEITEM.L_EXTENDEDPRICE * (1 - LINEITEM.L_DISCOUNT))
Hash join on (ORDERS.O_ORDERKEY = LINEITEM.L_ORDERKEY) (cost: 10077.1, est. rows: 141)
Filter [batch] (LINEITEM.L_SHIPDATE > '1995-03-15') (est. rows: 450)
Table scan LINEITEM (cost: 4505.3, est. rows: 1501)
Hash
Hash join on (CUSTOMER.C_CUSTKEY = ORDERS.O_CUSTKEY) (cost: 5206.0, est. rows: 141)
Filter [batch] (ORDERS.O_ORDERDATE < '1995-03-15') (est. rows: 450)
Table scan ORDERS (cost: 4500.5, est. rows: 1500)
Hash
Filter (CUSTOMER.C_MKTSEGMENT = 'BUILDING') (est. rows: 30)
Table scan CUSTOMER (cost: 450.5, est. rows: 150)
VectorPushPullExchanger using 4 pushers, 40 vector groups
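Scripts that need to read a plan like the one above can extract the worker count and partitioning details from the EXPLAIN text. The following sketch assumes the line formats shown in this example ("Parallelize Query (N workers)" and "Worker id ... on node ..., logically partitioned on table ..."). These formats may change between versions. The ISPARALLELQUERY column described earlier is the documented way to check whether a query ran in parallel. The function name is hypothetical, and `sample` is a shortened copy of the output above.

```python
import re

# Patterns based on the sample EXPLAIN output in this section; other versions may differ.
PARALLEL_RE = re.compile(r"Parallelize Query \((\d+) workers?\)")
WORKER_RE = re.compile(r"Worker id (\d+) on node (\d+), logically partitioned on table (\w+)")


def summarize_parallel_plan(explain_text: str):
    """Return (worker_count, [(worker_id, node, table), ...]), or None if no parallel step."""
    match = PARALLEL_RE.search(explain_text)
    if match is None:
        return None
    workers = [(int(w), int(n), table) for w, n, table in WORKER_RE.findall(explain_text)]
    return int(match.group(1)), workers


sample = """Parallelize Query (4 workers) (cost: 4604.1, est. rows: 1802)
Parallel workers:
Worker id 0 on node 2, logically partitioned on table LINEITEM
Worker id 1 on node 2, logically partitioned on table LINEITEM"""

print(summarize_parallel_plan(sample))
# (4, [(0, 2, 'LINEITEM'), (1, 2, 'LINEITEM')])
```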