OpenMP and oneTBB¶
Part of Phase 1 section 4 — C++ and Parallel Computing.
Goal: Shared-memory CPU parallelism with OpenMP (directive-based) and oneTBB (task-based algorithms and flow graphs) so structured parallel patterns feel familiar before CUDA.
1. Baseline: std::thread and Synchronization¶
Before frameworks, understand what they abstract over:
#include <thread>
#include <mutex>
#include <atomic>
// Raw thread
std::thread t([]{ /* work */ });
t.join();
// Mutex for shared state
std::mutex mtx;
std::lock_guard<std::mutex> lock(mtx); // RAII, released on scope exit
// Atomic for simple counters (no mutex needed)
std::atomic<int> counter{0};
counter.fetch_add(1, std::memory_order_relaxed);
Key concepts:
- Data races — two threads access the same memory, at least one writes, no synchronization → undefined behavior. Not a crash, just undefined.
- Lock granularity — coarse locks are safe but serialize; fine locks are fast but deadlock-prone.
- Atomics — cheaper than mutexes for single-variable shared state. Same concept as CUDA's atomicAdd.
- Profiling first — find hotspots with perf stat or VTune before parallelizing. The bottleneck is rarely the obvious loop.
OpenMP and oneTBB both build on these primitives. Understanding std::thread helps debug race conditions in any framework.
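As a minimal sketch of the primitives above, the following counts increments from several threads in two ways: once with std::atomic and once with a std::mutex around a plain int. The function names count_with_atomic and count_with_mutex, and the thread and iteration counts in the usage note, are illustrative assumptions rather than anything from the original text.

```cpp
#include <atomic>
#include <mutex>
#include <thread>
#include <vector>

// Each of `threads` threads increments a shared atomic counter `iters` times.
int count_with_atomic(int threads, int iters) {
    std::atomic<int> counter{0};
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t)
        pool.emplace_back([&counter, iters] {
            for (int i = 0; i < iters; ++i)
                counter.fetch_add(1, std::memory_order_relaxed);
        });
    for (auto& th : pool) th.join();   // join makes every increment visible here
    return counter.load();
}

// Same work, but a plain int protected by a mutex.
int count_with_mutex(int threads, int iters) {
    int counter = 0;
    std::mutex mtx;
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t)
        pool.emplace_back([&counter, &mtx, iters] {
            for (int i = 0; i < iters; ++i) {
                std::lock_guard<std::mutex> lock(mtx);  // released at the end of each iteration
                ++counter;
            }
        });
    for (auto& th : pool) th.join();
    return counter;
}
```

Calling either function with 4 threads and 10,000 iterations should return 40,000. Dropping the lock, or swapping the atomic for a plain int, would turn the increment into a data race.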
2. OpenMP¶
2.0 Mental Model: Fork-Join¶
OpenMP uses the fork-join model. The program starts with one thread (the master). When it hits a parallel region, it forks into a team of threads. When the region ends, all threads join back into one.
main thread ────────────────────────────────────────────────►
│ │
┌───────┴──────────────────────────┐ │
│ #pragma omp parallel │ │
│ │ │
thread 0 ───┤── work ── work ── work ────────── ┤───┤
thread 1 ───┤── work ── work ── work ────────── ┤───┤
thread 2 ───┤── work ── work ── work ────────── ┤───┤
thread 3 ───┤── work ── work ── work ────────── ┤───┤
│ │ │
└─────────────── implicit barrier ──┘ │
│
main thread continues ───────────────────────────────►
The compiler does this for you: #pragma omp parallel for is essentially a loop split + thread pool dispatch + barrier, all generated automatically.
Compile: g++ -fopenmp -O2 my_code.cpp
2.1 Your First Parallel Loop¶
#include <omp.h>
#include <vector>
int N = 1'000'000;
std::vector<float> a(N), b(N), c(N);
// Sequential: thread 0 does all N iterations
for (int i = 0; i < N; i++)
c[i] = a[i] + b[i];
// Parallel: 8 threads each do ~125,000 iterations
#pragma omp parallel for
for (int i = 0; i < N; i++)
c[i] = a[i] + b[i];
That's it. One line added. The compiler splits [0, N) into chunks, assigns each chunk to a thread, and inserts a barrier at the end.
What each thread sees:
Thread 0: i = 0 … 124,999
Thread 1: i = 125,000 … 249,999
Thread 2: i = 250,000 … 374,999
...
Thread 7: i = 875,000 … 999,999
This is safe here because each i writes to a different c[i]. No two threads touch the same memory.
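The split shown above can be sketched as plain arithmetic. The helper block_range below is hypothetical: the OpenMP specification only requires the static schedule to produce chunks of approximately equal size, so a real runtime may place the boundaries slightly differently.

```cpp
#include <algorithm>
#include <utility>

// Half-open range [begin, end) that thread `tid` of `nthreads` would own
// under an even block split of n iterations (hypothetical helper).
std::pair<long, long> block_range(long n, int nthreads, int tid) {
    long base = n / nthreads;        // minimum iterations per thread
    long extra = n % nthreads;       // the first `extra` threads take one more
    long begin = tid * base + std::min<long>(tid, extra);
    long end = begin + base + (tid < extra ? 1 : 0);
    return {begin, end};
}
```

With n = 1,000,000 and 8 threads this reproduces the table above: thread 0 owns [0, 125000) and thread 7 owns [875000, 1000000).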
2.2 The Classic Race Condition¶
Wrong — data race:
int sum = 0;
#pragma omp parallel for
for (int i = 0; i < N; i++)
sum += a[i]; // ← RACE: multiple threads read-modify-write sum simultaneously
// sum is wrong. Possibly different on every run.
Why it breaks:
Thread 0 reads sum = 5
Thread 1 reads sum = 5 ← same value, before thread 0 wrote back
Thread 0 writes sum = 5 + a[0] = 7
Thread 1 writes sum = 5 + a[1] = 6 ← overwrites thread 0's result!
One update is lost. This is a classic read-modify-write race.
Fix: reduction clause
int sum = 0;
#pragma omp parallel for reduction(+:sum)
for (int i = 0; i < N; i++)
sum += a[i]; // each thread accumulates its own private sum
// all private sums are added together at the end
Each thread gets its own private copy of sum (initialized to 0). After the loop, OpenMP adds all private copies into the original sum. No race.
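A self-contained version of the reduction fix might look like the sketch below. The function name sum_reduction and the choice of a long long accumulator are assumptions made for illustration; the point is that the accumulator type should match (or be wider than) the element type so nothing is truncated or overflows.

```cpp
#include <vector>

// Sums a vector with an OpenMP reduction. Built without -fopenmp the pragma
// is ignored and the loop runs serially with the same result.
long long sum_reduction(const std::vector<int>& a) {
    long long sum = 0;                       // wide accumulator to avoid overflow
    const long n = static_cast<long>(a.size());
    #pragma omp parallel for reduction(+:sum)
    for (long i = 0; i < n; i++)
        sum += a[i];                         // updates this thread's private copy
    return sum;                              // private copies are already combined here
}
```

Because integer addition is associative, the result does not depend on how the iterations were divided. With floating-point data the combined value can differ slightly between runs, since the order of the partial sums may change.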
2.3 Data Sharing Clauses¶
Every variable referenced inside a parallel region is either shared (one copy, all threads see it) or private (each thread has its own copy). OpenMP's default: variables declared outside the region are shared.
int x = 10;       // declared outside the region: shared by default (dangerous if written)
int result = 0;   // shared; written safely through the reduction clause
// A variable may appear in only one data-sharing clause, and a comment after a
// line-continuation backslash breaks the pragma, so the notes live up here:
//   private(x)      -> each thread gets its own x, uninitialized
//   firstprivate(x) -> each thread gets its own x, initialized to 10
#pragma omp parallel for \
    shared(a, b, c, N) \
    firstprivate(x) \
    reduction(+:result)
for (int i = 0; i < N; i++) {
    x = compute(i);            // safe: each thread has its own x
    result += a[i] * b[i];
}
| Clause | Meaning |
|---|---|
| shared(var) | All threads share one copy; reads are OK, writes need synchronization |
| private(var) | Each thread gets its own copy, uninitialized |
| firstprivate(var) | Each thread gets its own copy, initialized from the original |
| lastprivate(var) | Private during the loop; the value from the last iteration is copied back afterwards |
| reduction(op:var) | Private accumulator per thread, combined with op at the end |
Rule of thumb: If all threads only read a variable → shared. If each thread writes it independently → private. If they accumulate into it → reduction.
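To make firstprivate and lastprivate concrete, here is a small sketch. The function scale_and_get_last is a hypothetical example, not code from the original; it assumes a non-empty vector, because with zero iterations there is no "last iteration" for lastprivate to copy back.

```cpp
#include <vector>

// Scales every element by `factor` and returns the value computed in the
// sequentially last iteration (the value lastprivate copies back).
float scale_and_get_last(std::vector<float>& v, float factor) {
    float last = 0.0f;                       // receives the last iteration's value
    const int n = static_cast<int>(v.size());
    #pragma omp parallel for firstprivate(factor) lastprivate(last)
    for (int i = 0; i < n; i++) {
        v[i] *= factor;                      // factor: per-thread copy, initialized from outside
        last = v[i];                         // last: private inside the loop
    }
    return last;
}
```

For the input {1, 2, 3} and a factor of 2, the vector becomes {2, 4, 6} and the function returns 6, whichever thread happened to run the final iteration.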
2.4 Schedules — How Work Is Divided¶
// Static (default): divide evenly before the loop starts
// Thread 0 gets [0, N/P), thread 1 gets [N/P, 2N/P), ...
// Best when: each iteration costs the same amount of time
#pragma omp parallel for schedule(static)
for (int i = 0; i < N; i++) { /* uniform work */ }
// Static with chunk: interleave chunks of k iterations
// Thread 0 gets 0-7, 32-39, 64-71, ... (chunk=8, 4 threads)
// Helps with cache locality in some patterns
#pragma omp parallel for schedule(static, 8)
for (int i = 0; i < N; i++) { /* */ }
// Dynamic: each thread takes the next k iterations when it becomes free
// Overhead: ~synchronization per chunk fetch
// Best when: iterations have unpredictable/varying cost
#pragma omp parallel for schedule(dynamic, 64)
for (int i = 0; i < N; i++) { /* variable-cost work */ }
// Guided: starts with large chunks, shrinks over time
// Reduces scheduling overhead while handling tail imbalance
// Best when: later iterations are lighter than earlier ones
#pragma omp parallel for schedule(guided)
for (int i = 0; i < N; i++) { /* */ }
// Runtime: schedule determined by OMP_SCHEDULE env variable
// OMP_SCHEDULE="dynamic,32" ./my_program
#pragma omp parallel for schedule(runtime)
for (int i = 0; i < N; i++) { /* */ }
When to pick which:
All iterations take the same time? → static (lowest overhead)
Iterations have wildly different cost? → dynamic
Don't know, want adaptive? → guided
Tuning at runtime without recompile? → runtime
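The difference between static and dynamic scheduling under uneven work can be illustrated with a toy cost model. The sketch below is a simulation, not OpenMP code: static_makespan and dynamic_makespan are hypothetical helpers, costs are abstract time units, and scheduling overhead is ignored.

```cpp
#include <algorithm>
#include <vector>

// Finish time if iterations are split into equal contiguous blocks up front
// (a model of schedule(static)); costs[i] is the time of iteration i.
long static_makespan(const std::vector<long>& costs, int threads) {
    const long n = static_cast<long>(costs.size());
    long worst = 0;
    for (int t = 0; t < threads; ++t) {
        long begin = n * t / threads, end = n * (t + 1) / threads;
        long busy = 0;
        for (long i = begin; i < end; ++i) busy += costs[i];
        worst = std::max(worst, busy);
    }
    return worst;
}

// Finish time if each iteration goes to whichever thread is free first
// (a model of schedule(dynamic, 1), ignoring scheduling overhead).
long dynamic_makespan(const std::vector<long>& costs, int threads) {
    std::vector<long> free_at(threads, 0);   // time at which each thread becomes idle
    for (long c : costs) {
        auto next = std::min_element(free_at.begin(), free_at.end());
        *next += c;                          // earliest-free thread takes the iteration
    }
    return *std::max_element(free_at.begin(), free_at.end());
}
```

With costs {8, 1, 1, 1, 1, 1, 1, 1} on two threads, the static model finishes at time 11 (one thread gets the expensive iteration plus three cheap ones), while the dynamic model finishes at time 8. In real code the per-chunk synchronization cost of dynamic scheduling eats into that gain, which is why the chunk size matters.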
2.5 Reductions¶
Supported operators out of the box:
int sum = 0, product = 1, max_val = INT_MIN;
#pragma omp parallel for reduction(+:sum) reduction(*:product) reduction(max:max_val)
for (int i = 0; i < N; i++) {
sum += a[i];
product *= b[i];
max_val = std::max(max_val, a[i]);
}
Built-in operators: +, *, -, &, |, ^, &&, ||, min, max.
Custom reduction (OpenMP 4.0+):
struct Vec3 {
    float x, y, z;
    Vec3& operator+=(const Vec3& o) { x += o.x; y += o.y; z += o.z; return *this; }
};
// The combiner must be a single expression, so it uses operator+=
#pragma omp declare reduction(vec_add : Vec3 : omp_out += omp_in) \
    initializer(omp_priv = Vec3{0, 0, 0})
Vec3 total{0, 0, 0};
#pragma omp parallel for reduction(vec_add:total)
for (int i = 0; i < N; i++)
    total += forces[i];   // forces: array of Vec3
2.6 Collapse — Parallelizing Nested Loops¶
// Without collapse: only the outer loop is parallelized
// If outer loop has fewer iterations than threads → wasted threads
#pragma omp parallel for
for (int i = 0; i < 4; i++)
for (int j = 0; j < 1000; j++)
A[i][j] = B[i][j] * C[i][j];
// With collapse(2): outer * inner = 4000 iterations are parallelized
// Each thread gets a portion of the flattened 4000-iteration space
#pragma omp parallel for collapse(2)
for (int i = 0; i < 4; i++)
for (int j = 0; j < 1000; j++)
A[i][j] = B[i][j] * C[i][j];
Use collapse when the outer loop count is small relative to the thread count.
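Conceptually, collapse(2) turns the two loops into one loop over a flattened index and recovers (i, j) from it. The helper below is a hypothetical illustration of that mapping for a rectangular nest; an actual OpenMP implementation is free to compute the indices differently.

```cpp
#include <utility>

// Maps a flattened iteration index back to (i, j) for a loop nest whose
// inner loop has `inner` iterations (hypothetical helper).
std::pair<int, int> unflatten(int idx, int inner) {
    return {idx / inner, idx % inner};
}
```

For the 4 × 1000 nest above, flattened index 1003 corresponds to i = 1, j = 3, and the last index 3999 corresponds to i = 3, j = 999.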
2.7 Critical Sections and Atomics¶
When reduction is not enough (complex shared state):
// critical: only one thread at a time
// Correct, but slow (acts like a mutex around the block)
#pragma omp parallel for
for (int i = 0; i < N; i++) {
#pragma omp critical
{
global_map[key(i)] += value(i);
}
}
// atomic: faster for single memory operations (uses hardware atomics)
int counter = 0;
#pragma omp parallel for
for (int i = 0; i < N; i++) {
#pragma omp atomic
counter++; // ← hardware atomic, no mutex overhead
}
// atomic with operation
#pragma omp atomic update
total += a[i];
int val;
#pragma omp atomic read
val = shared_var;   // atomic read applies to an assignment, not a declaration
#pragma omp atomic write
shared_var = computed;
Critical vs Atomic:
- #pragma omp critical — mutex, any code block, higher overhead
- #pragma omp atomic — hardware instruction, single read/write/update only, much faster
2.8 Barrier and Nowait¶
#pragma omp parallel
{
// Threads do independent work
do_phase_one(omp_get_thread_num());
// A worksharing 'for' normally ends with an implicit barrier: all threads
// wait there before continuing
// 'nowait' removes the barrier — threads proceed immediately when done
#pragma omp for nowait
for (int i = 0; i < N; i++)
a[i] = compute(i);
// threads that finish early do NOT wait — they proceed here
// only safe if subsequent code doesn't depend on all threads finishing
// Explicit barrier — synchronize all threads at a specific point
#pragma omp barrier
// all threads are here now
}
2.9 Tasks — Irregular Parallelism¶
Tasks are for work that doesn't fit a regular loop: recursive algorithms, tree traversal, linked lists.
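// Recursive parallel tree sum using tasks.
// The function only creates tasks; the thread team is created once by the caller.
// (Opening a parallel region inside the recursion would nest a new region per call.)
int sum_tree(Node* node) {
    if (!node) return 0;
    int left_sum = 0, right_sum = 0;
    #pragma omp task shared(left_sum)
    left_sum = sum_tree(node->left);
    #pragma omp task shared(right_sum)
    right_sum = sum_tree(node->right);
    #pragma omp taskwait   // wait for both child tasks before using results
    return node->value + left_sum + right_sum;
}
// Call from inside a parallel region, from a single thread:
int tree_total = 0;
#pragma omp parallel   // create thread team
#pragma omp single     // only ONE thread starts the recursion; the others execute tasks
tree_total = sum_tree(root);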
Fibonacci (classic task example):
int fib(int n) {
if (n < 2) return n;
int x, y;
#pragma omp task shared(x) firstprivate(n)
x = fib(n - 1);
#pragma omp task shared(y) firstprivate(n)
y = fib(n - 2);
#pragma omp taskwait
return x + y;
}
// Call from inside a parallel region:
int result;
#pragma omp parallel
#pragma omp single
result = fib(30);
single is essential here: without it, every thread in the team would call fib(30) and build its own duplicate task tree. single ensures only one thread spawns the top-level tasks; the rest of the team executes them.
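Creating a task for every call down to n < 2 produces far more tasks than useful work. A common refinement, sketched below, is to fall back to plain recursion below a cutoff. The names fib_serial, fib_task, fib_parallel and the cutoff value of 20 are assumptions for illustration, not tuned or taken from the original.

```cpp
// Plain recursion used below the cutoff, where a task would cost more than the work.
long fib_serial(int n) { return n < 2 ? n : fib_serial(n - 1) + fib_serial(n - 2); }

// Task-parallel Fibonacci; kCutoff is an illustrative value, not a tuned one.
long fib_task(int n) {
    const int kCutoff = 20;
    if (n < kCutoff) return fib_serial(n);
    long x, y;
    #pragma omp task shared(x)
    x = fib_task(n - 1);
    #pragma omp task shared(y)
    y = fib_task(n - 2);
    #pragma omp taskwait                 // x and y are ready after this line
    return x + y;
}

// Entry point: one thread creates the root tasks, the team executes them.
long fib_parallel(int n) {
    long result = 0;
    #pragma omp parallel
    #pragma omp single
    result = fib_task(n);
    return result;
}
```

Built without -fopenmp the pragmas are ignored and the code runs serially with the same result, which makes it easy to check correctness before measuring speedup.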
2.10 Sections — Different Code on Different Threads¶
#pragma omp parallel sections
{
#pragma omp section
{
printf("Thread %d: loading data\n", omp_get_thread_num());
load_data();
}
#pragma omp section
{
printf("Thread %d: initializing config\n", omp_get_thread_num());
init_config();
}
#pragma omp section
{
printf("Thread %d: warming up cache\n", omp_get_thread_num());
warmup();
}
}
// All three run in parallel, all done here
Use sections for a fixed, small number of different concurrent operations. For dynamic work, use tasks.
2.11 SIMD Pragma¶
// Ask the compiler to vectorize (SIMD) one loop on a single thread
#pragma omp simd
for (int i = 0; i < N; i++)
c[i] = a[i] * b[i] + d[i];
// Parallelize across threads AND vectorize within each thread's chunk
#pragma omp parallel for simd
for (int i = 0; i < N; i++)
c[i] = a[i] * b[i];
// simd with reduction (e.g. sum with SIMD accumulation)
float sum = 0.0f;
#pragma omp simd reduction(+:sum)
for (int i = 0; i < N; i++)
sum += a[i];
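The simd pragma asserts that the loop iterations are safe to execute in SIMD lanes, so the compiler may skip its own dependence analysis; applying it to a loop with loop-carried dependences can silently produce wrong results. The compiler may still decline to vectorize. With GCC, add -fopt-info-vec to see what was vectorized.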
2.12 Thread Info and Environment¶
// Query runtime info
int n_threads = omp_get_num_threads(); // inside parallel region
int thread_id = omp_get_thread_num(); // 0 to n_threads-1
int max_threads = omp_get_max_threads(); // outside parallel region
int n_procs = omp_get_num_procs(); // hardware thread count
// Set thread count
omp_set_num_threads(8);
// Timing
double t0 = omp_get_wtime();
// ... work ...
double elapsed = omp_get_wtime() - t0;
Environment variables (set before running):
OMP_NUM_THREADS=8 # number of threads
OMP_SCHEDULE="dynamic,64" # schedule for 'runtime' clauses
OMP_PROC_BIND=close # bind threads to nearby hardware (NUMA)
OMP_PLACES=cores # binding granularity: threads, cores, sockets
GOMP_SPINCOUNT=100000 # how long to spin before sleeping (GNU)
2.13 Nested Parallelism¶
omp_set_max_active_levels(2); // allow two nested levels of parallel regions (omp_set_nested() is deprecated since OpenMP 5.0)
void outer_task() {
#pragma omp parallel for num_threads(4)
for (int i = 0; i < 4; i++) {
// Inner parallel region: each of the 4 outer threads forks its own team of 2
#pragma omp parallel for num_threads(2)
for (int j = 0; j < N; j++)
compute(i, j);
}
}
Nested parallelism often over-subscribes cores. Usually better to collapse(2) instead.
2.14 Common Pitfalls¶
| Pitfall | Symptom | Fix |
|---|---|---|
| Shared write without sync | Wrong results, non-deterministic | reduction, atomic, or critical |
| Capturing i by reference in tasks | Task reads wrong i | firstprivate(i) on the task |
| single missing in task-creation code | Task explosion | Add #pragma omp single around task creation |
| Calling omp_get_num_threads() outside parallel | Returns 1 always | Call inside the parallel region |
| Missing taskwait before using task results | Use before ready | #pragma omp taskwait |
| nowait on dependent loops | Uses wrong data | Remove nowait or add explicit barrier |
| Long critical sections | Serializes threads | Narrow the critical section, use atomic for simple ops |
2.15 OpenMP vs oneTBB¶
| | OpenMP | oneTBB |
|---|---|---|
| API style | Compiler directives (#pragma) | C++ templates and lambdas |
| Learning curve | Lower: one pragma per feature | Higher: need to know template types |
| Granularity | Loop-level | Task-level |
| Load balancing | Static/dynamic/guided schedules | Work-stealing (automatic, adaptive) |
| Flow graphs | No | Yes (flow_graph) |
| Per-thread storage | threadprivate / private clause | enumerable_thread_specific |
| Nested parallelism | Manual | Built-in |
| SIMD hints | #pragma omp simd | None built in; rely on compiler auto-vectorization or intrinsics |
| Best for | Existing loops, Fortran interop, scientific HPC | New C++ code, complex graphs, irregular tasks |
Resources: OpenMP specifications · OpenMP API Reference Card (PDF)
3. oneTBB (oneAPI Threading Building Blocks)¶
3.0 Mental Model: Work-Stealing Scheduler¶
oneTBB is a task-based parallel programming library. Instead of managing threads directly, you express what can run in parallel — the runtime distributes work using a work-stealing scheduler.
How work-stealing works:
Thread 0's deque: [task A] [task B] [task C] ← pushes/pops from right (LIFO, cache-friendly)
Thread 1's deque: [] ← empty, idle
Thread 2's deque: [task X]
Thread 3's deque: [] ← empty, idle
Step 1: Thread 0 pops task C from its own deque (right end, hot in cache)
Step 2: Thread 1 is idle → STEALS task A from Thread 0's LEFT end
Step 3: Thread 3 is idle → STEALS task X from Thread 2
Why steal from the left?
- Owner pops from the right (LIFO): processes its own tasks in stack order, cache-friendly
- Stealer pops from the left (FIFO): takes the oldest, typically largest tasks, maximizing stolen work size
Result: Threads never sit idle as long as any thread has work. No need to manually tune thread counts for imbalanced workloads.
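The two ends of the deque can be modeled with a few lines of standard C++. This is a toy, single-threaded sketch: WorkerDeque, pop_own, and steal are hypothetical names, and the synchronization a real scheduler needs between the owner and thieves is deliberately left out. It is not how oneTBB is implemented internally.

```cpp
#include <deque>
#include <string>

// Toy, single-threaded model of one worker's task deque.
struct WorkerDeque {
    std::deque<std::string> tasks;

    // Owner adds new tasks at the right end.
    void push(const std::string& t) { tasks.push_back(t); }

    // Owner takes the newest task (right end, LIFO). Returns false if empty.
    bool pop_own(std::string& out) {
        if (tasks.empty()) return false;
        out = tasks.back();
        tasks.pop_back();
        return true;
    }

    // Thief takes the oldest task (left end, FIFO). Returns false if empty.
    bool steal(std::string& out) {
        if (tasks.empty()) return false;
        out = tasks.front();
        tasks.pop_front();
        return true;
    }
};
```

After pushing A, B, and C, the owner's pop_own yields C and a thief's steal yields A, matching steps 1 and 2 in the walkthrough above.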
Install:
# Ubuntu/Debian
sudo apt install libtbb-dev
# CMake integration
find_package(TBB REQUIRED)
target_link_libraries(my_target TBB::tbb)
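Header (the umbrella header pulls in every component; the unqualified names in the examples below assume the using-directive):
#include "oneapi/tbb.h"
using namespace oneapi::tbb;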
3.1 parallel_for — Parallel Loop¶
The fundamental building block. Splits a range into chunks and executes each chunk on an available thread.
Simple 1D integer form:
const int N = 1'000'000;
std::vector<float> a(N), b(N), c(N); // heap storage: three 4 MB stack arrays would risk overflowing the stack
// Compact form — runtime picks chunk size automatically
parallel_for(0, N, [&](int i) {
c[i] = a[i] + b[i];
});
blocked_range form — gives you the subrange, loop inside:
// Better: lets you write inner loops with fewer lambda calls
parallel_for(blocked_range<int>(0, N),
[&](const blocked_range<int>& r) {
for (int i = r.begin(); i < r.end(); i++) {
c[i] = a[i] + b[i];
}
}
);
blocked_range<T>(begin, end) is the half-open interval [begin, end). The lambda receives a subrange r — call .begin() and .end() to iterate it.
Why blocked_range over compact form?
- Avoids per-element lambda call overhead
- Lets you initialize buffers once per chunk (instead of per element)
- Required for custom chunk-level logic (SIMD, temp allocation)
2D range — matrix or image processing:
parallel_for(blocked_range2d<int>(0, rows, 0, cols),
[&](const blocked_range2d<int>& r) {
for (int i = r.rows().begin(); i < r.rows().end(); i++)
for (int j = r.cols().begin(); j < r.cols().end(); j++)
out[i][j] = process(in[i][j]);
}
);
3D range:
parallel_for(blocked_range3d<int>(0, D, 0, H, 0, W),
[&](const blocked_range3d<int>& r) {
for (int d = r.pages().begin(); d < r.pages().end(); d++)
for (int h = r.rows().begin(); h < r.rows().end(); h++)
for (int w = r.cols().begin(); w < r.cols().end(); w++)
vol[d][h][w] = f(d, h, w);
}
);
Partitioners — Control Chunking¶
The third optional argument controls how work is divided:
// auto_partitioner (default): runtime tunes chunk size automatically
parallel_for(blocked_range<int>(0, N), body);
// affinity_partitioner: reuses same data → same thread (cache-warm)
// Declare static so it persists between calls
static affinity_partitioner ap;
parallel_for(blocked_range<int>(0, N), body, ap);
// simple_partitioner: splits all the way down to grainsize (chunks end up between grainsize/2 and grainsize)
parallel_for(blocked_range<int>(0, N, 1000), body, simple_partitioner());
// static_partitioner: divide evenly upfront, no stealing
// Deterministic: same thread always gets same range
parallel_for(blocked_range<int>(0, N), body, static_partitioner());
| Partitioner | Chunk size | Load balancing | Use when |
|---|---|---|---|
| auto_partitioner | Adaptive | Work-stealing (default) | Most cases |
| affinity_partitioner | Adaptive | Dynamic + cache-aware | Re-running same loop over same data repeatedly |
| simple_partitioner | Between grainsize/2 and grainsize | Work-stealing over fixed-size chunks | Each chunk needs temp buffer, or N is huge |
| static_partitioner | Uniform | None | Need reproducible execution for debugging |
Grainsize tuning: Each chunk should take ≥ ~100,000 clock cycles (~50 µs at 2 GHz). Too-small chunks = scheduling overhead dominates. Rule: if loop body takes 10 ns, grainsize of 10,000+ is reasonable.
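The grainsize rule of thumb is simple arithmetic, sketched below. The helper min_grainsize is hypothetical; the default 100,000-cycle target and 2 GHz clock are the figures quoted in the rule, and ns_per_iteration is something you would measure for your own loop body.

```cpp
#include <cmath>

// Smallest grainsize whose chunk meets a target duration.
// ns_per_iteration is a measured cost; the defaults are the rule-of-thumb
// figures from the text (100,000 cycles at 2 GHz = 50,000 ns per chunk).
long min_grainsize(double ns_per_iteration,
                   double target_cycles = 100000.0,
                   double clock_ghz = 2.0) {
    double target_ns = target_cycles / clock_ghz;   // cycles / (cycles per ns)
    return static_cast<long>(std::ceil(target_ns / ns_per_iteration));
}
```

For a 10 ns loop body this gives a minimum of 5,000 iterations per chunk, so the 10,000 suggested above leaves roughly a 2× safety margin.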
3.2 parallel_reduce — Parallel Reduction¶
For loops that accumulate a result: sum, min, max, dot product, histogram.
Lambda form (most common):
// Sum of array
float total = parallel_reduce(
blocked_range<int>(0, N),
0.0f, // identity value
[&](const blocked_range<int>& r, float init) -> float {
for (int i = r.begin(); i < r.end(); i++)
init += a[i];
return init;
},
[](float x, float y) -> float { // combine partial results
return x + y;
}
);
Arguments:
1. Range to iterate
2. Identity value (initial value for each partial result)
3. Body: takes a subrange + running partial → returns new partial
4. Combine: merges two partial results into one
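How the four arguments fit together can be seen in a sequential model of the split/body/combine shape. This is a sketch in standard C++, not oneTBB code: reduce_model and sum_model are hypothetical names, grain must be at least 1, and every leaf here starts from the identity, whereas the real runtime may also hand a body a partial result carried over from an earlier subrange.

```cpp
#include <vector>

// Sequential model of a split/body/combine reduction over [begin, end).
// Ranges larger than `grain` are halved; each leaf starts from `identity`.
template <typename T, typename Body, typename Combine>
T reduce_model(int begin, int end, int grain, T identity, Body body, Combine combine) {
    if (end - begin <= grain)
        return body(begin, end, identity);          // leaf: accumulate a subrange
    int mid = begin + (end - begin) / 2;
    T left = reduce_model(begin, mid, grain, identity, body, combine);
    T right = reduce_model(mid, end, grain, identity, body, combine);
    return combine(left, right);                    // merge two partial results
}

// Example: sum of a vector, with the body adding a[b..e) to the incoming partial.
float sum_model(const std::vector<float>& a, int grain) {
    return reduce_model(0, static_cast<int>(a.size()), grain, 0.0f,
        [&](int b, int e, float init) {
            for (int i = b; i < e; i++) init += a[i];
            return init;
        },
        [](float x, float y) { return x + y; });
}
```

In a parallel runtime the two recursive calls are the parts that can run on different threads, which is why the combine step must be associative and the identity must be neutral for it.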
Dot product:
float dot = parallel_reduce(
blocked_range<int>(0, N), 0.0f,
[&](const blocked_range<int>& r, float init) -> float {
for (int i = r.begin(); i < r.end(); i++)
init += a[i] * b[i];
return init;
},
std::plus<float>{}
);
Find minimum with index:
struct MinResult { float val; int idx; };
auto result = parallel_reduce(
blocked_range<int>(0, N),
MinResult{FLT_MAX, -1},
[&](const blocked_range<int>& r, MinResult curr) {
for (int i = r.begin(); i < r.end(); i++)
if (a[i] < curr.val) curr = {a[i], i};
return curr;
},
[](MinResult a, MinResult b) {
return a.val < b.val ? a : b;
}
);
Common mistake: Do not reset the accumulator to zero inside the body. init carries prior partial results; resetting it discards prior work.
Deterministic reduce — same result every run regardless of scheduling:
float result = parallel_deterministic_reduce(
blocked_range<int>(0, N, 1000), // explicit grainsize required
0.0f,
[&](const blocked_range<int>& r, float init) {
        return std::accumulate(&a[0] + r.begin(), &a[0] + r.end(), init); // pointer arithmetic avoids indexing a[N]
},
std::plus<float>{}
);
// Floating-point result identical on every run
3.3 parallel_scan — Prefix Sum¶
Parallel prefix scan: each element out[i] = f(out[i-1], in[i]).
#include "oneapi/tbb/parallel_scan.h"
// Parallel inclusive prefix sum
std::vector<float> in(N), out(N);
parallel_scan(
blocked_range<int>(0, N),
0.0f, // initial value
[&](const blocked_range<int>& r, float running, bool is_final) {
for (int i = r.begin(); i < r.end(); i++) {
running += in[i];
if (is_final)
out[i] = running;
}
return running;
},
[](float a, float b) { return a + b; } // combine
);
// out[i] = in[0] + in[1] + ... + in[i]
is_final: The scan body may run twice on a chunk: once as a pre-scan that only computes the chunk's partial sum (is_final == false), and once with the carry-in from earlier chunks to fill the output (is_final == true). Write to the output only when is_final is true.
Use cases: cumulative sums, exclusive scan for compaction, CDF computation.
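The two roles of the scan body can be made explicit with a sequential model of a chunked two-pass prefix sum. This is a sketch of the general technique in standard C++, not the oneTBB implementation: scan_model is a hypothetical name, chunk must be at least 1, and the fixed chunking stands in for whatever subranges the runtime would choose.

```cpp
#include <algorithm>
#include <vector>

// Sequential model of a chunked two-pass inclusive prefix sum.
std::vector<float> scan_model(const std::vector<float>& in, int chunk) {
    const int n = static_cast<int>(in.size());
    const int chunks = (n + chunk - 1) / chunk;
    std::vector<float> carry(chunks, 0.0f), out(n);

    // Pass 1 (not final): each chunk computes only its own total.
    for (int c = 0; c < chunks; c++)
        for (int i = c * chunk; i < std::min(n, (c + 1) * chunk); i++)
            carry[c] += in[i];

    // Turn chunk totals into carry-ins: the sum of everything before each chunk.
    float running = 0.0f;
    for (int c = 0; c < chunks; c++) {
        float total = carry[c];
        carry[c] = running;
        running += total;
    }

    // Pass 2 (final): each chunk rescans from its carry-in and writes output.
    for (int c = 0; c < chunks; c++) {
        float acc = carry[c];
        for (int i = c * chunk; i < std::min(n, (c + 1) * chunk); i++) {
            acc += in[i];
            out[i] = acc;
        }
    }
    return out;
}
```

Passes 1 and 2 are independent across chunks, which is where the parallelism comes from; only the short middle step over chunk totals is inherently sequential in this model.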
3.4 parallel_sort¶
#include "oneapi/tbb/parallel_sort.h"
std::vector<int> data(N);
// ...fill data...
// In-place sort (like std::sort but parallel)
parallel_sort(data.begin(), data.end());
// Custom comparator
parallel_sort(data.begin(), data.end(), std::greater<int>{});
// Sort struct by field
parallel_sort(records.begin(), records.end(),
[](const Record& a, const Record& b) {
return a.score > b.score;
}
);
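Uses a parallel quicksort variant with work-stealing and requires random-access iterators. It can be several times faster than std::sort on large inputs (roughly 4–6× on 8+ cores is typical), but the speedup depends on the data and the cost of the comparator.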
Note: parallel_sort is not stable (equal elements may reorder). Use std::stable_sort for stable ordering (no TBB parallel version exists for stable sort).
3.5 parallel_for_each — Unknown Iteration Space¶
For containers without random-access iterators (linked lists, sets) or when the iteration space grows during execution:
std::list<WorkItem> items = get_work();
parallel_for_each(items.begin(), items.end(),
[](WorkItem& item) {
process(item);
}
);
Dynamic work addition with feeder — BFS / tree traversal:
parallel_for_each(roots.begin(), roots.end(),
[](Node* node, feeder<Node*>& f) {
process(node);
for (Node* child : node->children)
f.add(child); // adds work dynamically to the pool
}
);
// Runs until all nodes processed, including dynamically added ones
3.6 parallel_pipeline — Assembly Line¶
Pipeline parallelism: data flows through stages. Parallel stages process multiple items concurrently; serial stages maintain order.
#include "oneapi/tbb/parallel_pipeline.h"
const int max_tokens = 16; // max items in-flight simultaneously
parallel_pipeline(max_tokens,
// Stage 1: serial input (reads one item at a time)
make_filter<void, InputData*>(
filter_mode::serial_in_order,
[&](flow_control& fc) -> InputData* {
InputData* data = read_next();
if (!data) { fc.stop(); return nullptr; }
return data;
}
) &
// Stage 2: parallel processing (multiple items at once)
make_filter<InputData*, OutputData*>(
filter_mode::parallel,
[](InputData* in) -> OutputData* {
return transform(in);
}
) &
// Stage 3: serial output (writes in original order)
make_filter<OutputData*, void>(
filter_mode::serial_in_order,
[&](OutputData* out) {
write_result(out);
delete out;
}
)
);
Filter modes:
| Mode | Ordering | Concurrency | Use when |
|---|---|---|---|
| serial_in_order | Preserved | 1 at a time | I/O, ordered output |
| serial_out_of_order | Not preserved | 1 at a time | Single-threaded transform |
| parallel | Not preserved | Multiple | CPU-bound transform, independent items |
max_tokens limits memory: the pipeline never has more than this many items in-flight simultaneously. If processing is fast but output is slow, tokens pile up — limit them to bound memory.
Throughput bound: a serial stage handles one item at a time, so throughput ≤ 1 / slowest_serial_stage_latency regardless of parallel stage speed. max_tokens adds a second cap: throughput ≤ max_tokens / end_to_end_latency_per_item.
3.7 parallel_invoke and task_group — Explicit Tasks¶
For a fixed number of independent tasks (fork-join):
#include "oneapi/tbb/parallel_invoke.h"
// Run two functions in parallel, wait for both
parallel_invoke(
[]{ sort_left_half(); },
[]{ sort_right_half(); }
);
// Both are done here
// Accepts more than two callables
parallel_invoke(
[]{ task_a(); },
[]{ task_b(); },
[]{ task_c(); },
[]{ task_d(); }
);
For a dynamic number of tasks:
#include "oneapi/tbb/task_group.h"
task_group tg;
for (auto& item : work_units) {
tg.run([&item]{ process(item); });
}
tg.wait(); // blocks until all tasks complete
// task_group::run() is thread-safe — tasks can add more tasks
task_group tg2;
tg2.run([&]{
tg2.run([&]{ subtask_a(); }); // recursive is fine
tg2.run([&]{ subtask_b(); });
});
tg2.wait();
3.8 enumerable_thread_specific — Per-Thread Storage¶
One of the most important oneTBB features for avoiding false sharing and locks. Gives each thread its own copy of a variable.
Problem without it:
// WRONG: all threads write to the same histogram → data race
std::vector<int> hist(256, 0);
parallel_for(0, N, [&](int i) {
hist[image[i]]++; // race condition!
});
Fix with enumerable_thread_specific:
#include "oneapi/tbb/enumerable_thread_specific.h"
// Each thread gets its own private histogram
enumerable_thread_specific<std::vector<int>> local_hist(
[]{ return std::vector<int>(256, 0); } // factory: how to create each copy
);
parallel_for(0, N, [&](int i) {
local_hist.local()[image[i]]++; // .local() returns THIS thread's copy
});
// Combine all per-thread histograms into one
std::vector<int> global_hist(256, 0);
for (auto& h : local_hist) { // iterate over all thread-local copies
for (int k = 0; k < 256; k++)
global_hist[k] += h[k];
}
Mechanics:
- .local() returns a reference to this thread's copy. Creates it on first call.
- Iterating over local_hist gives one copy per thread that called .local().
- Zero lock contention during the parallel phase.
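The pattern behind enumerable_thread_specific (privatize, then merge) can be reproduced with std::thread alone, which helps show why no locks are needed. The sketch below is not oneTBB code: histogram_privatized is a hypothetical function, the slot index plays the role of .local(), and the work is split into fixed slices rather than stolen dynamically.

```cpp
#include <cstddef>
#include <cstdint>
#include <thread>
#include <vector>

// Histogram of byte values using one private histogram per thread,
// merged after all threads have joined.
std::vector<int> histogram_privatized(const std::vector<std::uint8_t>& image, int threads) {
    std::vector<std::vector<int>> local(threads, std::vector<int>(256, 0));
    std::vector<std::thread> pool;
    const std::size_t n = image.size();
    for (int t = 0; t < threads; ++t)
        pool.emplace_back([&, t] {
            std::size_t begin = n * t / threads, end = n * (t + 1) / threads;
            for (std::size_t i = begin; i < end; ++i)
                local[t][image[i]]++;           // only thread t touches local[t]
        });
    for (auto& th : pool) th.join();

    std::vector<int> global(256, 0);            // serial merge, no contention
    for (const auto& h : local)
        for (int k = 0; k < 256; ++k) global[k] += h[k];
    return global;
}
```

The merge costs 256 additions per thread, which is negligible next to the parallel phase for any image of realistic size. The oneTBB class adds lazy creation of each copy and works with the work-stealing scheduler, where the number of participating threads is not known in advance.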
Another example — thread-local sum:
enumerable_thread_specific<float> thread_sum(0.0f);
parallel_for(blocked_range<int>(0, N), [&](const blocked_range<int>& r) {
float& my_sum = thread_sum.local();
for (int i = r.begin(); i < r.end(); i++)
my_sum += a[i];
});
float total = 0.0f;
for (float s : thread_sum) total += s;
Construction options:
// Default-constructed value
enumerable_thread_specific<int> ets1; // each copy = int{}
// Value-initialized
enumerable_thread_specific<int> ets2(42); // each copy = 42
// Factory function (for non-copyable or expensive init)
enumerable_thread_specific<std::vector<int>> ets3(
[]{ return std::vector<int>(1024, 0); }
);
3.9 combinable<T> — Simpler Per-Thread Accumulation¶
A simplified version of enumerable_thread_specific specifically for accumulating a single value:
#include "oneapi/tbb/combinable.h"
combinable<float> partial_sum([]{ return 0.0f; }); // initializer: each thread's copy starts at 0
parallel_for(0, N, [&](int i) {
partial_sum.local() += a[i]; // thread-local accumulate
});
// Combine all locals with a binary op
float total = partial_sum.combine([](float a, float b) {
return a + b;
});
vs enumerable_thread_specific:
- combinable<T> — simpler API, designed specifically for reduction patterns
- enumerable_thread_specific<T> — more flexible, supports iteration, factory init, non-trivial types
Use combinable when you just want a thread-local accumulator. Use enumerable_thread_specific when you need to inspect each thread's value separately after the parallel phase.
3.10 Flow Graph — Data Flow and Dependence Graphs¶
For expressing complex parallel patterns as a graph of nodes and edges. The runtime automatically runs nodes when their inputs are ready — no manual synchronization.
#include "oneapi/tbb/flow_graph.h"
using namespace oneapi::tbb::flow;
graph g;
// function_node<In, Out>: receives In, produces Out
// Second arg = max concurrency (1=serial, unlimited=max)
function_node<int, int> square(g, unlimited, [](int x) { return x * x; });
function_node<int, int> cube (g, unlimited, [](int x) { return x * x * x; });
// join_node: wait for one input per port, emit as tuple
join_node<std::tuple<int,int>> join(g);
function_node<std::tuple<int,int>, void> printer(g, 1,
[](const std::tuple<int,int>& t) {
printf("square=%d cube=%d\n", std::get<0>(t), std::get<1>(t));
}
);
// Wire the graph
make_edge(square, input_port<0>(join));
make_edge(cube, input_port<1>(join));
make_edge(join, printer);
// Send inputs — both nodes run in parallel
square.try_put(5);
cube.try_put(5);
g.wait_for_all(); // always wait before graph goes out of scope
All Key Node Types¶
| Node | Description |
|---|---|
| function_node<In, Out> | Transforms In → Out, configurable concurrency |
| input_node<Out> (replaces the deprecated source_node<Out>) | Generates tokens from a function; starts the graph |
| broadcast_node<T> | Fans out: sends one input to ALL successors |
| join_node<tuple<...>> | Fans in: waits for one from each port, emits tuple |
| split_node<tuple<...>> | Splits tuple: sends element N to port N |
| buffer_node<T> | Buffers messages until a successor requests them |
| queue_node<T> | FIFO buffer; passes messages to any available successor |
| priority_queue_node<T> | Priority-ordered buffer |
| sequencer_node<T> | Reorders out-of-order items back to sequence number order |
| limiter_node<T> | Limits in-flight messages (backpressure) |
| overwrite_node<T> | Stores last value; new successors get it immediately |
| write_once_node<T> | Stores first value; broadcasts to all registered receivers |
| indexer_node<T...> | Tagged union input: accepts any type from set, tags messages |
Fan-Out with broadcast_node¶
broadcast_node<int> broadcaster(g);
function_node<int, void> worker_a(g, unlimited, [](int x){ do_a(x); });
function_node<int, void> worker_b(g, unlimited, [](int x){ do_b(x); });
make_edge(broadcaster, worker_a);
make_edge(broadcaster, worker_b); // same input goes to both
broadcaster.try_put(42); // both worker_a and worker_b receive 42
g.wait_for_all();
Reordering with sequencer_node¶
// A function_node with unlimited concurrency may finish items out of order; use sequencer_node to restore order
sequencer_node<Frame> reorder(g, [](const Frame& f) {
return f.sequence_number; // sequencer uses this to order outputs
});
function_node<Frame, Frame> processor(g, unlimited, [](Frame f) {
f.data = process(f.data); // runs in parallel, out of order
return f;
});
function_node<Frame, void> writer(g, serial, [](const Frame& f) {
write_in_order(f); // receives frames in sequence order
});
make_edge(processor, reorder);
make_edge(reorder, writer);
Backpressure with limiter_node¶
// Prevent fast producer from overwhelming slow consumer
limiter_node<int> limiter(g, 8); // max 8 messages in flight
make_edge(producer, limiter);
make_edge(limiter, slow_consumer);
make_edge(slow_consumer, limiter.decrementer()); // slow_consumer emits continue_msg when done → frees a slot in the limiter
Conditional Routing with indexer_node¶
using IndexerType = indexer_node<int, float>; // accepts int OR float
IndexerType idx(g);
function_node<IndexerType::output_type, void> router(g, unlimited,
[](const IndexerType::output_type& msg) {
if (msg.tag() == 0) // int arrived
handle_int(cast_to<int>(msg));
else // float arrived
handle_float(cast_to<float>(msg));
}
);
make_edge(idx, router);
input_port<0>(idx).try_put(42); // send int
input_port<1>(idx).try_put(3.14f); // send float
Always call g.wait_for_all() before the graph object is destroyed. Destroying a live graph is undefined behavior.
3.11 Concurrent Containers¶
Thread-safe containers for parallel access — no external locking needed.
concurrent_hash_map¶
#include "oneapi/tbb/concurrent_hash_map.h"
concurrent_hash_map<std::string, int> freq;
parallel_for_each(words.begin(), words.end(), [&](const std::string& w) {
    concurrent_hash_map<std::string, int>::accessor acc; // write lock
    freq.insert(acc, w);   // inserts w if absent; acc holds a write lock on that element
    acc->second++;         // safe: element locked by accessor
}); // acc destructor releases the lock
// Read-only access: const_accessor (shared lock, multiple readers OK)
concurrent_hash_map<std::string, int>::const_accessor cacc;
if (freq.find(cacc, "hello"))
    printf("hello appears %d times\n", cacc->second);
accessor (write lock) and const_accessor (read lock) lock individual elements, which is much finer granularity than a single std::mutex over a std::unordered_map.
concurrent_vector¶
Safe for concurrent push_back and element access. Iterators and references remain valid after growth.
#include "oneapi/tbb/concurrent_vector.h"
concurrent_vector<Result> results;
parallel_for(0, N, [&](int i) {
if (passes_filter(i))
results.push_back(compute(i)); // thread-safe
});
// results has all passing items (order not guaranteed)
concurrent_queue / concurrent_bounded_queue¶
#include "oneapi/tbb/concurrent_queue.h"
concurrent_queue<WorkItem> queue;
// Producer thread (can be called from multiple threads)
queue.push(item);
// Consumer thread (non-blocking)
WorkItem item;
if (queue.try_pop(item)) {
process(item);
}
// Bounded queue: push blocks when full (backpressure)
concurrent_bounded_queue<WorkItem> bounded;
bounded.set_capacity(100); // max 100 items (capacity is set after construction, not via the constructor)
bounded.push(item); // blocks if full
bounded.pop(item); // blocks if empty
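The blocking push/pop semantics can be sketched with one mutex and two condition variables. This is a simplified standard-library illustration, not oneTBB's implementation; `BoundedQueue` and `sum_through_queue` are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical bounded blocking queue showing the backpressure behaviour.
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    // Blocks while the queue is full.
    void push(T value) {
        std::unique_lock<std::mutex> lock(mtx_);
        not_full_.wait(lock, [&] { return items_.size() < capacity_; });
        items_.push_back(std::move(value));
        not_empty_.notify_one();
    }

    // Blocks while the queue is empty.
    void pop(T& out) {
        std::unique_lock<std::mutex> lock(mtx_);
        not_empty_.wait(lock, [&] { return !items_.empty(); });
        out = std::move(items_.front());
        items_.pop_front();
        not_full_.notify_one();
    }

private:
    std::size_t capacity_;
    std::deque<T> items_;
    std::mutex mtx_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
};

// A producer pushes 1..n through a queue of capacity 2 while a consumer sums them.
inline long sum_through_queue(int n) {
    BoundedQueue<int> q(2);
    long total = 0;
    std::thread producer([&] { for (int i = 1; i <= n; ++i) q.push(i); });
    std::thread consumer([&] {
        for (int i = 0; i < n; ++i) {
            int v = 0;
            q.pop(v);
            total += v;
        }
    });
    producer.join();
    consumer.join();
    return total;
}
```

Because the capacity is 2, the producer can never run more than two items ahead of the consumer, which is the backpressure effect described above.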
concurrent_priority_queue¶
#include "oneapi/tbb/concurrent_priority_queue.h"
concurrent_priority_queue<int> pq;
pq.push(10);
pq.push(5);
pq.push(20);
int top;
pq.try_pop(top); // top = 20 (max by default)
3.12 Scalable Memory Allocator¶
General-purpose malloc/free implementations can become a bottleneck when many threads allocate simultaneously, because the threads contend for shared allocator state. TBB's allocator reduces that contention with per-thread memory pools:
#include "oneapi/tbb/scalable_allocator.h"
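// Use scalable_allocator with STL containers
// (tbb_allocator lives in oneapi/tbb/tbb_allocator.h and falls back to malloc if tbbmalloc is unavailable)
std::vector<float, tbb::scalable_allocator<float>> vec(N);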
// Use scalable_malloc directly (drop-in for malloc)
float* buf = (float*)scalable_malloc(N * sizeof(float));
scalable_free(buf);
// Replace all allocations globally (link with -ltbbmalloc_proxy)
// Or: set LD_PRELOAD=libtbbmalloc_proxy.so before running
Most impactful when many threads frequently allocate/free small objects (task objects, intermediate buffers in a pipeline).
3.13 task_arena — Control Thread Pool¶
task_arena lets you isolate work into a dedicated thread pool, control thread count, and bind to specific NUMA nodes.
#include "oneapi/tbb/task_arena.h"
// Create a pool of 4 threads (instead of using the global pool)
task_arena arena(4);
arena.execute([&]{
parallel_for(0, N, [&](int i) {
c[i] = a[i] + b[i];
});
});
// parallel_for ran on at most 4 threads inside arena
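// NUMA-aware: pin arena to one NUMA node
// Valid IDs come from info::numa_nodes() (oneapi/tbb/info.h); do not hard-code them
std::vector<numa_node_id> nodes = info::numa_nodes();
task_arena numa_arena(task_arena::constraints{}
.set_numa_id(nodes.back()) // e.g. the second socket on a two-socket machine
.set_max_concurrency(8) // 8 threads
);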
numa_arena.execute([&]{
parallel_for(0, N, body);
});
Isolation: Work inside arena.execute() does not mix with work in the global arena. This is important when you have multiple independent parallel subsystems that shouldn't steal from each other.
// Two independent subsystems with separate thread pools
task_arena system_a(4), system_b(4);
// These run in their own isolated pools — no work stealing between them
std::thread t1([&]{ system_a.execute([&]{ parallel_for(0, N, body_a); }); });
std::thread t2([&]{ system_b.execute([&]{ parallel_for(0, N, body_b); }); });
t1.join(); t2.join();
3.14 global_control — Runtime Configuration¶
#include "oneapi/tbb/global_control.h"
// Limit maximum number of worker threads globally
global_control ctrl(
global_control::max_allowed_parallelism, 4
);
// All TBB algorithms now use at most 4 threads
// Restored when ctrl goes out of scope (RAII)
// Stack size for worker threads
global_control stack_ctrl(
global_control::thread_stack_size, 8 * 1024 * 1024 // 8 MB
);
3.15 Exception Handling and Cancellation¶
// Exceptions propagate from worker threads to the calling thread
try {
parallel_for(0, N, [](int i) {
if (bad_condition(i))
throw std::runtime_error("bad input");
});
} catch (const std::exception& e) {
// Caught here. Iterations that have not started are cancelled; those already running finish.
std::cerr << e.what() << "\n";
}
// Explicit cancellation via task_group_context
task_group_context ctx;
parallel_for(blocked_range<int>(0, N), body, auto_partitioner(), ctx);
// From another thread:
ctx.cancel_group_execution(); // remaining tasks will not start
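Cancellation is cooperative: work that is already running is not interrupted, and only work that has not started yet is skipped. A minimal standard-library sketch of that behaviour follows, with workers claiming chunks from a shared counter and checking a flag before each claim. `CancelFlag` and `run_chunks` are hypothetical stand-ins, not oneTBB types.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical stand-in for a cancellation context: a single shared flag.
struct CancelFlag {
    std::atomic<bool> cancelled{false};
    void cancel() { cancelled.store(true, std::memory_order_relaxed); }
    bool is_cancelled() const { return cancelled.load(std::memory_order_relaxed); }
};

// Runs chunks [0, num_chunks) on `num_threads` threads. The chunk whose index
// equals `bad_chunk` requests cancellation (pass -1 for "never cancel").
// Returns how many chunks actually executed.
inline int run_chunks(int num_chunks, int num_threads, int bad_chunk) {
    assert(num_chunks >= 0 && num_threads > 0);
    CancelFlag ctx;
    std::atomic<int> next{0};
    std::atomic<int> executed{0};

    auto worker = [&] {
        for (;;) {
            if (ctx.is_cancelled()) return;      // skip work that has not started
            const int chunk = next.fetch_add(1); // claim the next chunk
            if (chunk >= num_chunks) return;
            executed.fetch_add(1);               // a claimed chunk always runs to completion
            if (chunk == bad_chunk) ctx.cancel();
        }
    };

    std::vector<std::thread> pool;
    for (int t = 0; t < num_threads; ++t) pool.emplace_back(worker);
    for (auto& th : pool) th.join();
    return executed.load();
}
```

With one thread the result is deterministic (chunks 0 through `bad_chunk` run). With several threads a few extra chunks may already be in flight when the flag is set, which mirrors why cancelled parallel loops can still execute some iterations after the request.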
3.16 Work Isolation¶
By default, a thread waiting on an inner parallel_for may execute tasks from the outer scope. Use this_task_arena::isolate to prevent this:
this_task_arena::isolate([&]{
parallel_for(0, N, [](int i) { /* inner work */ });
});
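// While waiting for the inner loop, the calling thread runs only tasks
// created inside the isolated region, not unrelated tasks from outer parallel regions
Use when the calling thread holds a lock or relies on thread-local state that an outer-level task could also touch. Without isolation, picking up such a task on the same thread can deadlock or corrupt that state.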
3.17 Design Patterns Summary¶
Reduce:
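The shape of the pattern, sketched with plain std::thread rather than oneTBB's parallel_reduce: each thread folds its own chunk into a private partial result that starts at the identity value, and the partials are combined once at the end. `chunked_sum` is a hypothetical helper written for this illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <thread>
#include <vector>

// Hypothetical chunked reduction: private partial sums, then one final combine.
inline double chunked_sum(const std::vector<double>& data, std::size_t num_threads) {
    assert(num_threads > 0);
    const std::size_t n = data.size();
    const std::size_t chunk = (n + num_threads - 1) / num_threads;
    std::vector<double> partial(num_threads, 0.0);  // 0.0 is the identity for +

    std::vector<std::thread> pool;
    for (std::size_t t = 0; t < num_threads; ++t) {
        const std::size_t lo = std::min(t * chunk, n);
        const std::size_t hi = std::min(lo + chunk, n);
        pool.emplace_back([&data, &partial, t, lo, hi] {
            double local = 0.0;                     // accumulate privately, no sharing
            for (std::size_t i = lo; i < hi; ++i) local += data[i];
            partial[t] = local;                     // one write per thread
        });
    }
    for (auto& th : pool) th.join();

    // Combine step: the counterpart of the "join" functor in a parallel reduce.
    return std::accumulate(partial.begin(), partial.end(), 0.0);
}
```

No locks or atomics are needed because each thread writes only its own slot. A task-based reduce follows the same identity / fold / combine structure but lets the scheduler choose the chunk boundaries.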
Divide and Conquer (recursive):
void parallel_mergesort(int* a, int n) {
if (n < THRESHOLD) { std::sort(a, a + n); return; }
parallel_invoke(
[&]{ parallel_mergesort(a, n/2); },
[&]{ parallel_mergesort(a + n/2, n - n/2); }
);
std::inplace_merge(a, a + n/2, a + n);
}
Map (elementwise):
parallel_for(blocked_range<int>(0, N), [&](const blocked_range<int>& r) {
for (int i = r.begin(); i < r.end(); i++)
out[i] = f(in[i]);
});
Histogram (per-thread local + merge):
enumerable_thread_specific<std::vector<int>> local_hist(
[]{ return std::vector<int>(256, 0); });
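parallel_for(blocked_range<int>(0, N), [&](const blocked_range<int>& r) {
auto& h = local_hist.local(); // look up this thread's histogram once per chunk
for (int i = r.begin(); i < r.end(); i++)
h[data[i]]++;
});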
std::vector<int> hist(256, 0);
for (auto& h : local_hist)
for (int k = 0; k < 256; k++) hist[k] += h[k];
Prefix scan:
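A scan looks inherently sequential, but it parallelizes with two passes over chunks. A minimal sketch using std::thread (not oneTBB's parallel_scan, which is similar in spirit with its pre-scan and final-scan phases): pass 1 sums each chunk, a short sequential step turns those sums into per-chunk starting offsets, and pass 2 scans each chunk from its offset. `blocked_inclusive_scan` is a hypothetical helper.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical two-pass blocked inclusive prefix sum.
inline std::vector<int> blocked_inclusive_scan(const std::vector<int>& in,
                                               std::size_t num_chunks) {
    assert(num_chunks > 0);
    const std::size_t n = in.size();
    const std::size_t chunk = (n + num_chunks - 1) / num_chunks;
    std::vector<int> chunk_sum(num_chunks, 0);
    std::vector<int> offset(num_chunks, 0);
    std::vector<int> out(n);
    std::vector<std::thread> pool;

    // Pass 1 (parallel): total of each chunk.
    for (std::size_t c = 0; c < num_chunks; ++c) {
        const std::size_t lo = std::min(c * chunk, n);
        const std::size_t hi = std::min(lo + chunk, n);
        pool.emplace_back([&in, &chunk_sum, c, lo, hi] {
            int s = 0;
            for (std::size_t i = lo; i < hi; ++i) s += in[i];
            chunk_sum[c] = s;
        });
    }
    for (auto& th : pool) th.join();
    pool.clear();

    // Sequential step: exclusive scan over the (few) chunk totals.
    for (std::size_t c = 1; c < num_chunks; ++c)
        offset[c] = offset[c - 1] + chunk_sum[c - 1];

    // Pass 2 (parallel): scan each chunk, starting from its offset.
    for (std::size_t c = 0; c < num_chunks; ++c) {
        const std::size_t lo = std::min(c * chunk, n);
        const std::size_t hi = std::min(lo + chunk, n);
        pool.emplace_back([&in, &out, &offset, c, lo, hi] {
            int running = offset[c];
            for (std::size_t i = lo; i < hi; ++i) {
                running += in[i];
                out[i] = running;
            }
        });
    }
    for (auto& th : pool) th.join();
    return out;
}
```

The input is read twice, so the parallel version does roughly double the work of a sequential scan; it pays off only when there are enough cores and the data is large.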
Assembly line:
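The assembly-line idea is that each stage works on a different item at the same time, handing results downstream. A minimal standard-library sketch with one thread per stage and a small channel between stages follows. `Channel` and `run_assembly_line` are hypothetical names; unlike oneTBB's parallel_pipeline, this sketch neither limits the number of items in flight nor runs a stage on several threads.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical unbounded channel connecting two pipeline stages.
template <typename T>
class Channel {
public:
    void send(T v) {
        { std::lock_guard<std::mutex> lock(mtx_); q_.push(std::move(v)); }
        cv_.notify_one();
    }
    // Signals that no more items will be sent.
    void close() {
        { std::lock_guard<std::mutex> lock(mtx_); closed_ = true; }
        cv_.notify_all();
    }
    // Blocks until an item arrives; returns false once closed and drained.
    bool receive(T& out) {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_.wait(lock, [&] { return !q_.empty() || closed_; });
        if (q_.empty()) return false;
        out = std::move(q_.front());
        q_.pop();
        return true;
    }

private:
    std::mutex mtx_;
    std::condition_variable cv_;
    std::queue<T> q_;
    bool closed_ = false;
};

// Stage 1 generates 1..n, stage 2 squares each value, stage 3 collects results.
inline std::vector<int> run_assembly_line(int n) {
    assert(n >= 0);
    Channel<int> a_to_b, b_to_c;
    std::vector<int> out;

    std::thread source([&] {
        for (int i = 1; i <= n; ++i) a_to_b.send(i);
        a_to_b.close();
    });
    std::thread transform([&] {
        int v = 0;
        while (a_to_b.receive(v)) b_to_c.send(v * v);
        b_to_c.close();
    });
    std::thread sink([&] {
        int v = 0;
        while (b_to_c.receive(v)) out.push_back(v);  // only this thread touches out
    });

    source.join();
    transform.join();
    sink.join();
    return out;
}
```

Order is preserved here only because every stage is a single thread reading a FIFO channel. Once a middle stage runs in parallel, the output stage needs explicit reordering, which is the job of serial in-order filters and sequencer nodes in a task-based pipeline.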
3.18 Connection to GPU Programming¶
Most oneTBB patterns have a rough analogue in GPU frameworks:
| oneTBB concept | GPU equivalent |
|---|---|
| parallel_for over blocked_range | CUDA kernel launch over thread grid |
| Work-stealing scheduler | Warp scheduler (hardware) |
| affinity_partitioner | L2 cache locality hints in CUDA |
| enumerable_thread_specific | Per-warp shared memory allocation |
| combinable<T> | atomicAdd into shared mem, then reduce |
| concurrent_queue | CUDA stream (async task queue) |
| parallel_pipeline | CUDA multi-stream pipeline |
| flow_graph nodes | CUDA graph nodes (cudaGraph) |
| task_group | CUDA dynamic parallelism |
| task_arena | CUDA stream priority + stream isolation |
| Scalable allocator | cudaMallocAsync (stream-ordered pool) |
Learning oneTBB's task decomposition makes CUDA's thread/block/grid hierarchy intuitive — they solve the same problem at different scales.
Resources¶
| Resource | What it covers |
|---|---|
| oneTBB Developer Guide | Full tutorial: parallel_for, reduce, pipeline, flow graph, design patterns |
| oneTBB API Reference | Complete API |
| Intel oneTBB Get Started | Installation, CMake, first steps |
| OpenMP specifications | Official OpenMP standard |
| OpenMP API Quick Reference Card | All clauses at a glance |
| Intel Threading Building Blocks (Reinders) | Book: TBB patterns in depth |
Next¶
→ CUDA and SIMT — GPU parallelism: threads, warps, blocks, and memory hierarchy.