
Raw Messages

Raw Messages mode lets ReductROS decode ROS messages that are stored as individual records rather than as MCAP or rosbag archives. It is intended for pipelines that store ROS 1 schema metadata in the $ros attachment of each entry.

License Information

This feature is available in ReductStore Pro under a commercial license. For testing, you can either use a free demo server or request a demo license for your own deployment.

Supported input

Raw Messages expects:

  • one ROS 1 message per record
  • one or more queried entries
  • a $ros attachment on each queried entry that provides the schema metadata

Unlike the MCAP and rosbag archive modes, Raw Messages works directly with individual records. It supports extraction to JSON and export to MCAP.

Required $ros attachment

Raw Messages uses an attachment named $ros on each queried entry. The attachment must contain the following JSON fields:

| Field | Type | Description |
|---|---|---|
| encoding | string | Must be ros1. |
| schema | string | Full ROS 1 message definition used to decode the binary payload. |
| topic | string | Topic name associated with the stored records. |
| schema_name | string | ROS 1 message type, for example std_msgs/String. |

If the records were ingested with reduct-bridge, this attachment is written automatically.

Example $ros metadata

```json
{
  "encoding": "ros1",
  "schema": "string data",
  "topic": "/chatter",
  "schema_name": "std_msgs/String"
}
```

Query format

```json
{
  "#ext": {
    "ros": {
      "extract": {
        "topic": "string",   # Optional ROS topic filter when querying multiple entries
        "encode": "object",  # e.g., {"data": "jpeg"} for JPEG encoding
        "as_label": "object" # e.g., {"label_name": "path_to_json"}
      },

      "export": {
        "format": "string",   # Currently only "mcap"
        "duration": "string", # e.g., "1m" (max episode duration)
        "size": "string"      # e.g., "100MB" (max content length)
      }
    }
  }
}
```
info

Specify exactly one of extract or export. Both operations resolve schema and topic information from the $ros attachment of each queried entry.
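This exclusivity rule can be checked on the client side before a query is sent. The helper below is purely illustrative (validate_ros_ext is not part of any SDK); it only inspects the structure described above:

```python
# Hypothetical helper (not part of any SDK): checks that a query body
# specifies exactly one of "extract" or "export" under "#ext" -> "ros".
def validate_ros_ext(condition: dict) -> dict:
    ros = condition.get('#ext', {}).get('ros', {})
    ops = [op for op in ('extract', 'export') if op in ros]
    if len(ops) != 1:
        raise ValueError('specify exactly one of "extract" or "export"')
    return condition


# A valid extract-only query passes through unchanged:
extract_query = {'#ext': {'ros': {'extract': {'topic': '/chatter'}}}}
validate_ros_ext(extract_query)
```

Passing a body that contains both extract and export (or neither) raises a ValueError in this sketch, mirroring the server-side rule.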

Data extraction

The extract property tells ReductROS how to decode each raw ROS 1 record and how to shape the resulting JSON output.

| Parameter | Type | Mandatory | Description |
|---|---|---|---|
| topic | string | No | Filter decoded records by ROS topic from the $ros attachment. Useful when querying multiple entries. |
| encode | object | No | A map of binary fields to conversion formats. |
| encode.&lt;field&gt; | string | No | Conversion format for the field. Supported values are base64 and jpeg. |
| as_label | object | No | Computed labels extracted from the decoded JSON payload. Use JSON paths without the $. prefix. |

If topic is omitted, Raw Messages extracts all queried entries that have a valid $ros attachment.

Encoding binary fields

The encode property controls how byte arrays are represented in the JSON output.

| Format | Description |
|---|---|
| base64 | Encodes the selected byte array as a base64 string. |
| jpeg | Encodes image bytes as a base64-encoded JPEG. |
info

Currently, JPEG conversion is supported only for sensor_msgs/Image messages with encoding set to rgb8, bgr8, or mono8.
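Since both formats produce a base64 string in the JSON output, a client can recover the raw bytes with the standard library. A minimal sketch, assuming an extracted message whose data field was converted with "encode": {"data": "jpeg"} (the sample payload here is fabricated, not real image data):

```python
import base64


def decode_image_field(message: dict, field: str = 'data') -> bytes:
    # After extraction, the converted field holds a base64 string;
    # decoding it yields the JPEG (or raw) bytes.
    return base64.b64decode(message[field])


# Illustrative payload only: a fake buffer starting with the JPEG SOI marker.
msg = {'data': base64.b64encode(b'\xff\xd8\xff\xe0 fake-jpeg').decode('ascii')}
jpeg_bytes = decode_image_field(msg)
assert jpeg_bytes.startswith(b'\xff\xd8')  # JPEG streams begin with the SOI marker
```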

Timestamps

The extractor uses header.stamp.sec and header.stamp.nsec when they are present in the decoded message. If the message has no ROS header, the output record keeps the original ReductStore record timestamp.
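The mapping from a ROS header stamp to a ReductStore record timestamp (microseconds since the Unix epoch) can be sketched as follows; the arithmetic is an assumption that is consistent with the image example later on this page:

```python
def stamp_to_us(sec: int, nsec: int) -> int:
    # ReductStore record timestamps are microseconds since the Unix epoch,
    # so the nanosecond part of the ROS stamp is truncated to microseconds.
    return sec * 1_000_000 + nsec // 1_000


# Matches the header stamp and record timestamp shown in the
# JPEG extraction example below:
stamp_to_us(1773324549, 67620754)  # -> 1773324549067620
```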

Data export

The export property converts raw ROS 1 records into MCAP episodes. It can aggregate data from multiple ReductStore entries into a single MCAP output stream, using the topic and schema from the $ros attachment of each entry.

| Parameter | Type | Mandatory | Description |
|---|---|---|---|
| format | string | Yes | Output format. Currently only mcap is supported. |
| duration | string | No | Maximum episode duration, for example 30s, 5m, or 2h. |
| size | string | No | Maximum uncompressed episode size, for example 100MB or 1GB. Minimum supported size is 1KB. |
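To validate these human-readable strings client-side before querying, a small parser can be sketched; the accepted unit set below is an assumption based only on the examples in this table, and the server may accept more:

```python
import re

# Illustrative duration parser; units assumed from the examples above
# (30s, 5m, 2h). Not part of any SDK.
_DURATION_UNITS = {'s': 1, 'm': 60, 'h': 3600}


def duration_seconds(value: str) -> int:
    m = re.fullmatch(r'(\d+)([smh])', value)
    if not m:
        raise ValueError(f'bad duration: {value!r}')
    return int(m.group(1)) * _DURATION_UNITS[m.group(2)]


duration_seconds('1m')  # -> 60
```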

When export is provided, the extension:

  1. Reads raw ROS 1 records from the queried entries.
  2. Resolves schema and topic metadata for each entry from its $ros attachment.
  3. Streams all matching messages into a new MCAP file.
  4. Starts a new MCAP episode whenever duration or size is reached.

Output behavior

  • One output record is produced per input raw ROS record during extraction.
  • Extraction preserves the input entry name and emits application/json.
  • Export emits application/mcap records.
  • Original input labels are preserved.
  • Existing computed labels are preserved during extraction.
  • Extraction adds the following computed labels:
    • encoding
    • schema
    • topic
    • labels defined via as_label
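In the SDK output shown in the examples below, computed labels arrive alongside the preserved input labels, distinguished by an @ prefix. A small sketch for splitting the two groups when post-processing query results:

```python
def split_labels(labels: dict) -> tuple[dict, dict]:
    # Computed labels appear with an '@' prefix in query results;
    # everything else is an original input label.
    computed = {k: v for k, v in labels.items() if k.startswith('@')}
    original = {k: v for k, v in labels.items() if not k.startswith('@')}
    return computed, original


computed, original = split_labels({'@topic': '/chatter', 'x': '2.0'})
```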

Examples

The following examples use Raw ROS sample data stored as application/ros1 records. Although the examples are written in Python, the same query structure works with any official SDK.

Extracting messages as JSON

This example writes raw geometry_msgs/Point records, stores the schema metadata in the $ros attachment, and then extracts them as JSON. It also creates a computed label from the decoded x field.

```python
import json
from pathlib import Path
from time import time_ns

from reduct import Client

HERE = Path(__file__).parent
DATA = HERE / '../data'


async def main():
    async with Client('http://localhost:8383', api_token='my-token') as client:
        bucket = await client.create_bucket('my-bucket', exist_ok=True)
        entry = f'sensor-pos-{time_ns()}'

        with open(DATA / 'raw_ros_point_ros_attachment.json', 'r', encoding='utf-8') as f:
            ros_attachment = json.load(f)

        with open(DATA / 'raw_ros_point_records.json', 'r', encoding='utf-8') as f:
            records = json.load(f)

        await bucket.write_attachments(entry, {'$ros': ros_attachment})

        for sample in records:
            payload = (DATA / sample['file']).read_bytes()
            await bucket.write(
                entry,
                payload,
                content_length=len(payload),
                timestamp=sample['timestamp'],
                labels=sample['labels'],
                content_type='application/ros1',
            )

        condition = {
            '#ext': {
                'ros': {
                    'extract': {
                        'as_label': {
                            'coord_x': 'x',
                        },
                    },
                },
                'when': {
                    '@coord_x': {'$eq': 2.0},
                },
            }
        }

        async for record in bucket.query(entry, start=records[0]['timestamp'], when=condition):
            print(f'Record timestamp: {record.timestamp}')
            print(f"Record labels: {dict(sorted(record.labels.items()))}")

            payload = await record.read_all()
            message = json.loads(payload.decode('utf-8'))
            print(json.dumps(message, sort_keys=True, separators=(',', ':')))


if __name__ == '__main__':
    import asyncio

    asyncio.run(main())
```

Expected output

Record timestamp: 1773324549469334
Record labels: {'@coord_x': '2.0', '@encoding': 'ros1', '@schema': 'geometry_msgs/Point', '@topic': '/sensor/pos', 'x': '2.0'}
{"x":2.0,"y":2.1,"z":2.2}

Explanation

  • The example uploads three raw ROS 1 records to a new entry.
  • It writes the matching $ros attachment before querying the entry.
  • The extractor decodes the binary payload into a JSON geometry_msgs/Point message.
  • The as_label option exposes the decoded x field as @coord_x.
  • The when filter returns only the record where @coord_x equals 2.0.

Extracting image messages with JPEG encoding

This example writes a raw sensor_msgs/Image record and returns the data field as a JPEG-encoded string inside the JSON response.

```python
import json
from pathlib import Path
from time import time_ns

from reduct import Client

HERE = Path(__file__).parent
DATA = HERE / '../data'


async def main():
    async with Client('http://localhost:8383', api_token='my-token') as client:
        bucket = await client.create_bucket('my-bucket', exist_ok=True)
        entry = f'camera-raw-{time_ns()}'

        with open(DATA / 'raw_ros_image_ros_attachment.json', 'r', encoding='utf-8') as f:
            ros_attachment = json.load(f)

        with open(DATA / 'raw_ros_image_records.json', 'r', encoding='utf-8') as f:
            records = json.load(f)

        sample = records[0]
        payload = (DATA / sample['file']).read_bytes()

        await bucket.write_attachments(entry, {'$ros': ros_attachment})
        await bucket.write(
            entry,
            payload,
            content_length=len(payload),
            timestamp=sample['timestamp'],
            labels=sample['labels'],
            content_type='application/ros1',
        )

        condition = {
            '#ext': {
                'ros': {
                    'extract': {
                        'encode': {
                            'data': 'jpeg',
                        },
                    },
                },
            }
        }

        async for record in bucket.query(entry, start=sample['timestamp'], when=condition):
            print(f'Record timestamp: {record.timestamp}')
            print(f"Record labels: {dict(sorted(record.labels.items()))}")

            message = json.loads((await record.read_all()).decode('utf-8'))
            image_info = {k: v for k, v in message.items() if k != 'data'}
            print(f'Image parameters: {json.dumps(image_info, sort_keys=True)}')


if __name__ == '__main__':
    import asyncio

    asyncio.run(main())
```

Expected output

Record timestamp: 1773324549067620
Record labels: {'@encoding': 'ros1', '@schema': 'sensor_msgs/Image', '@topic': '/camra/raw'}
Image parameters: {"encoding": "rgb8", "header": {"frame_id": "", "seq": 1, "stamp": {"nsec": 67620754, "sec": 1773324549}}, "height": 1, "is_bigendian": 0, "step": 9, "width": 3}

Explanation

  • The example uploads a raw ROS 1 image record and writes the matching $ros attachment.
  • The encode.data = jpeg option converts the binary image buffer into a JPEG string.
  • The script prints only the decoded image metadata to keep the output readable.
  • The output timestamp comes from the ROS header stamp rather than the original record timestamp.

Exporting multiple raw entries as MCAP

This example writes raw ROS 1 records into two entries and exports both entries as a single MCAP episode.

```python
import json
from pathlib import Path
from time import time_ns

from reduct import Client

HERE = Path(__file__).parent
DATA = HERE / '../data'


async def main():
    async with Client('http://localhost:8383', api_token='my-token') as client:
        bucket = await client.create_bucket('my-bucket', exist_ok=True)
        entry_a = f'sensor-pos-{time_ns()}'
        entry_b = f'sensor-pos-copy-{time_ns()}'

        with open(DATA / 'raw_ros_point_ros_attachment.json', 'r', encoding='utf-8') as f:
            base_attachment = json.load(f)

        attachment_a = dict(base_attachment)
        attachment_b = dict(base_attachment)
        attachment_b['topic'] = '/sensor/pos_copy'

        with open(DATA / 'raw_ros_point_records.json', 'r', encoding='utf-8') as f:
            records = json.load(f)

        await bucket.write_attachments(entry_a, {'$ros': attachment_a})
        await bucket.write_attachments(entry_b, {'$ros': attachment_b})

        for sample in records[:2]:
            payload = (DATA / sample['file']).read_bytes()
            await bucket.write(
                entry_a,
                payload,
                content_length=len(payload),
                timestamp=sample['timestamp'],
                labels=sample['labels'],
                content_type='application/ros1',
            )

        sample = records[2]
        payload = (DATA / sample['file']).read_bytes()
        await bucket.write(
            entry_b,
            payload,
            content_length=len(payload),
            timestamp=sample['timestamp'],
            labels=sample['labels'],
            content_type='application/ros1',
        )

        condition = {
            '#ext': {
                'ros': {
                    'export': {
                        'format': 'mcap',
                        'duration': '1m',
                    },
                },
            }
        }

        async for record in bucket.query('*', start=records[0]['timestamp'], when=condition):
            print(f'Record entry: {record.entry}')
            print(f'Record timestamp: {record.timestamp}')

            data = await record.read_all()
            print(f'Episode file size: {len(data)} bytes')


if __name__ == '__main__':
    import asyncio

    asyncio.run(main())
```

Explanation

  • The example writes raw ROS 1 point messages into two ReductStore entries.
  • Each entry gets its own $ros attachment with the topic and schema metadata.
  • export.format = mcap tells ReductROS to emit MCAP instead of JSON.
  • Because the query targets *, both entries are aggregated into the same exported MCAP episode.
  • duration = 1m enables splitting if the exported stream grows beyond one minute of source data.