Today, worker URLs are assigned to tasks at random, and users have no say over which worker URL each task is assigned to.
How it works today
The process that turns a valid single-node plan into a distributed plan with worker URLs assigned is the following:
- Users tell the distributed planner, through a TaskEstimator, how many tasks are appropriate for executing each node (typically this is only done for leaf nodes):
          ┌──────────────────┐
          │CoalescePartitions│
          └──────────────────┘
          ┌──────────────────┐
          │ Aggregate(final) │
          └──────────────────┘
          ┌──────────────────┐
          │   Repartition    │
          └──────────────────┘
          ┌──────────────────┐
          │Aggregate(partial)│
          └──────────────────┘
          ┌──────────────────┐
          │      Filter      │
          └──────────────────┘
          ┌──────────────────┐
          │      Union       │
          └──────────────────┘
┌──────────────────┐┌──────────────────┐
│   DataSource1    ││   DataSource2    │
└──────────────────┘└──────────────────┘
  TaskEstimator:      TaskEstimator:
  I want 2 tasks      I want 3 tasks
- The distributed planner reconciles task counts within each stage, since multiple nodes might contribute an estimated task count to the same stage (here, the estimates of 2 and 3 tasks result in 3 tasks for the whole stage):
                    ...
┌ ─ ─ ─ ─ ─ ─ ─ ─ Same stage ─ ─ ─ ─ ─ ─ ─ ─ ┐
│           ┌──────────────────┐             │
            │   Repartition    │  3 tasks
│           └──────────────────┘             │
            ┌──────────────────┐
│           │Aggregate(partial)│  3 tasks    │
            └──────────────────┘
│           ┌──────────────────┐             │
            │      Filter      │  3 tasks
│           └──────────────────┘             │
            ┌──────────────────┐
│           │      Union       │  3 tasks    │
            └──────────────────┘
│ ┌──────────────────┐┌──────────────────┐   │
  │   DataSource1    ││   DataSource2    │
│ └──────────────────┘└──────────────────┘   │
        3 tasks             3 tasks
└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┘
- Network boundaries (NetworkShuffle, NetworkCoalesce) with specific input task counts are injected, but with empty slots for worker URLs:
                                     ┏━━━━━━━━━━━━━━━━━━┓
                                     ┃ DistributedExec  ┃
                                     ┗━━━━━━━━━━━━━━━━━━┛
                                     ┌──────────────────┐
                                     │CoalescePartitions│
                                     └──────────────────┘
                                     ┏━━━━━━━━━━━━━━━━━━┓
                                     ┃ NetworkCoalesce  ┃
                                     ┗━━━━━━━━━━━━━━━━━━┛
                    ┌ ─ ─ ─ URL: None ─ ─ ─┐      ┌ ─ ─ ─ URL: None ─ ─ ─┐
                    │ ┌──────────────────┐ │      │ ┌──────────────────┐ │
                      │ Aggregate(final) │          │ Aggregate(final) │
                    │ └──────────────────┘ │      │ └──────────────────┘ │
                      ┏━━━━━━━━━━━━━━━━━━┓          ┏━━━━━━━━━━━━━━━━━━┓
                    │ ┃  NetworkShuffle  ┃ │      │ ┃  NetworkShuffle  ┃ │
                      ┗━━━━━━━━━━━━━━━━━━┛          ┗━━━━━━━━━━━━━━━━━━┛
                    └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┘      └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┘
┌ ─ ─ ─ ─ ─ ─ URL: None ─ ─ ─┐  ┌ ─ ─ ─ ─ ─ ─ URL: None ─ ─ ─┐  ┌ ─ ─ ─ ─ ─ ─ URL: None ─ ─ ─┐
│    ┌──────────────────┐    │  │    ┌──────────────────┐    │  │    ┌──────────────────┐    │
     │   Repartition    │            │   Repartition    │            │   Repartition    │
│    └──────────────────┘    │  │    └──────────────────┘    │  │    └──────────────────┘    │
     ┌──────────────────┐            ┌──────────────────┐            ┌──────────────────┐
│    │Aggregate(partial)│    │  │    │Aggregate(partial)│    │  │    │Aggregate(partial)│    │
     └──────────────────┘            └──────────────────┘            └──────────────────┘
│    ┌──────────────────┐    │  │    ┌──────────────────┐    │  │    ┌──────────────────┐    │
     │      Filter      │            │      Filter      │            │      Filter      │
│    └──────────────────┘    │  │    └──────────────────┘    │  │    └──────────────────┘    │
     ┌──────────────────┐            ┌──────────────────┐            ┌──────────────────┐
│    │      Union       │    │  │    │      Union       │    │  │    │      Union       │    │
     └──────────────────┘            └──────────────────┘            └──────────────────┘
│ ┌───────────┐┌───────────┐ │  │ ┌───────────┐┌───────────┐ │  │ ┌───────────┐┌───────────┐ │
  │ModDataSrc1││ModDataSrc2│      │ModDataSrc1││ModDataSrc2│      │ModDataSrc1││ModDataSrc2│
│ └───────────┘└───────────┘ │  │ └───────────┘└───────────┘ │  │ └───────────┘└───────────┘ │
└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┘  └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┘  └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┘
- The DistributedExec head node lazily modifies the nodes below it, assigning worker URLs to the different tasks right before execution:
                                     ┏━━━━━━━━━━━━━━━━━━┓
                                     ┃ DistributedExec  ┃
                                     ┗━━━━━━━━━━━━━━━━━━┛
                                     ┌──────────────────┐
                                     │CoalescePartitions│
                                     └──────────────────┘
                                     ┏━━━━━━━━━━━━━━━━━━┓
                                     ┃ NetworkCoalesce  ┃
                                     ┗━━━━━━━━━━━━━━━━━━┛
                    ┌ ─ ─ URL: http://4 ─ ─┐      ┌ ─ ─ URL: http://5 ─ ─┐
                    │ ┌──────────────────┐ │      │ ┌──────────────────┐ │
                      │ Aggregate(final) │          │ Aggregate(final) │
                    │ └──────────────────┘ │      │ └──────────────────┘ │
                      ┏━━━━━━━━━━━━━━━━━━┓          ┏━━━━━━━━━━━━━━━━━━┓
                    │ ┃  NetworkShuffle  ┃ │      │ ┃  NetworkShuffle  ┃ │
                      ┗━━━━━━━━━━━━━━━━━━┛          ┗━━━━━━━━━━━━━━━━━━┛
                    └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┘      └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┘
┌ ─ ─ ─ ─ URL: http://1 ─ ─ ─┐  ┌ ─ ─ ─ ─ URL: http://2 ─ ─ ─┐  ┌ ─ ─ ─ ─ URL: http://3 ─ ─ ─┐
│    ┌──────────────────┐    │  │    ┌──────────────────┐    │  │    ┌──────────────────┐    │
     │   Repartition    │            │   Repartition    │            │   Repartition    │
│    └──────────────────┘    │  │    └──────────────────┘    │  │    └──────────────────┘    │
     ┌──────────────────┐            ┌──────────────────┐            ┌──────────────────┐
│    │Aggregate(partial)│    │  │    │Aggregate(partial)│    │  │    │Aggregate(partial)│    │
     └──────────────────┘            └──────────────────┘            └──────────────────┘
│    ┌──────────────────┐    │  │    ┌──────────────────┐    │  │    ┌──────────────────┐    │
     │      Filter      │            │      Filter      │            │      Filter      │
│    └──────────────────┘    │  │    └──────────────────┘    │  │    └──────────────────┘    │
     ┌──────────────────┐            ┌──────────────────┐            ┌──────────────────┐
│    │      Union       │    │  │    │      Union       │    │  │    │      Union       │    │
     └──────────────────┘            └──────────────────┘            └──────────────────┘
│ ┌───────────┐┌───────────┐ │  │ ┌───────────┐┌───────────┐ │  │ ┌───────────┐┌───────────┐ │
  │ModDataSrc1││ModDataSrc2│      │ModDataSrc1││ModDataSrc2│      │ModDataSrc1││ModDataSrc2│
│ └───────────┘└───────────┘ │  │ └───────────┘└───────────┘ │  │ └───────────┘└───────────┘ │
└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┘  └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┘  └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┘
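The current pipeline can be modeled end to end in a few functions. This is a minimal, hypothetical sketch, not the project's API: LeafNode, TaskEstimator as written here, FilesPerTask, reconcile_stage_task_count, empty_url_slots and assign_random_urls are illustrative names; the "largest estimate wins" reconciliation rule is only an assumption consistent with the diagrams; and std's randomly seeded RandomState stands in for a real random number generator so the sketch needs no external crates.

```rust
use std::collections::hash_map::RandomState;
use std::hash::BuildHasher;

/// Hypothetical, simplified leaf node. The real project works on DataFusion
/// plan nodes, so this struct is an assumption for illustration only.
pub struct LeafNode {
    pub name: &'static str,
    pub file_count: usize,
}

/// Step 1 (hypothetical interface): a user-provided estimate of how many tasks
/// a leaf node should run with, or `None` to express no opinion.
pub trait TaskEstimator {
    fn estimate_tasks(&self, node: &LeafNode) -> Option<usize>;
}

/// Example policy (an assumption): one task per `files_per_task` files.
pub struct FilesPerTask {
    pub files_per_task: usize,
}

impl TaskEstimator for FilesPerTask {
    fn estimate_tasks(&self, node: &LeafNode) -> Option<usize> {
        if node.file_count == 0 || self.files_per_task == 0 {
            return None;
        }
        // Ceiling division: a partial batch of files still gets its own task.
        Some((node.file_count + self.files_per_task - 1) / self.files_per_task)
    }
}

/// Step 2: reconcile the estimates contributed to one stage. Assumption: the
/// largest estimate wins, nodes without an estimate are ignored, and a stage
/// with no usable estimate falls back to a single task.
pub fn reconcile_stage_task_count(estimates: &[Option<usize>]) -> usize {
    estimates.iter().flatten().copied().max().unwrap_or(1).max(1)
}

/// Step 3: a planned stage gets one worker-URL slot per task, all empty
/// (`None`) until execution is about to start.
pub fn empty_url_slots(task_count: usize) -> Vec<Option<String>> {
    vec![None; task_count]
}

/// Step 4: right before execution, fill every empty slot with a randomly
/// chosen worker URL. Slots that already hold a URL are left untouched.
pub fn assign_random_urls(slots: &mut [Option<String>], workers: &[String]) {
    if workers.is_empty() {
        return; // No known workers: leave the slots empty.
    }
    // A fresh RandomState has a random seed, so hashing the task index with it
    // yields a pseudo-random pick that changes from run to run.
    let state = RandomState::new();
    for (task_index, slot) in slots.iter_mut().enumerate() {
        if slot.is_none() {
            let pick = (state.hash_one(task_index) % workers.len() as u64) as usize;
            *slot = Some(workers[pick].clone());
        }
    }
}
```

With files_per_task set to 10, leaf nodes with 20 and 25 files produce estimates of 2 and 3 tasks, which this sketch reconciles to 3 tasks for the stage, matching the diagrams above. Step 4 is the only place where worker URLs are chosen, which is why it is the natural hook point for user control.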
How it could work in the future
That last step is random, but at the moment it happens, plenty of information is available that users could rely on to decide which worker URL is appropriate for executing each task:
- Task index and task count (what the project calls DistributedTaskContext)
- The subplan that is assigned to that task
- The overall query plan
This should be enough information for users to assign a specific worker URL to each task themselves.
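To make the proposal more concrete, a user-facing hook receiving that information could look roughly like the sketch below. Every name in it (TaskRoutingInfo, WorkerUrlResolver, ByTaskIndex, PreferWorkerForSource) is hypothetical and not part of the project's API, and plans are modeled as plain strings only to keep the example self-contained; a real version would receive the DistributedTaskContext and the plan nodes themselves.

```rust
/// Hypothetical bundle of the information available when a task's worker URL
/// is chosen: task index and count, the task's subplan and the overall plan.
pub struct TaskRoutingInfo<'a> {
    pub task_index: usize,
    pub task_count: usize,
    pub subplan: &'a str,
    pub full_plan: &'a str,
}

/// Hypothetical user-provided hook that would replace the random choice.
/// Returning `None` means "no preference", leaving the decision to the default.
pub trait WorkerUrlResolver {
    fn resolve(&self, info: &TaskRoutingInfo<'_>, workers: &[String]) -> Option<String>;
}

/// Example policy (an assumption): pin task i to worker i, wrapping around,
/// so tasks with the same index land on the same worker in every stage.
pub struct ByTaskIndex;

impl WorkerUrlResolver for ByTaskIndex {
    fn resolve(&self, info: &TaskRoutingInfo<'_>, workers: &[String]) -> Option<String> {
        if workers.is_empty() {
            return None;
        }
        workers.get(info.task_index % workers.len()).cloned()
    }
}

/// Example policy (an assumption): send tasks whose subplan reads a given data
/// source to a preferred worker, e.g. one located close to that data.
pub struct PreferWorkerForSource {
    pub source_name: String,
    pub preferred_url: String,
}

impl WorkerUrlResolver for PreferWorkerForSource {
    fn resolve(&self, info: &TaskRoutingInfo<'_>, _workers: &[String]) -> Option<String> {
        if info.subplan.contains(self.source_name.as_str()) {
            Some(self.preferred_url.clone())
        } else {
            None
        }
    }
}
```

Letting the hook return `None` would keep it opt-in: tasks without a user preference could still fall back to today's random assignment.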
Note that #374 also proposes extending the API so that users can provide more than just a number (the task count estimate), so any future evolution of this API will need to take both efforts into account: