
Tune Throughput and Queue Depth


Difficulty: Advanced
Estimated Read Time: 15-20 minutes
Labels: performance, tuning, async, queues

Performance tuning only helps once your correctness baseline is stable. This chapter assumes it is and turns to the knobs that decide how an async pipeline behaves when work arrives faster than it can be processed. You will set the queue depth, choose what happens when that queue fills, push a deterministic burst of frames without blocking, drain the results, and read the measurement report that tells you whether anything was dropped and how long frames took on average.

By the end you will have a working harness for measuring an async run under backpressure: enqueue counts, drop counts, outputs pulled, average latency, and push cost. The same loop is the basis for tuning a real pipeline against the heuristics in In Practice.

Walkthrough

Configure the run options

RunOptions is where async behavior under load is decided. We set three fields: queue_depth (how many in-flight samples the runtime accepts), overflow_policy (what happens when that queue is full: Block, KeepLatest, or DropIncoming), and output_memory = Owned (returned tensors own their data, so they survive past the pull). We then build() the graph in Async mode, which gives us a run with independent producer and consumer sides.

The overflow policy is parsed from --drop into simaai::neat::OverflowPolicy::{Block,KeepLatest,DropIncoming}; graph.build(input, opt) returns the run handle.

tutorials/016_tune_throughput_and_queues/tune_throughput_and_queues.cpp
simaai::neat::RunOptions opt;
opt.queue_depth = queue_depth;
opt.overflow_policy = parse_drop_policy(argc, argv);
opt.output_memory = simaai::neat::OutputMemory::Owned;

auto run = graph.build(std::vector<cv::Mat>{rgb}, opt);
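One detail worth knowing: the tutorial's parse_drop_policy (see Full source) returns Block for any value it does not recognize, so a typo such as --drop lastest silently runs with Block. A minimal sketch of a stricter parser follows. Policy is a local stand-in for simaai::neat::OverflowPolicy and parse_policy_strict is a hypothetical name; neither is part of the library.

```cpp
#include <optional>
#include <string>

// Local stand-in for simaai::neat::OverflowPolicy so the sketch compiles alone.
enum class Policy { Block, KeepLatest, DropIncoming };

// Hypothetical strict variant of parse_drop_policy: unknown values yield
// std::nullopt instead of silently falling back to Block.
std::optional<Policy> parse_policy_strict(const std::string& mode) {
  if (mode == "block")
    return Policy::Block;
  if (mode == "latest")
    return Policy::KeepLatest;
  if (mode == "incoming")
    return Policy::DropIncoming;
  return std::nullopt;  // caller decides: print usage, throw, or default
}
```

A caller could treat an empty result as a usage error, which makes a mistyped policy visible before any measurement is taken.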

Push the workload and drain

This is where the queue policy is exercised. We call try_push(...) in a tight loop. It is a non-blocking push that returns whether the sample was accepted, so a full queue surfaces as rejected or dropped samples rather than a stalled producer. The tutorial discards that return value with (void) and relies on the report's drop counter instead. After the burst we call close_input() to signal that no more inputs are coming, then drain the consumer side with a pull(...) loop until it returns empty. Pairing try_push with close_input plus a drain loop is the canonical non-blocking async pattern.

tutorials/016_tune_throughput_and_queues/tune_throughput_and_queues.cpp
// try_push never blocks; pair it with close_input + drain pull loop.
simaai::neat::MeasureOptions measure_opt;
measure_opt.title = "tutorial 016 throughput";
auto scope = run.start_measurement(measure_opt);
for (int i = 0; i < iters; ++i)
  (void)run.try_push(std::vector<cv::Mat>{rgb});
run.close_input();

int pulled = 0;
while (run.pull(/*timeout_ms=*/1000).has_value())
  ++pulled;
const auto measured = scope.stop();
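The loop above throws away what try_push reports. If you want to see rejections on the producer side as well, a small helper can tally them. This is a sketch under one assumption: that the result of try_push is convertible to bool, which matches the description above but is not checked against the library here. BurstStats, push_burst, and ToyRun are hypothetical names; ToyRun exists only to exercise the helper without the runtime.

```cpp
#include <cstddef>

// Accepted and rejected push counts for one burst.
struct BurstStats {
  int accepted = 0;
  int rejected = 0;
};

// Pushes `sample` `iters` times without blocking and records each outcome.
// Assumes run.try_push(sample) returns something convertible to bool.
template <typename Run, typename Sample>
BurstStats push_burst(Run& run, const Sample& sample, int iters) {
  BurstStats stats;
  for (int i = 0; i < iters; ++i) {
    if (run.try_push(sample))
      ++stats.accepted;
    else
      ++stats.rejected;
  }
  return stats;
}

// Toy stand-in (not the Neat run handle): accepts up to `capacity` samples
// and rejects the rest, with nothing draining it during the burst.
struct ToyRun {
  explicit ToyRun(std::size_t cap) : capacity(cap) {}
  bool try_push(int /*sample*/) {
    if (queued >= capacity)
      return false;
    ++queued;
    return true;
  }
  std::size_t capacity;
  std::size_t queued = 0;
};
```

With a ToyRun of capacity 4 and a burst of 32, the helper reports 4 accepted and 28 rejected. A real run drains concurrently, so its split depends on timing.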

Read the measurement report

With the run drained, we stop the measurement scope and read the report. The counters group gives runtime-side numbers (inputs enqueued, inputs dropped, outputs pulled), end_to_end gives frame latency, and input gives push-side numbers such as average push cost and input renegotiations. The outputs_pulled line printed below comes from the local pulled counter incremented in the drain loop. Together, these tell you whether your queue depth and overflow policy did what you intended: did frames drop, did latency climb, was the push path cheap?

tutorials/016_tune_throughput_and_queues/tune_throughput_and_queues.cpp
std::cout << "inputs_enqueued=" << measured.counters.inputs_enqueued << "\n";
std::cout << "inputs_dropped=" << measured.counters.inputs_dropped << "\n";
std::cout << "outputs_pulled=" << pulled << "\n";
std::cout << "avg_latency_ms=" << measured.end_to_end.avg_ms << "\n";
std::cout << "avg_push_us=" << measured.input.avg_push_us << "\n";
std::cout << "renegotiations=" << measured.input.renegotiations << "\n";
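To compare runs across policies, it can help to reduce the printed numbers to one or two derived figures. The sketch below works only from values the tutorial already has in hand: the number of push attempts (iters) and the local pulled count. It assumes a pass-through graph that yields one output per accepted input, as in this chapter. RunSummary and both functions are hypothetical helpers, not part of the measurement report.

```cpp
#include <cstdint>

// Hypothetical summary built from values the tutorial already prints.
struct RunSummary {
  std::uint64_t push_attempts;   // the --iters value
  std::uint64_t outputs_pulled;  // the local `pulled` counter
};

// Frames that were pushed but never came out the other side.
std::uint64_t lost_frames(const RunSummary& s) {
  return s.outputs_pulled >= s.push_attempts
             ? 0
             : s.push_attempts - s.outputs_pulled;
}

// Fraction of attempted pushes that produced an output, in [0, 1].
double delivered_fraction(const RunSummary& s) {
  if (s.push_attempts == 0)
    return 0.0;
  return static_cast<double>(s.push_attempts - lost_frames(s)) /
         static_cast<double>(s.push_attempts);
}
```

For the Block run shown under Run (32 pushed, 32 pulled) the delivered fraction is 1.0. A run that pulls 8 of 32 gives 0.25.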

Run

This chapter needs no model archive. Run the Python and C++ (prebuilt) commands from the Neat install root (the directory that contains share/ and lib/); run the build-from-source commands from the repo root.

C++ (prebuilt):

./lib/sima-neat/tutorials/tutorial_016_tune_throughput_and_queues \
  --iters 32 --queue 4 --drop block

C++ (build from source):

./build.sh --target tutorial_016_tune_throughput_and_queues
./build/tutorials-standalone/tutorial_016_tune_throughput_and_queues \
  --iters 32 --queue 4 --drop block

Expected output (exact counts and timings depend on the host and policy):

inputs_enqueued=32
inputs_dropped=0
outputs_pulled=32
avg_latency_ms=0.42
avg_push_us=18.0
renegotiations=0
[OK] 016_tune_throughput_and_queues

(The Python build prints the same keys without the trailing [OK] line.)
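Because the output is plain key=value lines, it is easy to capture for regression tracking. The following is a minimal sketch of a parser for that format. parse_report is a hypothetical helper written for this page, and it assumes one key=value pair per line with no spaces around the equals sign, as in the sample above.

```cpp
#include <map>
#include <sstream>
#include <string>

// Parses "key=value" lines into a map. Lines without '=' (such as the
// trailing "[OK] ..." marker or blank lines) are skipped.
std::map<std::string, std::string> parse_report(const std::string& text) {
  std::map<std::string, std::string> out;
  std::istringstream in(text);
  std::string line;
  while (std::getline(in, line)) {
    const std::string::size_type eq = line.find('=');
    if (eq == std::string::npos || eq == 0)
      continue;
    out[line.substr(0, eq)] = line.substr(eq + 1);
  }
  return out;
}
```

Values stay as strings so the caller can choose std::stoi or std::stod per key.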

To integrate this chapter's C++ source into your own project with a custom CMakeLists.txt (no extras folder required), see How to Run Tutorials on the landing page.

In Practice

Practical guidance for queue sizing, drop policies, presets, and output-lifetime safety.

Queue sizing (queue_depth)

Heuristics:

  • Start with queue_depth = 4–16 for low‑latency pipelines.
  • Increase queues if your producer is bursty or if downstream elements have variable latency (decode/MLA/postproc).
  • Keep queues small if you need freshest frames (e.g., live camera preview).
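The freshness tradeoff in these heuristics can be estimated with simple arithmetic. The sketch below is a back-of-envelope bound, not a description of the runtime's scheduler. It assumes a FIFO queue, one frame serviced at a time, and a constant per-frame service time. Under those assumptions a frame that takes the last slot of a full queue has queue_depth - 1 frames queued ahead of it plus one already in service, so it waits roughly queue_depth service times. The function name is hypothetical.

```cpp
// Rough upper bound on time spent waiting in a full FIFO queue, in ms.
// Assumes constant service time and one frame in service at a time.
double worst_case_queue_wait_ms(int queue_depth, double service_ms) {
  if (queue_depth <= 0 || service_ms <= 0.0)
    return 0.0;
  return static_cast<double>(queue_depth) * service_ms;
}
```

At 10 ms per frame, a depth of 4 adds up to about 40 ms of staleness and a depth of 16 up to about 160 ms, which is why small queues suit live preview.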

Overflow policy (RunOptions::overflow_policy)

  • Block: safest for correctness; producer waits when queue is full.
  • DropIncoming: keep queued work, drop incoming samples when saturated.
  • KeepLatest: prefer freshest frames, drop the oldest queued samples.

For live feeds, KeepLatest usually yields the lowest end-to-end latency.
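The difference between the three policies is easiest to see on a toy queue with no consumer. The model below is an illustration of the semantics listed above, not the runtime's implementation. ToyPolicy, ToyQueue, and run_burst are hypothetical names, and Block is modeled as a refusal because a single-threaded sketch cannot wait.

```cpp
#include <cstddef>
#include <deque>

enum class ToyPolicy { Block, KeepLatest, DropIncoming };

// Bounded FIFO that applies one overflow policy when full.
struct ToyQueue {
  ToyQueue(std::size_t d, ToyPolicy p) : depth(d), policy(p) {}

  // Returns true if the frame was stored. Under Block, false means the
  // producer would have to wait; nothing is counted as dropped.
  bool offer(int frame_id) {
    if (depth == 0)
      return false;
    if (frames.size() < depth) {
      frames.push_back(frame_id);
      return true;
    }
    if (policy == ToyPolicy::KeepLatest) {
      frames.pop_front();  // evict the oldest queued frame
      ++dropped;
      frames.push_back(frame_id);
      return true;
    }
    if (policy == ToyPolicy::DropIncoming)
      ++dropped;  // the new frame is discarded
    return false;
  }

  std::size_t depth;
  ToyPolicy policy;
  std::deque<int> frames;
  int dropped = 0;
};

// Offers frames 0..n-1 to a fresh queue and returns its final state.
ToyQueue run_burst(ToyPolicy policy, std::size_t depth, int n) {
  ToyQueue q(depth, policy);
  for (int i = 0; i < n; ++i)
    q.offer(i);
  return q;
}
```

With depth 2 and a burst of five frames, KeepLatest ends holding frames 3 and 4, while DropIncoming ends holding frames 0 and 1. Both count three drops. The Block model also holds 0 and 1 but counts no drops, since a real producer would have waited instead.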

Presets and renegotiation

Use RunOptions::preset to control latency/safety tradeoffs:

  • Realtime: lowest latency, aggressive freshness behavior.
  • Balanced: starts zero-copy when possible, runs startup probe checks, and falls back to copy mode if reliability trips.
  • Reliable: conservative behavior and stable output ownership.

Input shape renegotiation is automatic for dynamic inputs (the renegotiations counter above reports how often it happened).

Output lifetimes (output_memory)

  • output_memory = Owned: returned Tensor owns its data.
  • output_memory = ZeroCopy: tensor may reference runtime buffers reused after pull.
  • output_memory = Auto: runtime chooses zero-copy first and falls back to owned where reliability requires it.

If you need to keep tensor data beyond the current step, call clone() or cpu().contiguous().
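Why a zero-copy result can go stale is easier to see with a toy buffer than with the real tensor type. The sketch below uses only the standard library: ToyRuntime stands in for a runtime that reuses one output buffer, a raw pointer stands in for a zero-copy view, and a std::vector copy stands in for an owned or cloned tensor. All names are hypothetical and nothing here calls the Neat API.

```cpp
#include <vector>

// Toy stand-in for a runtime that reuses a single output buffer.
struct ToyRuntime {
  // Overwrites the shared buffer and returns a non-owning view of it.
  const int* produce(int value) {
    for (int& x : buffer)
      x = value;
    return buffer.data();
  }
  std::vector<int> buffer = std::vector<int>(4, 0);
};

struct Observed {
  int view_after_reuse;  // what the non-owning view sees later
  int copy_after_reuse;  // what the deep copy still holds
};

Observed demo_lifetime() {
  ToyRuntime rt;
  const int* view = rt.produce(1);         // "zero-copy" result
  std::vector<int> owned(view, view + 4);  // deep copy, like cloning
  rt.produce(2);                           // buffer reused by the next result
  return Observed{view[0], owned[0]};
}
```

After the second produce call the view reads 2 while the copy still reads 1, which is the situation the clone guidance above is meant to avoid.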

Buffer pool safety

  • RunAdvancedOptions::max_input_bytes sets a hard upper bound on input buffer allocation.
  • If a larger buffer is required, the runtime fails fast with an explicit error.

Use this limit to protect long‑running processes from unbounded allocations when inputs change size.
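The fail-fast behavior can be pictured as a size check ahead of allocation. The sketch below is an illustration only: check_input_bytes and frame_bytes are hypothetical helpers, the exception type is chosen for the example, and the runtime's real check and error may differ. It assumes 8-bit samples, as in the tutorial's CV_8UC3 frame.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>

// Bytes needed for one 8-bit interleaved frame.
std::size_t frame_bytes(std::size_t width, std::size_t height,
                        std::size_t channels) {
  return width * height * channels;
}

// Throws instead of allocating when a request exceeds the configured bound.
void check_input_bytes(std::size_t requested, std::size_t max_input_bytes) {
  if (requested > max_input_bytes) {
    throw std::length_error("input needs " + std::to_string(requested) +
                            " bytes, limit is " +
                            std::to_string(max_input_bytes));
  }
}
```

The tutorial's 160x120 RGB frame needs 57600 bytes. With the bound set to that value, a 1920x1080 RGB frame (6220800 bytes) would be refused rather than allocated.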

Full source

tutorials/016_tune_throughput_and_queues/tune_throughput_and_queues.cpp
// Tune async Graph throughput via RunOptions and MeasureReport.
//
// Usage:
// tutorial_016_tune_throughput_and_queues [--iters 32] [--queue 4] [--drop block|latest|incoming]

#include "neat.h"

#include <opencv2/core.hpp>

#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

namespace {

bool get_arg(int argc, char** argv, const std::string& key, std::string& out) {
  for (int i = 1; i + 1 < argc; ++i) {
    if (key == argv[i]) {
      out = argv[i + 1];
      return true;
    }
  }
  return false;
}

int parse_int_arg(int argc, char** argv, const std::string& key, int def) {
  std::string value;
  if (!get_arg(argc, argv, key, value))
    return def;
  return std::stoi(value);
}

simaai::neat::OverflowPolicy parse_drop_policy(int argc, char** argv) {
  std::string mode;
  if (!get_arg(argc, argv, "--drop", mode))
    return simaai::neat::OverflowPolicy::Block;
  if (mode == "latest")
    return simaai::neat::OverflowPolicy::KeepLatest;
  if (mode == "incoming")
    return simaai::neat::OverflowPolicy::DropIncoming;
  return simaai::neat::OverflowPolicy::Block;
}

} // namespace

int main(int argc, char** argv) {
  try {
    const int iters = parse_int_arg(argc, argv, "--iters", 32);
    const int queue_depth = parse_int_arg(argc, argv, "--queue", 4);

    cv::Mat rgb(120, 160, CV_8UC3, cv::Scalar(70, 20, 200));
    if (!rgb.isContinuous())
      rgb = rgb.clone();

    simaai::neat::Graph graph;
    simaai::neat::InputOptions in;
    in.format = "RGB";
    in.width = rgb.cols;
    in.height = rgb.rows;
    in.depth = rgb.channels();
    in.is_live = true;
    graph.add(simaai::neat::nodes::Input(in));
    graph.add(simaai::neat::nodes::Output());

    // CORE LOGIC
    // RunOptions controls how the async runner buffers and drops frames.
    simaai::neat::RunOptions opt;
    opt.queue_depth = queue_depth;
    opt.overflow_policy = parse_drop_policy(argc, argv);
    opt.output_memory = simaai::neat::OutputMemory::Owned;

    auto run = graph.build(std::vector<cv::Mat>{rgb}, opt);

    // try_push never blocks; pair it with close_input + drain pull loop.
    simaai::neat::MeasureOptions measure_opt;
    measure_opt.title = "tutorial 016 throughput";
    auto scope = run.start_measurement(measure_opt);
    for (int i = 0; i < iters; ++i)
      (void)run.try_push(std::vector<cv::Mat>{rgb});
    run.close_input();

    int pulled = 0;
    while (run.pull(/*timeout_ms=*/1000).has_value())
      ++pulled;
    const auto measured = scope.stop();

    std::cout << "inputs_enqueued=" << measured.counters.inputs_enqueued << "\n";
    std::cout << "inputs_dropped=" << measured.counters.inputs_dropped << "\n";
    std::cout << "outputs_pulled=" << pulled << "\n";
    std::cout << "avg_latency_ms=" << measured.end_to_end.avg_ms << "\n";
    std::cout << "avg_push_us=" << measured.input.avg_push_us << "\n";
    std::cout << "renegotiations=" << measured.input.renegotiations << "\n";
    std::cout << "[OK] 016_tune_throughput_and_queues\n";
    return 0;
  } catch (const std::exception& e) {
    std::cerr << "[FAIL] " << e.what() << "\n";
    return 1;
  }
}
