Robot Direct API

High-frequency Python API for robot control loops — read tag positions and push live overlays directly over gRPC, bypassing the MCP layer.

AprilCam Robot Direct API

This guide is for robot programs that need high-frequency access to tag positions and live overlay drawing. If you are an AI agent working interactively, use the MCP tools instead. If you are writing a control loop that runs at 5-50 Hz, use the Python API described here — it skips the MCP layer entirely and talks directly to the daemon over gRPC.


Quick Start

from aprilcam.config import Config
from aprilcam.client.control import DaemonControl

config = Config.load()
dc = DaemonControl.connect_default(config)   # auto-spawns daemon if needed

# One-shot tag read
cam = dc.list_cameras()[0]
tags = dc.get_tags(cam)
for tag in tags.tags:
    print(tag.id, tag.world_xy)   # world_xy is (x, y) in cm, or None

dc.close()

DaemonControl — Full API

DaemonControl wraps the gRPC channel. Create it once; reuse it across your control loop. It is thread-safe.

Connection

from aprilcam.config import Config
from aprilcam.client.control import DaemonControl

# Auto-connect (recommended): spawns daemon if not running, probes gRPC,
# returns a ready-to-use instance.
dc = DaemonControl.connect_default(Config.load())

# Manual connect (explicit Unix socket or TCP port):
dc = DaemonControl(unix_path="/tmp/aprilcam/control.sock")
dc.connect()

# As a context manager:
with DaemonControl.connect_default(Config.load()) as dc:
    ...

Camera Management

cameras = dc.list_cameras()          # -> list[str]  e.g. ["arducam-ov9782-usb-camera"]
cam = dc.open_camera(index=4)        # -> str cam_name   (index = OS camera number)
info = dc.get_camera_info(cam)       # -> CameraInfo(cam_name, calibrated, frame_size, fps)
frame = dc.capture_frame(cam)        # -> np.ndarray BGR (JPEG decoded)
dc.reload_calibration(cam)           # reload calibration.json from disk
dc.close_camera(cam)
dc.shutdown()                        # stop the daemon process

One-Shot Tag Query

Best for low-frequency agents. Returns the latest frame’s tags without setting up a stream.

tag_frame = dc.get_tags(cam)
# tag_frame.tags          -> list[TagRecord]
# tag_frame.homography    -> list[list[float]] | None  (3x3 matrix)
# tag_frame.fps           -> float

for tag in tag_frame.tags:
    print(tag.id)                 # int
    print(tag.center_px)          # (float, float) pixel center
    print(tag.world_xy)           # (float, float) in cm, or None if uncalibrated
    print(tag.yaw)                # float, radians
    print(tag.speed_world)        # float, cm/s
    print(tag.in_playfield)       # bool

Tag Stream (high-frequency)

Use the tag stream for control loops. Each read() blocks until the next published frame (rate-limited to max_hz, default 20).

tag_stream = dc.get_tag_stream(cam, max_hz=30)

for msg in tag_stream:
    if isinstance(msg, TagFrame):       # aprilcam.client.models.TagFrame
        for tag in msg.tags:
            x, y = tag.world_xy or (0, 0)
            # ... control logic ...
    # msg may also be an OverlayFrame (if another process published one)
    # — just ignore those in a consumer-only loop.

tag_stream.close()

Image Stream

image_stream = dc.get_image_stream(cam, max_hz=30)

for frame in image_stream:             # frame is np.ndarray BGR
    cv2.imshow("live", frame)
    if cv2.waitKey(1) == ord("q"):
        break

image_stream.close()

Live Overlay — Push Annotations at 5-50 Hz

publish_overlay broadcasts graphical elements to every aprilcam view window that is subscribed to that camera’s tag stream. The view drops any overlay whose TTL has expired — set TTL shorter than your update period to ensure stale data disappears automatically.

Element types

All coordinates are world cm (same space as tag.world_xy). Colors are [R, G, B] each 0-255. Thickness is pixels; -1 = filled.

type params description
"arc" [cx, cy, radius, start_deg, end_deg] Ellipse arc (handles non-square homography)
"arrow" [x1, y1, x2, y2] Arrow from tail to head
"point" [x, y, radius_cm] Circle at world position
"polyline" [x0, y0, x1, y1, …] Open polyline

Robot control-loop pattern

import time
from aprilcam.config import Config
from aprilcam.client.control import DaemonControl

config = Config.load()
dc = DaemonControl.connect_default(config)
cam = dc.list_cameras()[0]

