
Run Multiple Streams in One Graph


Field | Value
Difficulty | Advanced
Estimated Read Time | 20-25 minutes
Labels | graph, multistream, scheduler, join

Earlier chapters pushed one input and pulled one output. Real multi-camera and parallel-branch systems are messier: several streams advance independently, and their results must be rejoined correctly before anything downstream can use them. This chapter shows the join primitive that makes that deterministic — a combine graph with two named inputs and one named output that emits a bundle only when both sides have produced the matching frame.

Every sample you push carries a stream_id and a frame_id. The combine policy ByFrame waits until both named inputs (left and right) have delivered a sample with the same frame_id, then emits exactly one combined bundle. By the end you will have built a combine graph, fanned a deterministic per-stream / per-frame workload through its two inputs, and pulled back the joined bundles, verifying both the output count and that the first bundle carries two fields.
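
To make the ByFrame rule concrete, here is a minimal standalone model of a two-input join keyed on frame_id. It is not the Neat runtime and says nothing about how the runtime is implemented: FrameJoin, ToySample, and ToyBundle are hypothetical names, and the sketch assumes each side delivers a given frame_id at most once.

```cpp
// Toy model of a two-input frame-id join. NOT the Neat runtime: FrameJoin,
// ToySample, and ToyBundle are hypothetical names used only for illustration.
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

struct ToySample {
  std::string stream_id;
  std::int64_t frame_id = 0;
};

struct ToyBundle {
  std::int64_t frame_id = 0;
  std::vector<ToySample> fields;  // one entry per input: left first, then right
};

class FrameJoin {
public:
  // Offer a sample on one side. Returns a bundle only when the other side has
  // already delivered the same frame_id; otherwise the sample is parked.
  std::optional<ToyBundle> push(bool is_left, ToySample s) {
    auto& mine = is_left ? left_ : right_;
    auto& other = is_left ? right_ : left_;
    const std::int64_t id = s.frame_id;
    // Assumption of this toy model: a side never repeats a frame_id.
    assert(mine.count(id) == 0);

    auto it = other.find(id);
    if (it == other.end()) {
      mine[id] = std::move(s);  // no partner yet: wait for the other side
      return std::nullopt;
    }

    ToyBundle bundle;
    bundle.frame_id = id;
    if (is_left) {
      bundle.fields.push_back(std::move(s));
      bundle.fields.push_back(std::move(it->second));
    } else {
      bundle.fields.push_back(std::move(it->second));
      bundle.fields.push_back(std::move(s));
    }
    other.erase(it);
    return bundle;
  }

  // Number of samples still waiting for a partner.
  std::size_t pending() const { return left_.size() + right_.size(); }

private:
  std::map<std::int64_t, ToySample> left_;
  std::map<std::int64_t, ToySample> right_;
};
```

Pushing frame 7 on the left returns nothing; pushing frame 7 on the right then returns one bundle with two fields. A sample whose partner never arrives stays in pending(), which is why a dropped frame on either side shows up downstream as a missing bundle rather than a mismatched one.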

Walkthrough

Build the combine graph

graphs::Combine (C++) / graphs.combine (Python) returns a normal public Graph fragment — there is nothing special about it beyond its shape: two named inputs, one named output, and a join policy. We pass ["left", "right"] as the input names, "combined" as the output name, and CombinePolicy.ByFrame to select frame-id matching. Printing describe() shows the resulting topology, and build() turns the description into a runnable handle. The graph runs async by default so each stream can make progress on its own.

CombinePolicy.ByFrame matches on Sample.frame_id; CombinePolicy.ByPts is the alternative that matches on presentation timestamps (Sample.pts_ns) when frames don't share a clean frame index.

tutorials/015_run_multiple_streams/run_multiple_streams.cpp
simaai::neat::Graph graph = simaai::neat::graphs::Combine({"left", "right"}, "combined",
                                                          simaai::neat::CombinePolicy::ByFrame);

std::cout << graph.describe() << "\n";

simaai::neat::Run run = graph.build();
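
The difference between the two policies comes down to which field of the sample serves as the join key. The sketch below models only that choice. KeyedSample, ToyPolicy, join_key, and pairs_up are hypothetical names, and exact equality on pts_ns is an assumption made for illustration; this chapter does not say whether the real ByPts policy applies a tolerance.

```cpp
// Toy model of join-key selection. Only the field names frame_id and pts_ns
// come from the text; every type and function here is hypothetical.
#include <cstdint>

struct KeyedSample {
  std::int64_t frame_id = 0;
  std::int64_t pts_ns = 0;
};

enum class ToyPolicy { ByFrame, ByPts };

// Pick the value the join compares on for the given policy.
std::int64_t join_key(const KeyedSample& s, ToyPolicy policy) {
  return policy == ToyPolicy::ByFrame ? s.frame_id : s.pts_ns;
}

// Assumption: two samples pair up when their keys are exactly equal.
bool pairs_up(const KeyedSample& left, const KeyedSample& right, ToyPolicy policy) {
  return join_key(left, policy) == join_key(right, policy);
}
```

Two samples with frame_id 10 and 12 but the same pts_ns would pair under ToyPolicy::ByPts and not under ToyPolicy::ByFrame, which is the situation the ByPts alternative is described as covering.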

Push the streams

Now we drive the workload. For every frame and every stream we synthesize a small deterministic RGB sample tagged with its stream_id and a unique frame_id, then push it into both named inputs. Because the IDs are computed deterministically (frame * streams + sid), the join has an unambiguous pairing to find — left frame N always has a matching right frame N.
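
The ID formula can be checked in isolation. The sketch below uses the same expression as the tutorial loop and adds an inverse mapping plus a uniqueness count; the function names are hypothetical and exist only for this illustration.

```cpp
// Standalone check of the frame * streams + sid numbering scheme.
// Function names are hypothetical; only the formula comes from the tutorial.
#include <cassert>
#include <set>
#include <utility>

// One id per (frame, stream) pair, as computed in the push loop.
int logical_frame_id(int frame, int sid, int streams) {
  assert(streams > 0 && frame >= 0 && sid >= 0 && sid < streams);
  return frame * streams + sid;
}

// Inverse mapping: recover {frame, sid} from an id.
std::pair<int, int> split_logical_frame_id(int id, int streams) {
  assert(streams > 0 && id >= 0);
  return {id / streams, id % streams};
}

// Count how many distinct ids the whole workload produces.
int count_distinct_ids(int frames, int streams) {
  std::set<int> seen;
  for (int frame = 0; frame < frames; ++frame) {
    for (int sid = 0; sid < streams; ++sid) {
      seen.insert(logical_frame_id(frame, sid, streams));
    }
  }
  return static_cast<int>(seen.size());
}
```

With the default 8 streams and 4 frames, count_distinct_ids(4, 8) is 32: every (frame, stream) pair gets its own id in the range 0 to 31, so each left sample has exactly one possible right partner.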

Each sample is built explicitly: a Sample wrapping a Tensor (HWC, UInt8, RGB) with frame_id and stream_id set. run.push("left", sample) returns a bool; when it returns false, run.last_error() describes the failure, so check the result of every push.

tutorials/015_run_multiple_streams/run_multiple_streams.cpp
for (int frame = 0; frame < frames; ++frame) {
  for (int sid = 0; sid < streams; ++sid) {
    const int logical_frame = frame * streams + sid;
    if (!run.push("left", make_rgb_sample(std::to_string(sid), logical_frame))) {
      throw std::runtime_error("left push failed: " + run.last_error());
    }
    if (!run.push("right", make_rgb_sample(std::to_string(sid), logical_frame))) {
      throw std::runtime_error("right push failed: " + run.last_error());
    }
  }
}
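
The HWC layout of each sample determines where every byte lives. The sketch below repeats the contiguous_strides_bytes helper from this chapter's full source without any Neat types and adds hwc_offset, a hypothetical helper introduced here to show how the strides are used.

```cpp
// Byte strides and offsets for a densely packed HWC tensor.
// hwc_offset is a hypothetical helper added for illustration.
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Strides in bytes for a contiguous tensor, innermost dimension last.
std::vector<std::int64_t> contiguous_strides_bytes(const std::vector<std::int64_t>& shape,
                                                   std::int64_t elem_bytes) {
  std::vector<std::int64_t> strides(shape.size(), 0);
  std::int64_t stride = elem_bytes;
  for (int i = static_cast<int>(shape.size()) - 1; i >= 0; --i) {
    strides[static_cast<std::size_t>(i)] = stride;
    stride *= shape[static_cast<std::size_t>(i)];
  }
  return strides;
}

// Byte offset of channel c of pixel (y, x), given three HWC strides.
std::int64_t hwc_offset(const std::vector<std::int64_t>& strides, std::int64_t y,
                        std::int64_t x, std::int64_t c) {
  assert(strides.size() == 3);
  return y * strides[0] + x * strides[1] + c * strides[2];
}
```

For the tutorial's 6 x 8 x 3 UInt8 sample, the shape {6, 8, 3} with one byte per element gives strides {24, 3, 1}: one row is 8 * 3 = 24 bytes, one pixel is 3 bytes, and the last channel of the last pixel sits at offset 143 of a 144-byte buffer.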

Pull the joined bundles

With all inputs pushed, we pull from the single named output "combined". Each successful pull returns one bundle that the runtime emitted only after both inputs delivered the matching frame. We count the bundles and (in C++) check that the first bundle carries two fields, one from each input, then close() the run to tear it down cleanly. Because every pull must succeed before its timeout, receiving streams * frames bundles shows that no pairing was dropped.

run.pull("combined", timeout_ms) returns an optional bundle; we read bundle.stream_id and bundle.fields.size() and verify the first bundle has 2 fields.

tutorials/015_run_multiple_streams/run_multiple_streams.cpp
const int expected = streams * frames;
int received = 0;
int first_fields = -1;
for (int i = 0; i < expected; ++i) {
  auto maybe_bundle = run.pull("combined", /*timeout_ms=*/2000);
  if (!maybe_bundle.has_value()) {
    throw std::runtime_error("timed out waiting for combined output");
  }
  const auto& bundle = *maybe_bundle;
  if (first_fields < 0)
    first_fields = static_cast<int>(bundle.fields.size());
  ++received;
  if (i < 4) {
    std::cout << "bundle stream=" << bundle.stream_id << " fields=" << bundle.fields.size()
              << "\n";
  }
}

run.close();
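
The tutorial loop records the field count of the first bundle only. If you would rather check every bundle, one possible variant is sketched below. It is decoupled from the Neat API so it can run on its own: PulledBundle and drain_and_check are hypothetical names, and the pull argument stands in for a call such as run.pull("combined", timeout_ms) that yields an empty optional on timeout.

```cpp
// Drain a fixed number of bundles and validate each one.
// PulledBundle and drain_and_check are hypothetical; `pull` stands in for a
// timed pull that returns std::nullopt when nothing arrives in time.
#include <cassert>
#include <cstddef>
#include <functional>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>

struct PulledBundle {
  std::string stream_id;
  std::vector<int> fields;  // placeholder payloads; only the count matters here
};

// Pull `expected` bundles, requiring each to carry exactly `want_fields` fields.
// Returns the number received; throws on timeout or on a malformed bundle.
int drain_and_check(const std::function<std::optional<PulledBundle>()>& pull, int expected,
                    std::size_t want_fields) {
  assert(expected >= 0);
  int received = 0;
  for (int i = 0; i < expected; ++i) {
    std::optional<PulledBundle> bundle = pull();
    if (!bundle.has_value()) {
      throw std::runtime_error("timed out after " + std::to_string(received) + " bundles");
    }
    if (bundle->fields.size() != want_fields) {
      throw std::runtime_error("bundle " + std::to_string(i) + " has " +
                               std::to_string(bundle->fields.size()) + " fields");
    }
    ++received;
  }
  return received;
}
```

Checking every bundle costs one comparison per pull and reports the index of the first malformed bundle, which is more useful for debugging than a single check on bundle zero.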

Run

This chapter needs no model archive. Run the Python and C++ (prebuilt) commands from the Neat install root (the directory that contains share/ and lib/); run the build from source commands from the repo root.

C++ (prebuilt):

./lib/sima-neat/tutorials/tutorial_015_run_multiple_streams \
--streams 8 --frames 4

C++ (build from source):

./build.sh --target tutorial_015_run_multiple_streams
./build/tutorials-standalone/tutorial_015_run_multiple_streams \
--streams 8 --frames 4

Expected output (the C++ build prints the graph description and the first few bundles first):

received=32 fields=2
[OK] 015_run_multiple_streams

(The Python build prints expected=32 received=32.)

To integrate this chapter's C++ source into your own project with a custom CMakeLists.txt (no extras folder required), see How to Run Tutorials on the landing page.

Full source

tutorials/015_run_multiple_streams/run_multiple_streams.cpp
// Multistream public Graph: named inputs -> Combine(ByFrame) -> named output bundle.
//
// Usage:
//   tutorial_015_run_multiple_streams [--streams 8] [--frames 4]

#include "neat.h"

#include <cstdint>
#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

namespace {

bool get_arg(int argc, char** argv, const std::string& key, std::string& out) {
  for (int i = 1; i + 1 < argc; ++i) {
    if (key == argv[i]) {
      out = argv[i + 1];
      return true;
    }
  }
  return false;
}

int parse_int_arg(int argc, char** argv, const std::string& key, int def) {
  std::string value;
  if (!get_arg(argc, argv, key, value))
    return def;
  return std::stoi(value);
}

std::vector<int64_t> contiguous_strides_bytes(const std::vector<int64_t>& shape,
                                              int64_t elem_bytes) {
  std::vector<int64_t> strides(shape.size(), 0);
  int64_t stride = elem_bytes;
  for (int i = static_cast<int>(shape.size()) - 1; i >= 0; --i) {
    strides[static_cast<size_t>(i)] = stride;
    stride *= shape[static_cast<size_t>(i)];
  }
  return strides;
}

simaai::neat::Sample make_rgb_sample(const std::string& stream_id, int frame_id) {
  const int w = 8;
  const int h = 6;
  const int c = 3;
  const std::size_t bytes = static_cast<std::size_t>(w) * h * c;

  simaai::neat::Tensor t;
  t.device = {simaai::neat::DeviceType::CPU, 0};
  t.dtype = simaai::neat::TensorDType::UInt8;
  t.layout = simaai::neat::TensorLayout::HWC;
  t.shape = {h, w, c};
  t.semantic.image = simaai::neat::ImageSpec{simaai::neat::ImageSpec::PixelFormat::RGB, ""};
  t.storage = simaai::neat::make_cpu_owned_storage(bytes);
  t.strides_bytes = contiguous_strides_bytes(t.shape, 1);
  t.read_only = false;
  {
    auto map = t.map(simaai::neat::MapMode::Write);
    auto* p = static_cast<std::uint8_t*>(map.data);
    for (std::size_t i = 0; i < bytes; ++i)
      p[i] = static_cast<std::uint8_t>(i % 255);
  }
  t.read_only = true;

  simaai::neat::Sample sample;
  sample.kind = simaai::neat::SampleKind::Tensor;
  sample.tensor = std::move(t);
  sample.frame_id = frame_id;
  sample.stream_id = stream_id;
  return sample;
}

} // namespace

int main(int argc, char** argv) {
  try {
    const int streams = parse_int_arg(argc, argv, "--streams", 8);
    const int frames = parse_int_arg(argc, argv, "--frames", 4);

    // CORE LOGIC
    // `graphs::Combine` is a normal public Graph fragment. It declares two
    // named inputs ("left", "right") and one named output ("combined"). ByFrame
    // means the runtime emits one bundle only after both inputs have delivered
    // samples with the same Sample::frame_id.
    simaai::neat::Graph graph = simaai::neat::graphs::Combine({"left", "right"}, "combined",
                                                              simaai::neat::CombinePolicy::ByFrame);

    std::cout << graph.describe() << "\n";

    simaai::neat::Run run = graph.build();

    for (int frame = 0; frame < frames; ++frame) {
      for (int sid = 0; sid < streams; ++sid) {
        const int logical_frame = frame * streams + sid;
        if (!run.push("left", make_rgb_sample(std::to_string(sid), logical_frame))) {
          throw std::runtime_error("left push failed: " + run.last_error());
        }
        if (!run.push("right", make_rgb_sample(std::to_string(sid), logical_frame))) {
          throw std::runtime_error("right push failed: " + run.last_error());
        }
      }
    }

    const int expected = streams * frames;
    int received = 0;
    int first_fields = -1;
    for (int i = 0; i < expected; ++i) {
      auto maybe_bundle = run.pull("combined", /*timeout_ms=*/2000);
      if (!maybe_bundle.has_value()) {
        throw std::runtime_error("timed out waiting for combined output");
      }
      const auto& bundle = *maybe_bundle;
      if (first_fields < 0)
        first_fields = static_cast<int>(bundle.fields.size());
      ++received;
      if (i < 4) {
        std::cout << "bundle stream=" << bundle.stream_id << " fields=" << bundle.fields.size()
                  << "\n";
      }
    }

    run.close();

    if (received != expected)
      throw std::runtime_error("expected=" + std::to_string(expected) +
                               " received=" + std::to_string(received));
    if (first_fields != 2)
      throw std::runtime_error("join should emit an image+bbox bundle");

    std::cout << "received=" << received << " fields=" << first_fields << "\n";
    std::cout << "[OK] 015_run_multiple_streams\n";
    return 0;
  } catch (const std::exception& e) {
    std::cerr << "[FAIL] " << e.what() << "\n";
    return 1;
  }
}
