
Run Inference Asynchronously


Field | Value
Difficulty | Beginner
Estimated Read Time | 10-15 minutes
Labels | async, push-pull, throughput, runtime

Chapter 001 ran a model with a single synchronous call: hand it one frame, then block until the result comes back. That is simple, but it wastes compute: producing inputs and consuming outputs happen on the same thread, so the two can never overlap. This chapter keeps the same ResNet-50 model and turns it into a throughput-oriented pipeline by splitting those two jobs across threads.
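
To put a rough number on what overlap buys, here is a back-of-the-envelope cost model. It is an idealized sketch, not a measurement: it assumes constant per-frame times, unlimited buffering between the two stages, and no runtime overhead. The function and parameter names are illustrative and are not part of the Neat API.

```cpp
#include <algorithm>
#include <cstdint>

// Idealized two-stage cost model (illustrative names, not Neat API).
//   produce_ms: time to prepare and submit one frame
//   infer_ms:   time for one result to come back

// Synchronous: every frame pays for both stages back to back.
constexpr std::int64_t sync_total_ms(std::int64_t n, std::int64_t produce_ms,
                                     std::int64_t infer_ms) {
  return n * (produce_ms + infer_ms);
}

// Pipelined: the first frame pays for both stages; after that, one result
// completes every max(produce_ms, infer_ms), i.e. at the pace of the slower stage.
constexpr std::int64_t pipelined_total_ms(std::int64_t n, std::int64_t produce_ms,
                                          std::int64_t infer_ms) {
  if (n <= 0)
    return 0;
  return produce_ms + infer_ms + (n - 1) * std::max(produce_ms, infer_ms);
}
```

Under these assumptions, 4 frames at 3 ms to produce and 5 ms to infer cost `sync_total_ms(4, 3, 5) == 32` versus `pipelined_total_ms(4, 3, 5) == 23`. The gap widens as the frame count grows, and real numbers depend on the runtime and hardware.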

The mechanism is the async Run: you add the model to a Graph, build() that graph into a Run in Async mode, then drive it with two independent calls: push(...) from a producer and pull(...) from a consumer. By the end you will have a producer thread feeding frames as fast as the runtime accepts them while the main thread pulls predictions out, and a final pushed=N pulled=N line confirming that nothing was lost.

Walkthrough

Load the model

We start exactly as in chapter 001 — construct a Model from the archive — but here we also declare a RouteOptions with include_input and include_output set. Those flags tell the model to expose its own input and output boundaries when it is composed into a graph, so the surrounding pipeline can push frames in and pull tensors out.

tutorials/002_run_inference_async/run_inference_async.cpp
simaai::neat::Model model(model_path, build_options(size));
simaai::neat::Model::RouteOptions route_opt;
route_opt.include_input = true;
route_opt.include_output = true;

Build the async pipeline

A Model is not directly drivable with push/pull; a Run is. We wrap the model in a fresh Graph via graph.add(model.graph(route_opt)), then build(...) it with a representative frame. Passing the sample frame lets build() negotiate concrete tensor shapes up front. The returned Run is the handle both threads will share.

tutorials/002_run_inference_async/run_inference_async.cpp
simaai::neat::Graph graph;
graph.add(model.graph(route_opt));

auto run = graph.build(std::vector<cv::Mat>{frames.front()});

Push frames from a producer

The producer's only job is to feed inputs. We spawn a thread that loops over the prepared frames, calls push(...) for each, and then calls close_input() to signal that no more frames are coming — that signal is what lets the consumer know when to stop. Because the producer runs independently, it does not wait for any result before sending the next frame.

A std::thread runs the loop; an atomic pushed counter and a producer_done flag are updated as it goes so the main thread can observe progress without a lock.

tutorials/002_run_inference_async/run_inference_async.cpp
std::atomic<int> pushed{0};
std::atomic<bool> producer_done{false};
std::thread producer([&]() {
  for (const cv::Mat& f : frames) {
    run.push(std::vector<cv::Mat>{f});
    pushed.fetch_add(1, std::memory_order_relaxed);
  }
  run.close_input();
  producer_done.store(true);
});
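
The lock-free progress reporting can be tried in isolation with only the standard library. The sketch below is a stand-in, not the tutorial's code: `run_toy_producer`, `ProducerStats`, and the `sink` vector (which plays the role of `run.push(...)`) are hypothetical names. It also spells out release/acquire ordering on the flag, whereas the tutorial uses the default sequentially consistent `store`/`load`.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// What the observing thread saw once the producer reported completion.
struct ProducerStats {
  int pushed;
  bool done;
};

// Hypothetical stand-in for the tutorial's producer thread.
ProducerStats run_toy_producer(const std::vector<int>& frames) {
  std::atomic<int> pushed{0};
  std::atomic<bool> producer_done{false};
  std::vector<int> sink;  // touched only by the producer thread

  std::thread producer([&]() {
    for (int f : frames) {
      sink.push_back(f);  // stands in for run.push(...)
      pushed.fetch_add(1, std::memory_order_relaxed);
    }
    // Release: everything written above is visible to a thread that
    // observes `true` with an acquire load.
    producer_done.store(true, std::memory_order_release);
  });

  // Observer: poll the flag without taking any lock.
  while (!producer_done.load(std::memory_order_acquire)) {
    std::this_thread::yield();
  }
  const int seen = pushed.load(std::memory_order_relaxed);

  producer.join();
  return {seen, producer_done.load()};
}
```

Because the flag is stored after the last increment, a thread that observes `producer_done == true` is guaranteed to read the final `pushed` value, even though the counter itself uses relaxed ordering.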

Pull results on the consumer

The main thread consumes. It loops calling pull(timeout_ms=2000), which returns the next available output or nothing if none arrived within the timeout. On an empty pull we check whether the producer has finished — if so we stop, otherwise we keep waiting. Each real result is reduced to a top-1 class index and printed. After the loop we join the producer and confirm pushed == pulled.

pull() returns an optional<Sample>; extract tensors with tensors_from_sample(...) before reading bytes.
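
Reducing a result to a top-1 class index is an argmax over the output scores. A minimal standard-library sketch of that step, assuming the scores are already available as a contiguous `float` buffer (`top1_index` is an illustrative name, not a Neat function):

```cpp
#include <algorithm>
#include <cstddef>
#include <iterator>
#include <stdexcept>

// Index of the largest score in a contiguous float buffer.
// On ties, std::max_element returns the first maximum.
int top1_index(const float* scores, std::size_t count) {
  if (scores == nullptr || count == 0)
    throw std::invalid_argument("empty score buffer");
  const float* best = std::max_element(scores, scores + count);
  return static_cast<int>(std::distance(scores, best));
}
```

For example, `top1_index` on `{0.1f, 0.7f, 0.2f}` returns 1. The tutorial's own helper, `top1_from_output`, does the same scan by hand and additionally caps it at 1000 classes.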

tutorials/002_run_inference_async/run_inference_async.cpp
int pulled = 0;
while (pulled < n) {
  auto out = run.pull(/*timeout_ms=*/2000);
  if (!out.has_value()) {
    if (producer_done.load())
      break;
    continue;
  }
  std::cout << "top1=" << top1_from_output(*out) << "\n";
  ++pulled;
}
producer.join();
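
The timed-pull pattern can be modelled with a small closable queue built from the standard library. This is a toy sketch, not the Neat runtime: `ToyChannel`, `finished()`, and `drain_all` are hypothetical names introduced for illustration. In this toy version the consumer stops only once the channel is both closed and empty, so a timeout by itself never ends the loop early.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>
#include <vector>

// Hypothetical closable queue standing in for a push/pull handle.
class ToyChannel {
 public:
  void push(int v) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      items_.push_back(v);
    }
    cv_.notify_one();
  }

  // Signal end of input, analogous in spirit to close_input().
  void close() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      closed_ = true;
    }
    cv_.notify_all();
  }

  // Next item, or nullopt if none arrived within `timeout`
  // or the channel is closed and already drained.
  std::optional<int> pull(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait_for(lock, timeout, [&] { return !items_.empty() || closed_; });
    if (items_.empty())
      return std::nullopt;
    int v = items_.front();
    items_.pop_front();
    return v;
  }

  bool finished() {
    std::lock_guard<std::mutex> lock(mu_);
    return closed_ && items_.empty();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<int> items_;
  bool closed_ = false;
};

// Producer thread pushes every input, then closes; the caller's thread
// pulls until the channel is closed and empty.
std::vector<int> drain_all(const std::vector<int>& inputs) {
  ToyChannel ch;
  std::thread producer([&] {
    for (int v : inputs)
      ch.push(v);
    ch.close();
  });

  std::vector<int> out;
  while (true) {
    auto item = ch.pull(std::chrono::milliseconds(50));
    if (item.has_value()) {
      out.push_back(*item);
      continue;
    }
    if (ch.finished())
      break;  // closed and drained: nothing more can arrive
  }
  producer.join();
  return out;
}
```

Calling `drain_all({1, 2, 3, 4})` returns the same four values in order. The tutorial's loop uses a simpler stop rule (an empty pull after `producer_done` is set), which relies on results arriving within the 2000 ms timeout.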

Run

Run it and you should see one top1= line per frame followed by a push/pull tally. Run the Python and C++ (prebuilt) commands from the Neat install root (the directory that contains share/ and lib/); run the build from source commands from the repo root.

C++ (prebuilt):

./lib/sima-neat/tutorials/tutorial_002_run_inference_async \
--model /tmp/resnet_50.tar.gz --n 4

C++ (build from source):

./build.sh --target tutorial_002_run_inference_async
./build/tutorials-standalone/tutorial_002_run_inference_async \
--model /tmp/resnet_50.tar.gz --n 4

Expected output (the exact indices depend on the image; the C++ build adds a pushed=... field, the Python build prints only pulled=...):

top1=285
top1=285
top1=285
top1=285
pushed=4 pulled=4
[OK] 002_run_inference_async

To integrate this chapter's C++ source into your own project with a custom CMakeLists.txt (no extras folder required), see How to Run Tutorials on the landing page.

In Practice

This chapter uses the async push/pull surface. To measure the same model with deterministic synthetic inputs, continue to Benchmark Your Model. For the full build-vs-run and sync-vs-async model plus the complete RunOptions surface, see Build an Inference Pipeline. For queue depth, overflow policy, and measurement under load, see Tune Throughput and Queue Depth.

Full source

tutorials/002_run_inference_async/run_inference_async.cpp
// Async push/pull: producer thread pushes frames, main thread pulls outputs.
//
// Usage:
// tutorial_002_run_inference_async --model /path/to/resnet_50.tar.gz [--image /path/to.jpg] [--n 4]

#include "neat.h"

#include <opencv2/core.hpp>
#include <opencv2/imgcodecs.hpp>
#include <opencv2/imgproc.hpp>

#include <atomic>
#include <cstring>
#include <exception>
#include <filesystem>
#include <iostream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

namespace fs = std::filesystem;

namespace {

bool get_arg(int argc, char** argv, const std::string& key, std::string& out) {
  for (int i = 1; i + 1 < argc; ++i) {
    if (key == argv[i]) {
      out = argv[i + 1];
      return true;
    }
  }
  return false;
}

int parse_int_arg(int argc, char** argv, const std::string& key, int def) {
  std::string value;
  if (!get_arg(argc, argv, key, value))
    return def;
  return std::stoi(value);
}

cv::Mat load_rgb(const fs::path& image_path, int size) {
  cv::Mat bgr = cv::imread(image_path.string(), cv::IMREAD_COLOR);
  if (bgr.empty())
    throw std::runtime_error("failed to read image: " + image_path.string());
  if (bgr.cols != size || bgr.rows != size) {
    cv::resize(bgr, bgr, cv::Size(size, size), 0, 0, cv::INTER_AREA);
  }
  cv::Mat rgb;
  cv::cvtColor(bgr, rgb, cv::COLOR_BGR2RGB);
  if (!rgb.isContinuous())
    rgb = rgb.clone();
  return rgb;
}

simaai::neat::Model::Options build_options(int size) {
  simaai::neat::Model::Options opt;
  opt.preprocess.color_convert.input_format = simaai::neat::PreprocessColorFormat::RGB;
  opt.preprocess.input_max_width = size;
  opt.preprocess.input_max_height = size;
  opt.preprocess.input_max_depth = 3;
  opt.preprocess.normalize.mean = {0.485f, 0.456f, 0.406f};
  opt.preprocess.normalize.stddev = {0.229f, 0.224f, 0.225f};
  return opt;
}

int top1_from_output(const simaai::neat::Sample& out) {
  // Extract the tensors once and keep them in scope while the mapping is read.
  auto tensors = simaai::neat::tensors_from_sample(out, true);
  if (tensors.empty())
    throw std::runtime_error("no tensor output");
  const simaai::neat::Mapping m = tensors.front().map_read();
  const size_t n = m.size_bytes / sizeof(float);
  const float* p = reinterpret_cast<const float*>(m.data);
  // Argmax over at most the first 1000 scores.
  int best = 0;
  for (size_t i = 1; i < n && i < 1000; ++i) {
    if (p[i] > p[best])
      best = static_cast<int>(i);
  }
  return best;
}

} // namespace

int main(int argc, char** argv) {
  try {
    std::string model_path, image;
    if (!get_arg(argc, argv, "--model", model_path)) {
      std::cerr
          << "Usage: tutorial_002_run_inference_async --model <path> [--image <path>] [--n <n>]\n";
      return 1;
    }
    get_arg(argc, argv, "--image", image);
    const int n = parse_int_arg(argc, argv, "--n", 4);
    const int size = 224;

    cv::Mat frame = image.empty() ? cv::Mat(size, size, CV_8UC3, cv::Scalar(99, 99, 99))
                                  : load_rgb(image, size);
    std::vector<cv::Mat> frames(n, frame);

    // CORE LOGIC
    // Build a Graph around the model and run it async: one producer thread pushes,
    // the main thread pulls outputs.
    simaai::neat::Model model(model_path, build_options(size));
    simaai::neat::Model::RouteOptions route_opt;
    route_opt.include_input = true;
    route_opt.include_output = true;

    simaai::neat::Graph graph;
    graph.add(model.graph(route_opt));

    auto run = graph.build(std::vector<cv::Mat>{frames.front()});

    std::atomic<int> pushed{0};
    std::atomic<bool> producer_done{false};
    std::thread producer([&]() {
      for (const cv::Mat& f : frames) {
        run.push(std::vector<cv::Mat>{f});
        pushed.fetch_add(1, std::memory_order_relaxed);
      }
      run.close_input();
      producer_done.store(true);
    });

    int pulled = 0;
    while (pulled < n) {
      auto out = run.pull(/*timeout_ms=*/2000);
      if (!out.has_value()) {
        if (producer_done.load())
          break;
        continue;
      }
      std::cout << "top1=" << top1_from_output(*out) << "\n";
      ++pulled;
    }
    producer.join();

    std::cout << "pushed=" << pushed.load() << " pulled=" << pulled << "\n";
    if (pulled != n)
      throw std::runtime_error("pulled=" + std::to_string(pulled) +
                               " != pushed=" + std::to_string(pushed.load()));
    std::cout << "[OK] 002_run_inference_async\n";
    return 0;
  } catch (const std::exception& e) {
    std::cerr << "[FAIL] " << e.what() << "\n";
    return 1;
  }
}
