Accessing the Omniverse Message Bus from Python

In this blog post I explore sending and receiving message bus events using the NVIDIA Omniverse framework in Python. I create two extensions: one to publish messages, and another to receive them. In an Omniverse Cloud project, the message bus will span microservices (using Redis I think read somewhere). In this blog, I use it with two extensions in the same Omniverse Composer instance to talk to each other.

Note: While this blog may appear long, it is mainly background information. The actual code is only a few lines to publish and subscribe to message bus events, and the events are simple JSON objects.

Omniverse Architecture

NVIDIA have been describing Omniverse USD Composer as a “sample app”, not a tool. I am not sure I would go that far, but it is useful to understand the mindset they want developers to have. At the recent GTC conference, Ashley (an NVIDIA Developer Advocate) gave a training course on building a React-based web application that worked with Omniverse running in the cloud. The cloud rendered output was streamed to the React app for display. (For more, see my previous post 3D product rendering in web pages with Omniverse Cloud). The demo React web application sends Omniverse message bus events to an Omniverse Kit application running in Omniverse Cloud.

In the demo, OmniGraph is used to listen to events and process them. I am a bit wary of using OmniGraph because it results in application logic (the OmniGraph nodes) being mixed in the same stage as the content being viewed. It feels cleaner to me to keep application logic out of the stage, where possible. The OmniGraph should be versioned with the application, not the rest of the stage content. At least the OmniGraph to handle message bus events should be kept in a separate USD file to the rest of the stage content so the two can be updated independently. In this blog I take the approach of implementing message bus code directly in Python avoiding the need to manage application logic in the stage content.

At a high level, Omniverse Cloud is a set of services. To get access to USD content, normally you would store the content in an Omniverse Nucleus server.

Digging a bit deeper, microservices are intended to be implemented as Kit apps. Kit is the Omniverse framework for building applications. Omniverse USD Composer is an example of such an app. But you can build your own from any combination of extensions you wish. So to build an application, you list the extensions to include in the application. There is not much more to it than that. (A part of the challenge to build an application appears to be to work out which extensions to include.)

For microservices to intercommunicate, you can do direct RPC style calls (over HTTP) where the caller can wait for a direct response to the request, or send messages on a publisher/subscriber style message bus so anyone who registers interest will get a copy of the event asynchronously.

Message Bus Sender

The following is code for sending message bus events. Things to note:

  • COUNTER_EVENT is a handle to the event type (aka topic).
  • bus holds a reference to the message bus.
  • The bulk of the code is from the default template code – it creates a window with a label showing a counter value and buttons to reset the counter and send an incrementing counter value.
  • To create your own extension, use the green “+” in the top left corner of the “Extensions” window then put the code below into the extension.py file. (There are multiple other files that need to be created as part of an extension.)
  • The important line is bus.push(COUNTER_EVENT, …) which publishes a JSON message over the message bus to any extensions subscribed to the event, in our case a second extension (below).

Here is full working code.

import omni.ext
import omni.ui as ui
import carb.events
import omni.kit.app

COUNTER_EVENT = carb.events.type_from_string("ordinary.messagebus.COUNTER_EVENT")
bus = omni.kit.app.get_app().get_message_bus_event_stream()

class OrdinaryMessagebusSenderExtension(omni.ext.IExt):

def on_startup(self, ext_id):
print("[ordinary.messagebus.sender] ordinary messagebus sender startup")
self._count = 0
self._window = ui.Window("Message Bus Sender", width=300, height=300)
with self._window.frame:
with ui.VStack():
label = ui.Label("Ready to send")
label.alignment = ui.Alignment.CENTER

def on_reset_count():
self._count = 0
label.text = f"count: {self._count}"

def on_send_count():
self._count += 1
label.text = f"count: {self._count}"
bus.push(COUNTER_EVENT, payload={"count": self._count})

with ui.HStack():
ui.Button("Reset", clicked_fn=on_reset_count)
ui.Button("Send", clicked_fn=on_send_count)

def on_shutdown(self):
print("[ordinary.messagebus.sender] ordinary messagebus sender shutdown")

Message Bus Receiver

The second extension is a message bus receiver. It accepts the event with a new counter value and displays it. Things to note:

  • While here the sender and receiver extensions are in the same app, the two extensions should still work in separate microservices. The only communication between the extensions is via messages – there is no shared state via variables or similar.
  • Again, most of the code is from the default extension template.
  • Instead of buttons, bus.create_subscription_to_pop_by_type(COUNTER_EVENT, callback) receives a message bus event and calls the provided callback function, receive count.
  • The subscription is maintained in a class variable. If a local variable is used, the subscription is ended as soon as the variable goes out of scope (meaning messages will not be received).

Here is the full working code:

import omni.ext
import omni.ui as ui
import carb.events
import omni.kit.app

COUNTER_EVENT = carb.events.type_from_string("ordinary.messagebus.COUNTER_EVENT")
bus = omni.kit.app.get_app().get_message_bus_event_stream()

class OrdinaryMessagebusReceiverExtension(omni.ext.IExt):
def on_startup(self, ext_id):
print("[ordinary.messagebus.receiver] ordinary messagebus receiver startup")
self._count = 0
self._window = ui.Window("Message Bus Receiver", width=300, height=300)
with self._window.frame:
with ui.VStack():
label = ui.Label("Ready to receive")
label.alignment = ui.Alignment.CENTER

def on_receive_count(e):
self._count = e.payload['count']
label.text = f"count: {self._count}"

# Must keep the subscription in scope or it will be closed.
self._sub = bus.create_subscription_to_pop_by_type(
COUNTER_EVENT, on_receive_count)

def on_shutdown(self):
print("[ordinary.messagebus.reciver] ordinary messagebus receiver shutdown")

Code in Action

The following shows the code in action.

If the UI is not needed, the above code could be simplified to just a few lines. The core sending code is:

COUNTER_EVENT = carb.events.type_from_string("ordinary.messagebus.COUNTER_EVENT")
bus = omni.kit.app.get_app().get_message_bus_event_stream()

bus.push(COUNTER_EVENT, payload={"count": count})

The core receiving code is:

COUNTER_EVENT = carb.events.type_from_string("ordinary.messagebus.COUNTER_EVENT")
bus = omni.kit.app.get_app().get_message_bus_event_stream()

def on_receive_count(e):
count = e.payload['count']
print(count)

sub = bus.create_subscription_to_pop_by_type(COUNTER_EVENT, on_receive_count)

Conclusion

This was a quick post showing how easy it is to use the Omniverse message bus. This can be done in OmniGraph for no-code lovers, or directly in Python code as above. Once you find the appropriate function calls it was pretty quick and easy to do.

When should you use events over a message bus? A web app talking to a Omniverse Kit app running in the cloud is clearly beneficial, but it also useful in larger applications that may be distributed across multiple microservices. For example, in an application I was thinking about I want a series of “background jobs” such as “convert a line of dialog from a screenplay into an audio file” as well as stage update announcements such as “scene 23 has changed” which might trigger a background re-render of the scene. The user does not want wait for the tasks to complete.

The good news is the message bus portion of the code won’t be the hard part.


Leave a comment