NVIDIA Holoscan SDK v3.7.0

Creating an Application

In this section, we’ll address:

Note

This section covers basics of applications running as a single fragment. For multi-fragment applications, refer to the distributed application documentation.

The following code snippet shows an example Application code skeleton:

  • We define the App class that inherits from the Application base class.

  • We create an instance of the App class in main() using the make_application() function.

  • The run() method starts the application which will execute its compose() method where the custom workflow will be defined.

#include <holoscan/holoscan.hpp>

class App : public holoscan::Application {
 public:
  void compose() override {
    // Define Operators and workflow
    // ...
  }
};

int main() {
  auto app = holoscan::make_application<App>();
  app->run();
  return 0;
}

  • We define the App class that inherits from the Application base class.

  • We create an instance of the App class in a main() function that is called from __main__.

  • The run() method starts the application which will execute its compose() method where the custom workflow will be defined.

from holoscan.core import Application

class App(Application):
    def compose(self):
        # Define Operators and workflow
        # ...
        pass

def main():
    app = App()
    app.run()

if __name__ == "__main__":
    main()

Note

It is recommended to call run() from within a separate main() function rather than calling it directly from __main__. This will ensure that the Application’s destructor is called before the Python process exits.

Tip

This is also illustrated in the hello_world example.
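The reasoning behind calling run() from a separate main() function can be seen without Holoscan. The sketch below uses a hypothetical FakeApp class (not part of the SDK) with a finalizer. It relies on CPython's reference counting, where an object held only by a local variable is finalized as soon as the function returns, whereas a module-level object would live until interpreter shutdown.

```python
events = []


class FakeApp:
    """Hypothetical stand-in for an Application (not a Holoscan class)."""

    def run(self):
        events.append("run")

    def __del__(self):
        # Called when the last reference to the object goes away.
        events.append("destroyed")


def main():
    app = FakeApp()
    app.run()
    # `app` is a local variable, so CPython finalizes it when main() returns.


main()
events.append("after main")
print(events)
```

With CPython, "destroyed" is recorded before "after main", which is the ordering the recommendation is meant to guarantee for a real Application.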


It is also possible to launch the application asynchronously (i.e., without blocking the thread that launches it), as shown below:

This can be done simply by replacing the call to run() with run_async() which returns a std::future. Calling future.get() will block until the application has finished running and throw an exception if a runtime error occurred during execution.

int main() {
  auto app = holoscan::make_application<App>();
  auto future = app->run_async();
  future.get();
  return 0;
}

This can be done simply by replacing the call to run() with run_async() which returns a Python concurrent.futures.Future. Calling future.result() will block until the application has finished running and raise an exception if a runtime error occurred during execution.

def main():
    app = App()
    future = app.run_async()
    future.result()

if __name__ == "__main__":
    main()

Tip

This is also illustrated in the ping_simple_run_async example.
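The blocking and error-propagation behavior of a concurrent.futures.Future described above can be tried in isolation. The following is a minimal sketch using only the Python standard library; fake_run is a hypothetical stand-in for an application run and ThreadPoolExecutor stands in for run_async(), so no Holoscan code is involved.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_run(fail):
    """Hypothetical stand-in for the work done by an application run."""
    if fail:
        raise RuntimeError("runtime error during execution")
    return "finished"


with ThreadPoolExecutor(max_workers=1) as executor:
    ok_future = executor.submit(fake_run, False)
    bad_future = executor.submit(fake_run, True)

    # result() blocks until the task completes and returns its value.
    ok_result = ok_future.result()

    # For a failed task, result() re-raises the exception in the caller.
    try:
        bad_future.result()
        error_message = None
    except RuntimeError as exc:
        error_message = str(exc)

print(ok_result, "|", error_message)
```

In an application, this means the call to future.result() is the natural place for a try/except block around an asynchronous run.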

An application can be configured at different levels:

  1. providing the GXF extensions that need to be loaded (when using GXF operators).

  2. configuring parameters for your application, including for:

     a. the operators in the workflow.

     b. the scheduler of your application.

  3. configuring some runtime properties when deploying for production.

The sections below describe how to configure each of them, starting with the native support for YAML-based configuration, which is provided for convenience.

YAML configuration support

Holoscan supports loading arbitrary parameters from a YAML configuration file at runtime, making it convenient to configure each item listed above, or other custom parameters you wish to add on top of the existing API. For C++ applications, it also provides the ability to change the behavior of your application without needing to recompile it.

Note

Usage of the YAML utility is optional. Configurations can be hardcoded in your program, or done using any parser that you choose.

Here is an example YAML configuration:

string_param: "test"
float_param: 0.50
bool_param: true
dict_param:
  key_1: value_1
  key_2: value_2

Ingesting these parameters can be done using the methods below:

  • The config() method takes the path to the YAML configuration file. If the input path is relative, it will be relative to the current working directory.

  • The from_config() method returns an ArgList object for a given key in the YAML file. It holds a list of Arg objects, each of which holds a name (key) and a value.

    • If the ArgList object has only one Arg (when the key is pointing to a scalar item), it can be converted to the desired type using the as() method by passing the type as an argument.

    • The key can be a dot-separated string to access nested fields.

  • The config_keys() method returns an unordered set of the key names accessible via from_config().

// Pass configuration file
auto app = holoscan::make_application<App>();
app->config("path/to/app_config.yaml");

// Scalars
auto string_param = app->from_config("string_param").as<std::string>();
auto float_param = app->from_config("float_param").as<float>();
auto bool_param = app->from_config("bool_param").as<bool>();

// Dict
auto dict_param = app->from_config("dict_param");
auto dict_nested_param = app->from_config("dict_param.key_1").as<std::string>();

// Print
std::cout << "string_param: " << string_param << std::endl;
std::cout << "float_param: " << float_param << std::endl;
std::cout << "bool_param: " << bool_param << std::endl;
std::cout << "dict_param:\n" << dict_param.description() << std::endl;
std::cout << "dict_param['key_1']: " << dict_nested_param << std::endl;

// Output
// string_param: test
// float_param: 0.5
// bool_param: 1
// dict_param:
// name: arglist
// args:
//   - name: key_1
//     type: YAML::Node
//     value: value_1
//   - name: key_2
//     type: YAML::Node
//     value: value_2
// dict_param['key_1']: value_1

  • The config() method takes the path to the YAML configuration file. If the input path is relative, it will be relative to the current working directory.

  • The kwargs() method returns a regular Python dict for a given key in the YAML file.

    • Advanced: this method wraps the from_config() method similar to the C++ equivalent, which returns an ArgList object if the key is pointing to a map item, or an Arg object if the key is pointing to a scalar item. An Arg object can be cast to the desired type (e.g., str(app.from_config("string_param"))).

  • The config_keys() method returns a set of the key names accessible via from_config().

# Pass configuration file
app = App()
app.config("path/to/app_config.yaml")

# Scalars
string_param = app.kwargs("string_param")["string_param"]
float_param = app.kwargs("float_param")["float_param"]
bool_param = app.kwargs("bool_param")["bool_param"]

# Dict
dict_param = app.kwargs("dict_param")
dict_nested_param = dict_param["key_1"]

# Print
print(f"string_param: {string_param}")
print(f"float_param: {float_param}")
print(f"bool_param: {bool_param}")
print(f"dict_param: {dict_param}")
print(f"dict_param['key_1']: {dict_nested_param}")

# Output:
# string_param: test
# float_param: 0.5
# bool_param: True
# dict_param: {'key_1': 'value_1', 'key_2': 'value_2'}
# dict_param['key_1']: value_1

Warning

The output of from_config() cannot be used as input to the built-in operators at this time. Therefore, it’s recommended to use kwargs() in Python.

Tip

This is also illustrated in the video_replayer example.

Attention

With both from_config and kwargs, the returned ArgList/dictionary will include both the key and its associated item if that item value is a scalar. If the item is a map/dictionary itself, the input key is dropped, and the output will only hold the key/values from that item.
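The key handling described above can be mimicked with a plain dictionary. This is only a sketch of the documented behavior, not the SDK implementation: lookup_kwargs is a hypothetical helper, and the config dictionary stands in for a parsed YAML file.

```python
config = {
    "string_param": "test",
    "dict_param": {"key_1": "value_1", "key_2": "value_2"},
}


def lookup_kwargs(config, key):
    """Hypothetical helper mimicking the key handling described above."""
    item = config[key]
    if isinstance(item, dict):
        # Map item: the input key is dropped, only the inner pairs remain.
        return dict(item)
    # Scalar item: the result keeps the key next to its value.
    return {key: item}


print(lookup_kwargs(config, "string_param"))
print(lookup_kwargs(config, "dict_param"))
```

This is why a scalar needs a second lookup, as in app.kwargs("string_param")["string_param"], while the result for a map can be unpacked directly into an operator constructor.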

Loading GXF extensions

If you use operators that depend on GXF extensions for their implementations (known as GXF operators), the shared libraries (.so) of these extensions need to be dynamically loaded as plugins at runtime.

The SDK automatically handles loading the required extensions for the built-in operators in both C++ and Python, as well as common extensions (listed here). To load additional extensions for your own operators, you can use one of the following approaches:

extensions:
  - libgxf_myextension1.so
  - /path/to/libgxf_myextension2.so

auto app = holoscan::make_application<App>();
auto exts = {"libgxf_myextension1.so", "/path/to/libgxf_myextension2.so"};
for (auto& ext : exts) {
  app->executor().extension_manager()->load_extension(ext);
}

from holoscan.gxf import load_extensions
from holoscan.core import Application

app = Application()
context = app.executor.context_uint64
exts = ["libgxf_myextension1.so", "/path/to/libgxf_myextension2.so"]
load_extensions(context, exts)

Note

To be discoverable, paths to these shared libraries need to either be absolute, relative to your working directory, installed in the lib/gxf_extensions folder of the holoscan package, or listed under the HOLOSCAN_LIB_PATH or LD_LIBRARY_PATH environment variables.

Please see other examples in the system tests in the Holoscan SDK repository.

Configuring operators

Operators are defined in the compose() method of your application. They are not instantiated (with the initialize method) until an application’s run() method is called.

Operators have three types of fields which can be configured: parameters, conditions, and resources.

Configuring operator parameters

Operators can have parameters defined in their setup method to better control their behavior (see details when creating your own operators). The snippet below shows the implementation of this method for a minimal operator named MyOp that takes a string and a boolean as parameters; we’ll ignore any extra details for the sake of this example:

void setup(OperatorSpec& spec) override {
  spec.param(string_param_, "string_param");
  spec.param(bool_param_, "bool_param");
}

def setup(self, spec: OperatorSpec):
    spec.param("string_param")
    spec.param("bool_param")
    # Optional in Python. Could define `self.<param_name>` instead in `def __init__`

Tip

Given an instance of an operator class, you can print a human-readable description of its specification to inspect the parameter fields that can be configured on that operator class:


std::cout << operator_object->spec()->description() << std::endl;


print(operator_object.spec)

Given this YAML configuration:

myop_param:
  string_param: "test"
  bool_param: true

bool_param: false # we'll use this later

We can configure an instance of the MyOp operator in the application’s compose method like this:

void compose() override {
  // Using YAML
  auto my_op1 = make_operator<MyOp>("my_op1", from_config("myop_param"));

  // Same as above
  auto my_op2 = make_operator<MyOp>("my_op2",
    Arg("string_param", std::string("test")), // can use Arg(key, value)...
    Arg("bool_param") = true                  // ... or Arg(key) = value
  );
}

def compose(self):
    # Using YAML
    my_op1 = MyOp(self, name="my_op1", **self.kwargs("myop_param"))

    # Same as above
    my_op2 = MyOp(self,
        name="my_op2",
        string_param="test",
        bool_param=True,
    )

Tip

This is also illustrated in the ping_custom_op example.

If multiple ArgList objects are provided with duplicate keys, the last one takes precedence:

void compose() override {
  // Using YAML
  auto my_op1 = make_operator<MyOp>("my_op1",
    from_config("myop_param"),
    from_config("bool_param")
  );

  // Same as above
  auto my_op2 = make_operator<MyOp>("my_op2",
    Arg("string_param", "test"),
    Arg("bool_param") = true,
    Arg("bool_param") = false
  );

  // -> my_op `bool_param_` will be set to `false`
}

def compose(self):
    # Using YAML
    my_op1 = MyOp(self,
        self.from_config("myop_param"),
        self.from_config("bool_param"),
        name="my_op1",
    )
    # Note: We're using from_config above since we can't merge automatically with kwargs
    # as this would create duplicated keys. However, we recommend using kwargs in Python
    # to avoid limitations with wrapped operators, so the code below is preferred.

    # Same as above
    params = self.kwargs("myop_param")
    params.update(self.kwargs("bool_param"))
    my_op2 = MyOp(self, name="my_op2", **params)

    # -> my_op `bool_param` will be set to `False`
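On the Python side, the override rule corresponds to ordinary dictionary merging. The plain-dict sketch below makes no Holoscan calls; the two dictionaries are assumed stand-ins for what kwargs() would return for the myop_param and bool_param keys of the YAML shown earlier.

```python
myop_param = {"string_param": "test", "bool_param": True}
bool_param = {"bool_param": False}

# Later mappings win when keys collide.
merged = {**myop_param, **bool_param}

# dict.update() merges in place and returns None, so the dictionary must be
# kept in its own variable rather than taken from the return value of update().
params = dict(myop_param)
update_result = params.update(bool_param)

print(merged)
print(update_result)
```

Either form yields a dictionary in which bool_param is False, which can then be unpacked with ** into an operator constructor.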

Configuring operator conditions

By default, operators with no input ports will continuously run, while operators with input ports will run as long as they receive inputs (as they’re configured with the MessageAvailableCondition).

To change that behavior, one or more condition objects can be passed to the constructor of an operator to define when it should execute.

For example, we set three conditions on this operator my_op:

void compose() override {
  using namespace holoscan;
  using namespace std::chrono_literals;

  // Limit to 10 iterations
  auto c1 = make_condition<CountCondition>("my_count_condition", 10);

  // Wait at least 200 milliseconds between each execution
  auto c2 = make_condition<PeriodicCondition>("my_periodic_condition", 200ms);

  // Stop when the condition calls `disable_tick()`
  auto c3 = make_condition<BooleanCondition>("my_bool_condition");

  // Pass directly to the operator constructor
  auto my_op = make_operator<MyOp>("my_op", c1, c2, c3);
}

from datetime import timedelta
from holoscan.conditions import BooleanCondition, CountCondition, PeriodicCondition

def compose(self):
    # Limit to 10 iterations
    c1 = CountCondition(self, 10, name="my_count_condition")

    # Wait at least 200 milliseconds between each execution
    c2 = PeriodicCondition(self, timedelta(milliseconds=200), name="my_periodic_condition")

    # Stop when the condition calls `disable_tick()`
    c3 = BooleanCondition(self, name="my_bool_condition")

    # Pass directly to the operator constructor
    my_op = MyOp(self, c1, c2, c3, name="my_op")

Tip

This is also illustrated in the conditions’ examples.

Note

You’ll need to specify a unique name for the conditions if there are multiple conditions applied to an operator.

Configuring operator resources

Some resources can be passed to the operator’s constructor, typically an allocator passed as a regular parameter.

For example:

void compose() override {
  // Allocating memory pool of specific size on the GPU
  // ex: width * height * channels * channel size in bytes
  auto block_size = 640 * 480 * 4 * 2;
  auto p1 = make_resource<BlockMemoryPool>("my_pool1", 1, block_size, 1);

  // Provide unbounded memory pool
  auto p2 = make_resource<UnboundedAllocator>("my_pool2");

  // Pass to operator as parameters (name defined in operator setup)
  auto my_op = make_operator<MyOp>("my_op",
                                   Arg("pool1", p1),
                                   Arg("pool2", p2));
}

def compose(self):
    # Allocating memory pool of specific size on the GPU
    # ex: width * height * channels * channel size in bytes
    block_size = 640 * 480 * 4 * 2
    p1 = BlockMemoryPool(self, name="my_pool1", storage_type=1, block_size=block_size, num_blocks=1)

    # Provide unbounded memory pool
    p2 = UnboundedAllocator(self, name="my_pool2")

    # Pass to operator as parameters (name defined in operator setup)
    my_op = MyOp(self, name="my_op", pool1=p1, pool2=p2)
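The block size in the example is plain arithmetic on the frame dimensions. As a quick check, the sketch below spells out the factors; the variable names are chosen for illustration, and reading the final factor of 2 as two bytes per channel is an assumption based on the comment in the snippet.

```python
width, height = 640, 480
channels = 4
bytes_per_channel = 2  # assumed meaning of the last factor (e.g., 16-bit samples)

# One block holds one full frame.
block_size = width * height * channels * bytes_per_channel
block_size_mib = block_size / 2**20

print(block_size)      # 2457600 bytes
print(block_size_mib)  # 2.34375 MiB
```

With num_blocks set to 1, the pool in the example therefore reserves about 2.3 MiB.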

Native resource creation

The resources bundled with the SDK wrap an underlying GXF component. However, it is also possible to define a “native” resource without any need to create and wrap an underlying GXF component. Such a resource can be passed to an operator in the same way as the resources created in the previous section.

For example:

To create a native resource, implement a class that inherits from Resource

namespace holoscan {

class MyNativeResource : public holoscan::Resource {
 public:
  HOLOSCAN_RESOURCE_FORWARD_ARGS_SUPER(MyNativeResource, Resource)

  MyNativeResource() = default;

  // add any desired parameters in the setup method
  // (a single string parameter is shown here for illustration)
  void setup(ComponentSpec& spec) override {
    spec.param(message_, "message", "Message string", "Message String", std::string("test message"));
  }

  // add any user-defined methods (these could be called from an Operator's compute method)
  std::string message() { return message_.get(); }

 private:
  Parameter<std::string> message_;
};

}  // namespace holoscan

The setup method can be used to define any parameters needed by the resource.

This resource can be used with a C++ operator, just like any other resource. For example, an operator could have a parameter holding a shared pointer to MyNativeResource as below.

class MyOperator : public holoscan::Operator {
 public:
  HOLOSCAN_OPERATOR_FORWARD_ARGS(MyOperator)

  MyOperator() = default;

  void setup(OperatorSpec& spec) override {
    spec.param(message_resource_, "message_resource", "message resource",
               "resource printing a message");
  }

  void compute(InputContext&, OutputContext& op_output, ExecutionContext&) override {
    HOLOSCAN_LOG_TRACE("MyOp::compute()");

    // get a resource based on its name (this assumes the app author named the resource "message_resource")
    auto res = resource<MyNativeResource>("message_resource");
    if (!res) {
      throw std::runtime_error("resource named 'message_resource' not found!");
    }

    // call a method on the retrieved resource class
    auto message = res->message();
  }

 private:
  Parameter<std::shared_ptr<holoscan::MyNativeResource>> message_resource_;
};

The compute method above demonstrates how the templated resource method can be used to retrieve a resource.

The resource could then be created and passed via a named argument in the usual way:

// example code for within Application::compose (or Fragment::compose)
auto message_resource = make_resource<holoscan::MyNativeResource>(
    "message_resource", holoscan::Arg("message", "hello world"));

auto my_op = make_operator<holoscan::ops::MyOperator>(
    "my_op", holoscan::Arg("message_resource", message_resource));

As with GXF-based resources, it is also possible to pass a native resource as a positional argument to the operator constructor.

For a concrete example of native resource use in a real application, see the volume_rendering_xr application on Holohub. This application uses a native XrSession resource type which corresponds to a single OpenXR session. This single “session” resource can then be shared by both the XrBeginFrameOp and XrEndFrameOp operators.

To create a native resource, implement a class that inherits from Resource.

class MyNativeResource(Resource):
    def __init__(self, fragment, message="test message", *args, **kwargs):
        self._message = message
        super().__init__(fragment, *args, **kwargs)

    # Could optionally define Parameter as in C++ via spec.param as below.
    # Here, we chose instead to pass message as an argument to __init__ above.
    # def setup(self, spec: ComponentSpec):
    #     spec.param("message", "test message")

    # define a custom method
    # (the value is stored as `_message` so the attribute does not shadow this method)
    def message(self):
        return self._message

The snippet below shows how a custom operator could use such a resource in its compute method:

class MyOperator(Operator):
    def compute(self, op_input, op_output, context):
        resource = self.resource("message_resource")
        if resource is None:
            raise ValueError("expected message resource not found")
        assert isinstance(resource, MyNativeResource)

        print(f"message = {resource.message()}")

This native resource could have been created and passed positionally to MyOperator as follows:

# example code within Application.compose (or Fragment.compose)
message_resource = MyNativeResource(
    fragment=self, message="hello world", name="message_resource")

# pass the native resource as a positional argument to MyOperator
my_op = MyOperator(self, message_resource)

There is a minimal example of native resource use in the examples/native folder.

Configuring the scheduler

The scheduler controls how the application schedules the execution of the operators that make up its workflow.

The default scheduler is a single-threaded GreedyScheduler. An application can be configured to use a different Scheduler (C++/Python), or to change the parameters of the default scheduler, using the scheduler() function (C++/Python).

For example, if an application needs to run multiple operators in parallel, the MultiThreadScheduler or EventBasedScheduler can be used instead. The difference between the two is that the MultiThreadScheduler actively polls operators to determine if they are ready to execute, while the EventBasedScheduler waits for an event indicating that an operator is ready to execute. Additionally, the EventBasedScheduler offers options for running time-critical operators under real-time scheduling policies supported by the Linux kernel (see Real-time scheduling with thread pools).

The code snippet below shows how to set and configure a non-default scheduler:

auto app = holoscan::make_application<App>();
auto scheduler = app->make_scheduler<holoscan::EventBasedScheduler>(
  "myscheduler",
  holoscan::Arg("worker_thread_number", static_cast<int64_t>(4)),
  holoscan::Arg("stop_on_deadlock", true)
);
app->scheduler(scheduler);
app->run();

  • We create an instance of a Scheduler class in the schedulers module. Like operators, parameters can come from an explicit Arg or ArgList, or from a YAML configuration.

  • The scheduler() method assigns the scheduler to be used by the application.

app = App()
scheduler = holoscan.schedulers.EventBasedScheduler(
    app,
    name="myscheduler",
    worker_thread_number=4,
    stop_on_deadlock=True,
)
app.scheduler(scheduler)
app.run()

Tip

This is also illustrated in the multithread example.

Configuring worker thread pools

Both the MultiThreadScheduler and EventBasedScheduler discussed in the previous section automatically create an internal worker thread pool by default. In some scenarios, it may be desirable for users to instead assign operators to specific user-defined thread pools. This also allows optionally pinning operators to a specific thread.

Assume we have three operators, op1, op2 and op3, that we want to assign to a thread pool, and that we would also like to pin op2 and op3 to specific threads within the pool. The example below shows the code for configuring thread pools to achieve this from the Fragment compose method.

We create thread pools via calls to the make_thread_pool() method. The first argument is a user-defined name for the thread pool while the second is the number of threads initially in the thread pool. This make_thread_pool method returns a shared pointer to a ThreadPool object. The add() method of that object can then be used to add a single operator or a vector of operators to the thread pool. The second argument to the add function is a boolean indicating whether the given operators should be pinned to always run on a specific thread within the thread pool.

// The following code would be within `Fragment::compose` after operators have been defined
// Assume op1, op2 and op3 are `shared_ptr<OperatorType>` as returned by `make_operator`

// create a thread pool with three threads
auto pool1 = make_thread_pool("pool1", 3);
// assign a single operator to the thread pool (unpinned)
pool1->add(op1, false);
// assign multiple operators to this thread pool (pinned)
pool1->add({op2, op3}, true);

We create thread pools via calls to the make_thread_pool() method. The first argument is a user-defined name for the thread pool, while the second is the initial size of the thread pool. It is not necessary to modify this, as the size will be incremented automatically as needed. The make_thread_pool() method returns a ThreadPool object. The add() method of that object can then be used to add a single operator or a list of operators to the thread pool. The second argument to add() is a boolean indicating whether the given operators should be pinned to always run on a specific thread within the thread pool.

# The following code would be within `Fragment.compose` after operators have been defined
# Assume op1, op2 and op3 are operator objects created earlier in `compose`

# create a thread pool with a single thread
pool1 = self.make_thread_pool("pool1", 1)
# assign a single operator to the thread pool (unpinned)
pool1.add(op1, False)
# assign multiple operators to this thread pool (pinned)
pool1.add([op2, op3], True)

Note

It is not necessary to define a thread pool for Holoscan applications. There is a default thread pool that gets used for any operators the user did not explicitly assign to a thread pool. The use of thread pools provides a way to explicitly indicate that threads should be pinned.

One case where separate thread pools must be used is in order to support pinning of operators involving separate GPU devices. Only a single GPU device should be used from any given thread pool. Operators associated with a GPU device resource are those using one of the CUDA-based allocators like BlockMemoryPool, CudaStreamPool, RMMAllocator or StreamOrderedAllocator.

Tip

A concrete example of a simple application with two pairs of operators in separate thread pools is given in the thread pool resource example.

Note that any given operator can only belong to a single thread pool. Assigning the same operator to multiple thread pools may result in errors being logged at application startup time.

There is also a related boolean parameter, strict_thread_pinning, that can be passed as a holoscan::Arg to the MultiThreadScheduler constructor. When this argument is set to false and an operator is pinned to a specific thread, other operators are allowed to run on that same thread whenever the pinned operator is not ready to execute. When strict_thread_pinning is true, the thread can ONLY be used by the operator that was pinned to it. The EventBasedScheduler is always in strict pinning mode and has no such parameter.

If a thread pool is configured but the single-threaded GreedyScheduler is used, a warning will be logged indicating that the user-defined thread pools will be ignored. Only the MultiThreadScheduler and EventBasedScheduler can make use of thread pools.

Linux real-time scheduling with thread pools

The EventBasedScheduler offers additional features to pin an operator to a dedicated worker thread scheduled by real-time scheduling policies supported in the Linux kernel. The configuration can be done by using the add_realtime() method (in contrast to the add() method) in ThreadPool to assign an operator with a real-time scheduling policy along with the parameters required for the selected scheduling policy. The supported real-time scheduling policies are:

  • SCHED_FIFO (SchedulingPolicy::kFirstInFirstOut): First-in-first-out scheduling policy that provides priority execution. Operators with this policy will run until completion or until preempted by a higher priority Linux process or thread. Operators with the same priority under SCHED_FIFO are scheduled in a first-in-first-out fashion.

  • SCHED_RR (SchedulingPolicy::kRoundRobin): Round-robin scheduling policy that provides execution with CPU time sharing for operators with the same priority level in a round-robin fashion.

  • SCHED_DEADLINE (SchedulingPolicy::kDeadline): Earliest Deadline First scheduling policy that ensures operators meet their specified deadlines. This policy requires setting runtime, deadline, and period parameters.

For more detailed information about Linux kernel schedulers, refer to the Ubuntu Real-time documentation.

Important

Important Notes About Using Real-time Scheduling Policies:

  • SCHED_DEADLINE Behavior: Since SCHED_DEADLINE inherently enforces periodic execution, adding a PeriodicCondition to these operators is unnecessary.

  • Operator Conditions Still Apply: Real-time scheduling policies work alongside existing operator conditions. While real-time policies reduce overall scheduling latency, the actual operator execution start timing may still be constrained by conditions defined in the application’s graph structure.

  • Understanding the Scope: The Holoscan SDK integrates with Linux kernel real-time scheduling policies but cannot guarantee real-time performance across your entire application. This feature offers a way to reduce scheduling overhead for specific time-sensitive operators, but the overall system behavior depends on your application design and the underlying Linux kernel configuration.

Note

Using real-time scheduling policies requires appropriate Linux kernel configuration and may require running sudo sysctl -w kernel.sched_rt_runtime_us=-1 beforehand to disable the real-time runtime limit.

Container Requirements:

  • SCHED_DEADLINE: Requires root privileges and --cap-add=CAP_SYS_NICE when running in a container

  • SCHED_FIFO/SCHED_RR: May require --ulimit rtprio=99 when running in a container (can replace 99 with the highest value actually used for the sched_priority argument to add_realtime())

Here’s an example of configuring operators to run with real-time policies:

// Create a thread pool for real-time operators
auto realtime_pool = make_thread_pool("realtime_pool", 2);

// Add operator with SCHED_FIFO policy and priority 1, pinned to CPU core 0
realtime_pool->add_realtime(op1, SchedulingPolicy::kFirstInFirstOut, true, {0}, 1);

// Add operator with SCHED_RR policy and priority 2, pinned to CPU core 1
realtime_pool->add_realtime(op2, SchedulingPolicy::kRoundRobin, true, {1}, 2);

// Add operator with SCHED_DEADLINE policy, pinned to CPU core 2
// runtime: 1ms, deadline: 10ms, period: 10ms
realtime_pool->add_realtime(op3, SchedulingPolicy::kDeadline, true, {2}, 0, 1000000, 10000000, 10000000);

# Import required for real-time scheduling
from holoscan.resources import SchedulingPolicy

# Create a thread pool for real-time operators
realtime_pool = self.make_thread_pool("realtime_pool", 2)

# Add operator with SCHED_FIFO policy and priority 1, pinned to CPU core 0
realtime_pool.add_realtime(
    op1,
    sched_policy=SchedulingPolicy.SCHED_FIFO,
    pin_operator=True,
    pin_cores=[0],
    sched_priority=1
)

# Add operator with SCHED_RR policy and priority 2, pinned to CPU core 1
realtime_pool.add_realtime(
    op2,
    sched_policy=SchedulingPolicy.SCHED_RR,
    pin_operator=True,
    pin_cores=[1],
    sched_priority=2
)

# Add operator with SCHED_DEADLINE policy, pinned to CPU core 2
# runtime: 1ms, deadline: 10ms, period: 10ms
realtime_pool.add_realtime(
    op3,
    sched_policy=SchedulingPolicy.SCHED_DEADLINE,
    pin_operator=True,
    pin_cores=[2],
    sched_runtime=1000000,
    sched_deadline=10000000,
    sched_period=10000000,
)
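The SCHED_DEADLINE values in the examples match their comments only if they are read as nanoseconds (1000000 for 1 ms), which is the unit the Linux kernel uses for these parameters. The sketch below is a hypothetical convenience helper, not part of the SDK: it converts milliseconds to nanoseconds and checks the ordering runtime <= deadline <= period that the kernel requires for this policy. The dictionary keys mirror the keyword names used in the Python example.

```python
def deadline_params_ns(runtime_ms, deadline_ms, period_ms):
    """Hypothetical helper: convert milliseconds to nanoseconds and check ordering."""
    ns_per_ms = 1_000_000
    runtime = int(runtime_ms * ns_per_ms)
    deadline = int(deadline_ms * ns_per_ms)
    period = int(period_ms * ns_per_ms)
    # The Linux kernel requires runtime <= deadline <= period for SCHED_DEADLINE.
    if not (0 < runtime <= deadline <= period):
        raise ValueError("expected 0 < runtime <= deadline <= period")
    return {
        "sched_runtime": runtime,
        "sched_deadline": deadline,
        "sched_period": period,
    }


params = deadline_params_ns(1, 10, 10)
print(params)
```

Under the assumption that the keyword names stay as shown in the example, the result could be unpacked into the call with **params.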

Configuring runtime properties

As described below, applications can run simply by executing the C++ or Python application manually on a given node, or by packaging it in a HAP container. With the latter, runtime properties need to be configured: refer to the App Runner Configuration for details.

Note

Operators are initialized according to the topological order of their fragment graph. When an application runs, the operators are executed in the same topological order. Topological ordering of the graph ensures that all the data dependencies of an operator are satisfied before its instantiation and execution. Specifying a different, explicit instantiation and execution order for the operators is currently not supported.
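For readers unfamiliar with the term, a topological order lists every node of a directed graph after all of the nodes it depends on. The sketch below illustrates the idea with Python's standard graphlib module (Python 3.9 or later); it is not how the SDK computes the order, and the operator names are hypothetical.

```python
from graphlib import TopologicalSorter

# Map each operator name to the set of operators it depends on (hypothetical names).
dependencies = {
    "process": {"source"},
    "sink": {"process"},
}

# static_order() yields each node only after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['source', 'process', 'sink']
```

For a linear chain such as this one the order is unique; for graphs with branches, several valid topological orders can exist.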

One-operator Workflow

The simplest form of a workflow would be a single operator.


Fig. 12 A one-operator workflow

The graph above shows an Operator (C++/Python) (named MyOp) that has neither input nor output ports.

  • Such an operator may accept input data from the outside (e.g., from a file) and produce output data (e.g., to a file) so that it acts as both the source and the sink operator.

  • Arguments to the operator (e.g., input/output file paths) can be passed as parameters as described in the section above.

We can add an operator to the workflow by calling the add_operator() method (C++/Python) in the compose() method.

The following code shows how to define a one-operator workflow in the compose() method of the App class (assuming that the operator class MyOp is declared/defined in the same file).

class App : public holoscan::Application {
 public:
  void compose() override {
    // Define Operators
    auto my_op = make_operator<MyOp>("my_op");

    // Define the workflow
    add_operator(my_op);
  }
};

class App(Application):
    def compose(self):
        # Define Operators
        my_op = MyOp(self, name="my_op")

        # Define the workflow
        self.add_operator(my_op)

Linear Workflow

Here is an example workflow where the operators are connected linearly:


Fig. 13 A linear workflow

In this example, SourceOp produces a message and passes it to ProcessOp. ProcessOp produces another message and passes it to SinkOp.

We can connect two operators by calling the add_flow() method (C++/Python) in the compose() method.

The add_flow() method (C++/Python) takes the source operator, the destination operator, and the optional port name pairs. The port name pair is used to connect the output port of the source operator to the input port of the destination operator. The first element of the pair is the output port name of the upstream operator and the second element is the input port name of the downstream operator. An empty port name (“”) can be used for specifying a port name if the operator has only one input/output port. If there is only one output port in the upstream operator and only one input port in the downstream operator, the port pairs can be omitted.

The following code shows how to define a linear workflow in the compose() method of the App class (assuming that the operator classes SourceOp, ProcessOp, and SinkOp are declared/defined in the same file).

class App : public holoscan::Application {
 public:
  void compose() override {
    // Define Operators
    auto source = make_operator<SourceOp>("source");
    auto process = make_operator<ProcessOp>("process");
    auto sink = make_operator<SinkOp>("sink");

    // Define the workflow
    add_flow(source, process);  // same as `add_flow(source, process, {{"output", "input"}});`
    add_flow(process, sink);    // same as `add_flow(process, sink, {{"", ""}});`
  }
};

class App(Application):
    def compose(self):
        # Define Operators
        source = SourceOp(self, name="source")
        process = ProcessOp(self, name="process")
        sink = SinkOp(self, name="sink")

        # Define the workflow
        self.add_flow(source, process)  # same as `self.add_flow(source, process, {("output", "input")})`
        self.add_flow(process, sink)    # same as `self.add_flow(process, sink, {("", "")})`
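The port-name rules for add_flow() described above can be sketched in plain Python. The helper below, resolve_port_pairs, is hypothetical and not part of the Holoscan SDK; it only models how an omitted pair or an empty port name could be expanded when an operator has exactly one port on the relevant side.

```python
def resolve_port_pairs(src_outputs, dst_inputs, port_pairs=None):
    """Expand omitted or empty port names into explicit (output, input) pairs.

    Hypothetical helper that mirrors the rules described in the text;
    it is not part of the Holoscan SDK.
    """
    def only_port(ports, kind):
        # An omitted or empty name is unambiguous only when exactly one port exists.
        if len(ports) != 1:
            raise ValueError(f"a {kind} port name is required: candidates are {ports}")
        return ports[0]

    if port_pairs is None:
        # Omitting the pairs behaves like a single pair of empty names.
        port_pairs = {("", "")}

    resolved = set()
    for out_name, in_name in port_pairs:
        out_name = out_name or only_port(src_outputs, "output")
        in_name = in_name or only_port(dst_inputs, "input")
        if out_name not in src_outputs or in_name not in dst_inputs:
            raise ValueError(f"unknown port in pair ({out_name!r}, {in_name!r})")
        resolved.add((out_name, in_name))
    return resolved


# Single output and single input: the pair can be omitted entirely.
print(resolve_port_pairs(["output"], ["input"]))

# Several ports on either side: names must be given explicitly.
print(resolve_port_pairs(
    ["image", "metadata"],
    ["image1", "image2", "metadata"],
    {("image", "image1"), ("metadata", "metadata")},
))
```

Calling the helper without pairs when either side has more than one port raises a ValueError, which corresponds to the case where the port names cannot be omitted.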

Complex Workflow (Multiple Inputs and Outputs)

You can design a complex workflow like the one below, where some operators have multiple inputs and/or multiple outputs:


Fig. 14 A complex workflow (multiple inputs and outputs)

class App : public holoscan::Application {
 public:
  void compose() override {
    // Define Operators
    auto reader1 = make_operator<Reader1>("reader1");
    auto reader2 = make_operator<Reader2>("reader2");
    auto processor1 = make_operator<Processor1>("processor1");
    auto processor2 = make_operator<Processor2>("processor2");
    auto processor3 = make_operator<Processor3>("processor3");
    auto writer = make_operator<Writer>("writer");
    auto notifier = make_operator<Notifier>("notifier");

    // Define the workflow
    add_flow(reader1, processor1,
             {{"image", "image1"}, {"image", "image2"}, {"metadata", "metadata"}});
    add_flow(reader2, processor2, {{"roi", "roi"}});
    add_flow(processor1, processor2, {{"image", "image"}});
    add_flow(processor1, writer, {{"image", "image"}});
    add_flow(processor2, notifier);
    add_flow(processor2, processor3);
    add_flow(processor3, writer, {{"seg_image", "seg_image"}});
  }
};

class App(Application):
    def compose(self):
        # Define Operators
        reader1 = Reader1Op(self, name="reader1")
        reader2 = Reader2Op(self, name="reader2")
        processor1 = Processor1Op(self, name="processor1")
        processor2 = Processor2Op(self, name="processor2")
        processor3 = Processor3Op(self, name="processor3")
        notifier = NotifierOp(self, name="notifier")
        writer = WriterOp(self, name="writer")

        # Define the workflow
        self.add_flow(
            reader1,
            processor1,
            {("image", "image1"), ("image", "image2"), ("metadata", "metadata")},
        )
        self.add_flow(reader2, processor2, {("roi", "roi")})
        self.add_flow(processor1, processor2, {("image", "image")})
        self.add_flow(processor1, writer, {("image", "image")})
        self.add_flow(processor2, notifier)
        self.add_flow(processor2, processor3)
        self.add_flow(processor3, writer, {("seg_image", "seg_image")})

If there is a cycle in the graph with no implicit root operator, the root operator is either the first operator in the first call to the add_flow method (C++/Python), or the operator in the first call to the add_operator method (C++/Python).

auto op1 = make_operator<...>("op1");
auto op2 = make_operator<...>("op2");
auto op3 = make_operator<...>("op3");

add_flow(op1, op2);
add_flow(op2, op3);
add_flow(op3, op1);
// There is no implicit root operator
// op1 is the root operator because op1 is the first operator in the first call to add_flow
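The root-selection rule for cyclic graphs can be modelled with a short plain-Python sketch. The function pick_root below is hypothetical and is not how the SDK is implemented. It covers only graphs built through add_flow calls, and it approximates "implicit root" as an operator with no incoming connection, which is an assumption made for this illustration.

```python
def pick_root(flows):
    """Return the root operator for a list of (upstream, downstream) pairs.

    `flows` is given in the order the add_flow calls were made.
    Hypothetical model of the rule in the text, not SDK code.
    """
    if not flows:
        raise ValueError("at least one flow is required")

    downstream = {dst for _, dst in flows}
    for src, _ in flows:
        if src not in downstream:
            # Nothing feeds this operator, so it acts as an implicit root.
            return src

    # Every operator has an incoming connection (a pure cycle): fall back to
    # the first operator in the first add_flow call.
    return flows[0][0]


# Pure cycle, as in the example above: op1 is chosen by call order.
print(pick_root([("op1", "op2"), ("op2", "op3"), ("op3", "op1")]))  # prints: op1

# A cycle between op2 and op3 that is fed by op1: op1 is the implicit root.
print(pick_root([("op2", "op3"), ("op3", "op2"), ("op1", "op2")]))  # prints: op1
```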

If there is a cycle in the graph with an implicit root operator which has no input port, then the initialization and execution orders of the operators are still topologically sorted as far as possible until the cycle needs to be explicitly broken. An example is given below:

Cycle_Implicit_Root.png

Dynamic Flow Control for Complex Workflows

As of Holoscan v3.0, the dynamic flow control feature is available, enabling operators to modify their connections with other operators at runtime. This allows for the creation of complex workflows with conditional branching, loops, and dynamic routing patterns.

Key features include:

  • Implicit input/output execution ports for execution dependency control

  • The Start operator concept (start_op() (C++/Python)) for managing workflow entry points

  • Dynamic flow modification using set_dynamic_flows() (C++/Python) and add_dynamic_flow() (C++/Python) methods

  • Flow information management via the FlowInfo (C++/Python) class

For details, please refer to the Dynamic Flow Control section of the user guide.

Application Execution Control APIs

Holoscan provides APIs for controlling the execution of operators at the application or fragment level.

stop_execution

The stop_execution() (C++/Python) method allows an application to stop the execution of a specific operator or the entire application:


virtual void stop_execution(const std::string& op_name = "");

When called with an operator name, this method stops the execution of the specified operator. When called with an empty string (the default), it stops all operators in the fragment, effectively shutting down the application.

Example usage to stop a specific operator:

// From within a Fragment/Application method
stop_execution("source_operator");

Example usage to stop the entire application:

// From within a Fragment/Application method
stop_execution();

Example usage to access stop_execution() method from within an operator:

// From within an operator's compute method
fragment()->stop_execution();  // `fragment()` returns a pointer to the fragment object


def stop_execution(self, op_name="")

When called with an operator name, this method stops the execution of the specified operator. When called with an empty string (the default), it stops all operators in the fragment, effectively shutting down the application.

Example usage to stop a specific operator:

# From within a Fragment/Application method
self.stop_execution("source_operator")

Example usage to stop the entire application:

# From within a Fragment/Application method
self.stop_execution()

Example usage to access stop_execution() method from within an operator:

# From within an operator's compute method
self.fragment.stop_execution()  # `self.fragment` is the fragment object

For a complete example of how to use these methods to implement advanced monitoring behavior, see the operator_status_tracking example, which demonstrates:

  1. A source operator that runs for a limited number of iterations

  2. A monitor operator that independently tracks the status of other operators

  3. Automatic application shutdown when all processing operators have completed

You can build your C++ application using CMake, by calling find_package(holoscan) in your CMakeLists.txt to load the SDK libraries. Your executable will need to link against:

  • holoscan::core

  • any operator defined outside your main.cpp which you wish to use in your app workflow, such as:

    • SDK built-in operators under the holoscan::ops namespace.

    • operators created separately in your project with add_library.

    • operators imported externally using find_library or find_package.

Listing 1 /CMakeLists.txt

# Your CMake project
cmake_minimum_required(VERSION 3.20)
project(my_project CXX)

# Finds the holoscan SDK
find_package(holoscan REQUIRED CONFIG PATHS "/opt/nvidia/holoscan")

# Create an executable for your application
add_executable(my_app main.cpp)

# Link your application against holoscan::core and any existing operators you'd like to use
target_link_libraries(my_app
  PRIVATE
    holoscan::core
    holoscan::ops::<some_built_in_operator_target>
    <some_other_operator_target>
    <...>
)


Tip

This is also illustrated in all the examples:

  • in CMakeLists.txt for the SDK installation directory - /opt/nvidia/holoscan/examples.

  • in CMakeLists.min.txt for the SDK source directory.

Once your CMakeLists.txt is ready in <src_dir>, you can build in <build_dir> with the command line below. You can optionally pass Holoscan_ROOT if the SDK installation you’d like to use differs from the PATHS given to find_package(holoscan) above.

# Configure
cmake -S <src_dir> -B <build_dir> -D Holoscan_ROOT="/opt/nvidia/holoscan"

# Build
cmake --build <build_dir> -j

You can then run your application by running <build_dir>/my_app.

Python applications do not require building. Simply ensure that:

  • The holoscan python module is installed in your dist-packages or is listed under the PYTHONPATH env variable so you can import holoscan.core and any built-in operator you might need in holoscan.operators.

  • Any external operators are available in modules in your dist-packages or contained in PYTHONPATH.

Note

While Python applications do not need to be built, they might depend on operators that wrap C++ operators. All Python operators built into the SDK already ship with pre-built Python bindings. Follow this section if you are wrapping C++ operators yourself to use in your Python application.

You can then run your application by running python3 my_app.py.

Note

Given a CMake project, a pre-built executable, or a Python application, you can also use the Holoscan CLI to package and run your Holoscan application in an OCI-compliant container image.

As of Holoscan v2.3 (for C++) or v2.4 (for Python) it is possible to send metadata alongside the data emitted from an operator’s output ports. This metadata can then be used and/or modified by any downstream operators. The subsections below describe how this feature can be used.

Enabling application metadata

As of Holoscan v3.0, the metadata feature is enabled by default (in older releases it had to be explicitly enabled). Leaving the feature enabled does no harm even if the application does not use metadata. To avoid even the minor overhead of checking for metadata in received messages, the feature can be explicitly disabled as shown below.

auto app = holoscan::make_application<MyApplication>();

// Disable metadata feature before calling app->run() or app->run_async()
app->enable_metadata(false);

app->run();

app = MyApplication()

# Disable metadata feature before calling app.run() or app.run_async()
app.enable_metadata(False)

app.run()

None of the built-in operators provided by the SDK itself currently require that the feature be enabled, but it is possible that some third-party operators might require it in order to work as expected. An example is the V4L2FormatTranslateOp defined as part of the v4l2_camera example (video format information is stored in the metadata).

Note that the enable_metadata method exists on the Application, Fragment and Operator classes. Calling this method on the application sets the default for all fragments of a distributed application. Calling the method on an individual fragment sets the default to be used for that fragment (overrides the application-level default). Similarly, calling the method on an individual operator overrides the setting for that specific operator within a fragment.
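The override order described above (operator over fragment over application) can be illustrated with a small plain-Python sketch. The function effective_metadata_enabled is hypothetical and only models the precedence; `None` stands for "enable_metadata was never called at this level", and the application default of `True` reflects the v3.0 behavior stated earlier.

```python
def effective_metadata_enabled(app_setting=True, fragment_setting=None, operator_setting=None):
    """Resolve which enable_metadata setting applies to one operator.

    Hypothetical model of the precedence described in the text, not SDK code.
    The most specific level that was explicitly set wins.
    """
    for setting in (operator_setting, fragment_setting):
        if setting is not None:
            return setting
    return app_setting


# Nothing overridden: the application-level default applies.
print(effective_metadata_enabled())  # prints: True

# Disabled for the application, re-enabled for one fragment.
print(effective_metadata_enabled(app_setting=False, fragment_setting=True))  # prints: True

# Enabled for the fragment, but disabled on one specific operator.
print(effective_metadata_enabled(fragment_setting=True, operator_setting=False))  # prints: False
```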

Understanding Metadata Flow

Each operator in the workflow has an associated MetadataDictionary object. At the start of each operator’s compute() call, this metadata dictionary will be empty (i.e., metadata does not persist from previous compute calls). When any call to receive() data is made, any metadata also found in the input message will be merged into the operator’s local metadata dictionary. The operator’s compute method can then read, append to, or remove metadata as explained in the next section. Whenever the operator emits data via a call to emit(), the current state of the operator’s metadata dictionary will be transmitted on that port alongside the data passed via the first argument to the emit call. Any downstream operators will then receive this metadata via their input ports.
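The lifecycle described above can be sketched with a toy class in plain Python. ToyOperator is a stand-in written for this illustration, not the Holoscan Operator class: messages are modelled as `(data, metadata)` tuples, and duplicate keys are simply overwritten on receive, which is an assumption made to keep the sketch short.

```python
class ToyOperator:
    """Toy stand-in for an operator; not the Holoscan Operator class."""

    def __init__(self):
        self.metadata = {}

    def begin_compute(self):
        # The dictionary starts empty on every compute call.
        self.metadata = {}

    def receive(self, message):
        data, incoming = message
        # Metadata found on the input message is merged into the local dictionary.
        self.metadata.update(incoming)
        return data

    def emit(self, data):
        # The current state of the dictionary travels with the emitted data.
        return (data, dict(self.metadata))


op = ToyOperator()
op.begin_compute()
frame = op.receive(("frame-0", {"patient_name": "anon"}))
op.metadata["age"] = 42
first = op.emit(frame)   # carries both keys

op.metadata.clear()
second = op.emit(frame)  # carries no metadata

print(first)
print(second)
```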

Working With Metadata from Operator::compute

Within the operator’s compute() method, the metadata() method can be called to get a shared pointer to the MetadataDictionary of the operator. The metadata dictionary provides a similar API to a std::unordered_map (C++) or dict (Python) where the keys are strings (std::string for C++) and the values can store any object type (via a C++ MetadataObject holding a std::any).

Templated get() and set() methods are provided, as demonstrated below, to allow directly setting values of a given type without having to explicitly work with the internal MetadataObject type.

// Receiving from a port updates operator metadata with any metadata found on the port
auto input_tensors = op_input.receive<TensorMap>("in");

// Get a reference to the shared metadata dictionary
auto meta = metadata();

// Retrieve existing values.
// Use get<Type> to automatically cast the `std::any` contained within the `holoscan::Message`
auto name = meta->get<std::string>("patient_name");
auto age = meta->get<int>("age");

// Get also provides a two-argument version where a default value to be assigned is given by
// the second argument. The type of the default value should match the expected type of the value.
auto flag = meta->get("flag", false);

// Add a new value (if a key already exists, the value will be updated according to the
// operator's metadata_policy).
// Note: the `s` literal suffix requires `using namespace std::string_literals;`
std::vector<float> spacing{1.0, 1.0, 3.0};
meta->set("pixel_spacing"s, spacing);

// Remove an item
meta->erase("patient_name");

// Check if a key exists
bool has_patient_name = meta->has_key("patient_name");

// Get a vector<std::string> of all keys in the metadata
const auto& keys = meta->keys();

// ... Some processing to produce output `data` could go here ...

// Current state of `meta` will automatically be emitted along with `data` in the call below
op_output.emit(data, "output1");

// Can clear all items
meta->clear();

// Any emit call after this point would not transmit a metadata object
op_output.emit(data, "output2");

See the MetadataDictionary API docs for all available methods.

A Pythonic interface is provided for the MetadataObject type.

# Receiving from a port updates operator metadata with any metadata found on the port
input_tensors = op_input.receive("in")

# self.metadata can be used to access the shared MetadataDictionary
# for example we can check if a key exists
has_key = "my_key" in self.metadata

# get the number of keys
num_keys = len(self.metadata)

# get a list of the keys
print(f"metadata keys ={self.metadata.keys()}")

# iterate over the values in the dictionary using the `items()` method
for key, value in self.metadata.items():
    # process item
    pass

# print a Python dict of the keys/values
print(self.metadata)

# Retrieve existing values. If the underlying value is a C++ class, a conversion to an
# equivalent Python object will be made (e.g. `std::vector<std::string>` to `List[str]`).
name = self.metadata["patient_name"]
age = self.metadata["age"]

# It is also supported to use the get method along with an optional default value to use
# if the key is not present.
flag = self.metadata.get("flag", False)

# print the current metadata policy
print(f"metadata policy ={self.metadata_policy}")

# Add a new value (if a key already exists, the value will be updated according to the
# operator's metadata_policy). If the value is set via the indexing operator as below,
# the Python object itself is stored as the value.
spacing = (1.0, 1.0, 3.0)
self.metadata["pixel_spacing"] = spacing

# In some cases, if sending metadata to downstream C++-based operators, it may be desired
# to instead store the metadata value as an equivalent C++ type. In that case, it is
# necessary to instead set the value using the `set` method with `cast_to_cpp=True`.
# Automatic casting is supported for bool, str, and various numeric and iterator or
# sequence types.

# The following would result in the spacing `Tuple[float]` being stored as a
# C++ `std::vector<double>`. Here we show use of the `pop` method to remove a previous value
# if present.
self.metadata.pop("pixel_spacing", None)
self.metadata.set("pixel_spacing", spacing, cast_to_cpp=True)

# To store floating point elements at a precision other than the default (double) or
# integers at a precision other than int64_t, use the dtype argument and pass a
# numpy.dtype argument corresponding to the desired C++ type. For example, the following
# would store `spacing` as a std::vector<float> instead. In this case we show
# use of Python's `del` instead of the pop method to remove an existing item.
# (Assumes `import numpy as np` at the top of the file.)
del self.metadata["pixel_spacing"]
self.metadata.set("pixel_spacing", spacing, dtype=np.float32, cast_to_cpp=True)

# Remove a value
del self.metadata["patient_name"]

# ... Some processing to produce output `data` could go here ...

# Current state of `self.metadata` will automatically be emitted along with `data` in the call below
op_output.emit(data, "output1")

# Can clear all items
self.metadata.clear()

# Any emit call after this point would not transmit a metadata object
op_output.emit(data, "output2")

See the MetadataDictionary API docs for all available methods.

The above code illustrates various ways of working with and updating an operator’s metadata.

Note

Pay particular attention to the details of how metadata is set. When working with pure Python applications, it is best to just use self.metadata[key] = value or self.metadata.set(key, value) to pass Python objects as the value. This will just use a shared object and not result in copies to/from corresponding C++ types. However, when interacting with other operators that wrap a C++ implementation, their compute method would expect C++ metadata. In that case, the set method with cast_to_cpp=True is needed to cast to the expected C++ type. This was shown in some of the “pixel_spacing” set calls in the example above. For convenience, the value passed to the set method can also be a NumPy array, but note that in this case, a copy into a new C++ std::vector is performed. The dtype of the array will be respected when creating the vector. In general, the types that can currently be cast to C++ are scalar numeric values, strings, and Python Iterators or Sequences of these (the sequence will be converted to a 1D or 2D C++ std::vector, so the items in the Python sequence cannot be of mixed type).

Metadata Update Policies

The operator class also has a metadata_policy() method that can be used to set a MetadataPolicy to use when handling duplicate metadata keys across multiple input ports of the operator. The available options are:

  • “update” (MetadataPolicy::kUpdate): replace any existing key from a prior receive call with one present in a subsequent receive call.

  • “reject” (MetadataPolicy::kReject): Reject the new key/value pair when a key already exists due to a prior receive call.

  • “raise” (MetadataPolicy::kRaise): Throw a std::runtime_error if a duplicate key is encountered. This is the default policy.

The metadata policy would typically be set during compose() as in the following example:

// Example for setting metadata policy from Application::compose()
auto my_op = make_operator<MyOperator>("my_op");

my_op->metadata_policy(holoscan::MetadataPolicy::kRaise);

The operator class also has a metadata_policy property that can be used to set a MetadataPolicy to use when handling duplicate metadata keys across multiple input ports of the operator. The available options are:

  • “update” (MetadataPolicy.UPDATE): replace any existing key from a prior receive call with one present in a subsequent receive call. This is the default policy.

  • “reject” (MetadataPolicy.REJECT): Reject the new key/value pair when a key already exists due to a prior receive call.

  • “raise” (MetadataPolicy.RAISE): Throw an exception if a duplicate key is encountered.

The metadata policy would typically be set during compose() as in the following example:

# Example for setting metadata policy from Application.compose()
my_op = MyOperator(self, name="my_op")

my_op.metadata_policy = holoscan.core.MetadataPolicy.RAISE

The policy applied as in the example above only applies to the operator on which it was set. The default metadata policy can also be set for the application as a whole via Application::metadata_policy (C++/Python) or for individual fragments of a distributed application via Fragment::metadata_policy (C++/Python).
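The three policies can be compared side by side with a toy merge function in plain Python. merge_metadata is hypothetical and models only the duplicate-key behavior described above, using ordinary dicts in place of MetadataDictionary; the key names are made up for the example.

```python
def merge_metadata(local, incoming, policy):
    """Merge `incoming` into `local` in place, following the named policy.

    Toy model of the behaviour described in the text, not SDK code.
    """
    if policy not in ("update", "reject", "raise"):
        raise ValueError(f"unknown policy: {policy!r}")
    for key, value in incoming.items():
        if key in local:
            if policy == "reject":
                continue  # keep the value from the earlier receive call
            if policy == "raise":
                raise RuntimeError(f"duplicate metadata key: {key!r}")
        local[key] = value  # new key, or "update" replacing the old value
    return local


# Metadata arriving on two input ports that share the key "frame_id".
first_port = {"frame_id": 1, "source": "camera_a"}
second_port = {"frame_id": 2, "roi": (0, 0, 64, 64)}

updated = merge_metadata(dict(first_port), second_port, "update")
rejected = merge_metadata(dict(first_port), second_port, "reject")
print(updated["frame_id"], rejected["frame_id"])  # prints: 2 1
```

With the "raise" policy, the same two inputs would raise a RuntimeError at the duplicate "frame_id" key.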

Use of Metadata in Distributed Applications

Sending metadata between two fragments of a distributed application is supported, but there are a couple of aspects to be aware of.

  1. Sending metadata over the network requires serialization and deserialization of the metadata keys and values. The value types supported for this are the same as for data emitted over output ports (see the table in the section on object serialization). The only exception is that Tensor and TensorMap values cannot be sent as metadata values between fragments (this restriction also applies to tensor-like Python objects). Any custom codecs registered for the SDK will automatically also be available for serialization of metadata values.

  2. There is a practical size limit of several kilobytes on the amount of metadata that can be transmitted between fragments. This is because metadata is currently sent along with other entity header information in the UCX header, which has a fixed size limit (the metadata is stored along with other header information within the size limit defined by the HOLOSCAN_UCX_SERIALIZATION_BUFFER_SIZE environment variable).

The above restrictions only apply to metadata sent between fragments. Within a fragment there is no size limit on metadata (aside from system memory limits) and no serialization or deserialization step is needed.

Current limitations

  1. The current metadata API is only fully supported for native Holoscan operators and is not currently supported by operators that wrap a GXF codelet (i.e., inheriting from GXFOperator or created via GXFCodeletOp). Aside from GXFCodeletOp, the built-in operators provided under the holoscan::ops namespace are all native operators, so the feature will work with these. Currently none of these built-in operators add their own metadata, but any metadata received on input ports will automatically be passed on to their output ports (as long as app->enable_metadata(false) was not set to disable the metadata feature).

Please see the dedicated Holoscan CUDA stream handling page for details on how Holoscan applications using non-default CUDA streams can be written.

© Copyright 2022-2025, NVIDIA. Last updated on Oct 13, 2025.