UPDATE_HZ = 10
TTL = 0.3   # slightly longer than one period so the view never sees a gap

try:
    while True:
        tag_frame = dc.get_tags(cam)

        # --- your control logic here ---
        robot_tag = next((t for t in tag_frame.tags if t.id == 7), None)
        if robot_tag and robot_tag.world_xy:
            rx, ry = robot_tag.world_xy
            heading = robot_tag.yaw

            import math
            arrow_len = 12.0
            ax = rx + arrow_len * math.cos(heading)
            ay = ry + arrow_len * math.sin(heading)
            lookahead_r = 25.0

            dc.publish_overlay(cam, [
                # robot body
                {"type": "point",
                 "params": [rx, ry, 5.0],
                 "color": [60, 120, 255], "thickness": -1},
                # heading arrow
                {"type": "arrow",
                 "params": [rx, ry, ax, ay],
                 "color": [240, 240, 240], "thickness": 3},
                # pure-pursuit lookahead arc
                {"type": "arc",
                 "params": [rx, ry, lookahead_r,
                            math.degrees(heading) - 90,
                            math.degrees(heading) + 90],
                 "color": [0, 220, 60], "thickness": 2},
            ], ttl=TTL)

        time.sleep(1.0 / UPDATE_HZ)

finally:
    dc.publish_overlay(cam, [], ttl=0)   # clear immediately on exit
    dc.close()

Clear the overlay immediately

dc.publish_overlay(cam, [], ttl=0)

Persistent Paths — Write to paths.json

Persistent paths (waypoint sequences shown in the live view) are stored in a paths.json file inside the camera’s data directory. Unlike overlays, they persist across daemon restarts and do not expire.

Via the MCP tools (recommended for AI agents): create_path, delete_path, clear_paths, list_paths.

Via direct file write (recommended for robot programs or scripts):

import json, os
from pathlib import Path

PATHS_FILE = Path("data/aprilcam/cameras/arducam-ov9782-usb-camera/paths.json")

def write_paths(paths: list[dict]) -> None:
    tmp = PATHS_FILE.with_suffix(".tmp")
    tmp.write_text(json.dumps(paths))
    os.replace(tmp, PATHS_FILE)    # atomic swap — live view picks it up within ~33ms

# Example: one path with three waypoints
write_paths([{
    "path_id": "path_000",
    "playfield_id": "arducam-ov9782-usb-camera",
    "waypoints": [
        {"x": 30, "y": 20, "size_cm": 4, "symbol": "filled_circle",
         "symbol_color": [0, 200, 80], "line_color": [0, 200, 80]},
        {"x": 90, "y": 20, "size_cm": 4, "symbol": "filled_circle",
         "symbol_color": [0, 200, 80], "line_color": [0, 200, 80]},
        {"x": 60, "y": 65, "size_cm": 4, "symbol": "filled_circle",
         "symbol_color": [0, 200, 80], "line_color": [0, 200, 80]},
    ],
}])

# Remove all paths
write_paths([])

Valid symbol values: square, filled_square, circle, filled_circle, triangle, filled_triangle, x, none.


TagStreamConsumer — Overlay-aware iteration

TagStreamConsumer.read() returns either a TagFrame (Pydantic model) or an OverlayFrame (proto message) from the multiplexed socket. In a consumer-only loop you can ignore OverlayFrame messages:

from aprilcam.client.models import TagFrame
from aprilcam.client.stream import TagStreamConsumer

stream = dc.get_tag_stream(cam)
for msg in stream:
    if isinstance(msg, TagFrame):
        # handle tag data
        pass

Configuration

Config.load() searches for settings in this order (highest priority first):

Source Example
Environment variable APRILCAM_DATA_DIR=/mnt/robot/data
.env file (walk up from CWD) APRILCAM_SOCKET_DIR=/run/aprilcam
.aprilcam dotfile (walk up from CWD) APRILCAM_LOG_LEVEL=DEBUG
~/.aprilcam user dotfile

Key variables:

Variable Default Description
APRILCAM_SOCKET_DIR /tmp/aprilcam/ gRPC control socket and stream sockets
APRILCAM_DATA_DIR ./data/aprilcam/ Camera data, calibration, paths.json
APRILCAM_LOG_LEVEL INFO Daemon log verbosity

Demo Script

tests/demo_overlay.py in the repository walks through all features: triangle path → cross path → static overlay scene → animated robot orbit → cleanup. Run it with a live camera open in aprilcam view.