Debug replay resolution (#23287)

* unlink shm frames when camera is removed

* drop stale shm cache refs when cached segment is too small for requested shape

* skip new-object frame cache write when current_frame is unavailable

* add tests

* use setdefault when adding a new camera

Multiple subscribers in the same process each unpickle the ZMQ payload independently and would otherwise write divergent Python objects to the shared cameras dict — leaving long-lived references (e.g. CameraState.camera_config) pointing at a copy that subsequent in-place mutations like apply_section_update can never reach. setdefault collapses everyone onto the first writer's object so attribute mutations propagate to every consumer in this process.
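
The effect of setdefault described above can be seen in a minimal sketch. `FakeCameraConfig` and `on_add` are hypothetical stand-ins, not Frigate's types; the only point is that the second subscriber ends up holding the first writer's object.

```python
from dataclasses import dataclass


@dataclass
class FakeCameraConfig:
    """Hypothetical stand-in for a camera config object."""

    width: int


# Shared by every subscriber in the process.
cameras: dict[str, FakeCameraConfig] = {}


def on_add(camera: str, unpickled: FakeCameraConfig) -> FakeCameraConfig:
    """Return the object this subscriber should keep a reference to."""
    # The first writer's object wins; later writers get that same object back.
    return cameras.setdefault(camera, unpickled)


# Two subscribers unpickle the same payload into two distinct objects.
first = on_add("replay_cam", FakeCameraConfig(width=1280))
second = on_add("replay_cam", FakeCameraConfig(width=1280))

# An in-place mutation through one reference is visible through the other.
first.width = 640
```

With plain assignment instead of setdefault, `second` would be a separate copy and would still report the old width.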

* rebuild ffmpeg commands on detect update

Rebuild the cached ffmpeg cmd so the next process spawn picks up the new resolution/fps. Running cameras keep their existing cmd (ffmpeg_cmds is only read at process startup); replay cameras are recycled by CameraMaintainer to pick up the rebuilt cmd.

* drop stale shm cache refs when cached segment size doesn't match requested shape

The cached SharedMemoryFrameManager reference can point at a segment whose
size no longer matches the requested shape — the segment was unlinked and
recreated at a different size in a camera add/remove cycle. This catches
both a resolution increase (cached too small) and a decrease (cached too
large, pointing at an orphaned inode whose stale bytes would otherwise be
misinterpreted at the new shape, producing distorted/miscolored YUV frames).

After reopening, if the OS-level segment still doesn't match the requested
shape we're in a transient mid-recreate state — either the maintainer
hasn't allocated the new segment yet (size too small) or we opened a
pre-recycle segment (size too big). Either way, skip the frame and don't
cache the mismatched ref.
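
The two checks described above can be sketched as follows. This is an illustration under stated assumptions rather than the project's implementation: `FakeSegment`, `get_frame`, and the `open_segment` callback are hypothetical names standing in for the shared memory segment, the frame manager's get path, and the open-by-name call.

```python
from typing import Callable, Optional

import numpy as np


class FakeSegment:
    """Hypothetical stand-in for a shared memory segment."""

    def __init__(self, size: int) -> None:
        self.size = size
        self.buf = bytearray(size)

    def close(self) -> None:
        pass


def get_frame(
    cache: dict[str, FakeSegment],
    open_segment: Callable[[str], FakeSegment],
    name: str,
    shape: tuple[int, ...],
) -> Optional[np.ndarray]:
    required = int(np.prod(shape))
    seg = cache.get(name)
    # Check 1: a cached ref of the wrong size (too small or too large) is stale.
    if seg is not None and seg.size != required:
        seg.close()
        cache.pop(name, None)
        seg = None
    if seg is None:
        seg = open_segment(name)
        # Check 2: the reopened segment still mismatches, so skip this frame
        # and leave the cache empty for the next attempt.
        if seg.size != required:
            seg.close()
            return None
        cache[name] = seg
    return np.ndarray(shape, dtype=np.uint8, buffer=seg.buf)


# Cached ref is too large (resolution decreased); the reopen finds the right size.
cache = {"cam_frame0": FakeSegment(10_000)}
frame = get_frame(cache, lambda name: FakeSegment(2_500), "cam_frame0", (50, 50))

# Mid-recreate: the reopened segment is still the wrong size, so the frame is skipped.
skipped = get_frame({}, lambda name: FakeSegment(100), "cam_frame0", (50, 50))
```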

* recycle replay camera on detect update

* discard tracked-object state when detect resolution changes mid-session

When detect resolution changes mid-session every tracked object we hold
was localized against the old pixel grid. Their boxes no longer
correspond to anything in the new frame, and the `end` callback that
fires when their IDs disappear from the new detect process's detections
publishes those stale boxes to consumers (LPR, snapshot crop) that slice
the new frame and crash on empty arrays. Drop the tracked-object state
on a shape change so no stale boxes ever cross the CameraState boundary.

Belt-and-suspenders: also drop any incoming batch whose boxes exceed the
current detect resolution. These are in-flight queue entries from the
pre-recycle detect process that beat the new detect process to the
queue; processing them would re-introduce stale-resolution tracked
objects we just dropped above. The per-camera detect process clamps
legitimate boxes to detect.width-1 / detect.height-1, so any coord
beyond that is unambiguously stale.
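
A minimal sketch of the out-of-bounds check, assuming boxes are laid out as (x_min, y_min, x_max, y_max). `batch_is_stale` is a hypothetical helper name used only for illustration.

```python
from typing import Any


def batch_is_stale(
    detections: dict[str, dict[str, Any]], width: int, height: int
) -> bool:
    """Return True if any box extends past the current detect resolution."""
    for obj in detections.values():
        box = obj.get("box")
        # box[2] and box[3] are assumed to be the right and bottom edges.
        if box and (box[2] > width or box[3] > height):
            return True
    return False


# Detect is now 640x360; the second batch was produced at 1280x720.
fresh_batch = {"a": {"box": (10, 10, 639, 359)}}
stale_batch = {"b": {"box": (100, 100, 1279, 719)}}

fresh_is_stale = batch_is_stale(fresh_batch, 640, 360)
old_is_stale = batch_is_stale(stale_batch, 640, 360)
```

A single out-of-bounds box is enough to reject the whole batch, since every box in it came from the same pre-recycle process.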

* rebuild motion and object filter masks on detect resolution change

Apply the detect update first so frame_shape reflects the new resolution
before we rebuild dependents.

Motion's rasterized_mask is sized to frame_shape at construction. When
detect resolution changes we must rebuild RuntimeMotionConfig so the
mask matches the new frame size; otherwise consumers like the LPR
processor and motion detector hit a shape mismatch when they index
frames with the stale mask.

Same story for per-object filter masks — rebuild RuntimeFilterConfig at
the new frame_shape so the merged global+per-object masks they hold
match what they'll be indexed against.
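
The shape mismatch can be reproduced with plain NumPy. In this sketch `rasterize_mask` is a hypothetical mask builder, not the project's rasterizer; it only illustrates why a mask built at the old frame_shape cannot index a frame at the new one.

```python
import numpy as np


def rasterize_mask(frame_shape: tuple[int, int]) -> np.ndarray:
    """Hypothetical mask builder: zero marks the masked top quarter."""
    mask = np.full(frame_shape, 255, dtype=np.uint8)
    mask[: frame_shape[0] // 4, :] = 0
    return mask


old_shape = (720, 1280)
new_shape = (360, 640)

stale_mask = rasterize_mask(old_shape)
frame = np.full(new_shape, 200, dtype=np.uint8)

# Indexing the new frame with the old-resolution mask fails.
try:
    frame[stale_mask == 0] = 0
    mismatch = False
except IndexError:
    mismatch = True

# Rebuilding the mask at the new frame shape makes the indexing valid again.
fresh_mask = rasterize_mask(new_shape)
frame[fresh_mask == 0] = 0
```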

* republish motion and objects on in-memory detect resize

A detect resolution change also invalidates the rasterized masks on
motion and per-object filters. apply_section_update has rebuilt them at
the new frame_shape; publish them too so other processes replace their
old values.

* add test

* frontend

* add refresh topic for camera maintainer recycle action

The maintainer's recycle branch is doing an action (recycle the camera)
in response to a section-level signal. Introduce a
CameraConfigUpdateEnum.refresh case as an explicit action signal — the
maintainer subscribes to refresh instead of detect, parallel with add
and remove. Publishers fire refresh alongside detect when a recycle is
needed; section-level subscribers keep their existing topic.

Since no main-process subscriber listens for detect anymore, the
refresh handler calls recreate_ffmpeg_cmds() explicitly so the shared
CameraConfig's ffmpeg_cmds is rebuilt before the new subprocesses
spawn.

* factor stale-resolution state drop into a CameraState method
Josh Hawkins 2026-05-22 09:39:52 -05:00 committed by GitHub
parent 0bdf5002a0
commit 3a09d01bbe
11 changed files with 454 additions and 14 deletions

View File

@ -750,6 +750,33 @@ def _config_set_in_memory(request: Request, body: AppConfigSetBody) -> JSONRespo
settings,
)
# detect resize also republishes motion + objects so other
# processes pick up the rebuilt masks, and fires refresh so
# the camera maintainer recycles the camera process to pick
# up the new ffmpeg cmd / SHM sizing
if field == "detect":
cam_cfg = config.cameras.get(camera)
if cam_cfg is not None:
if cam_cfg.motion is not None:
request.app.config_publisher.publish_update(
CameraConfigUpdateTopic(
CameraConfigUpdateEnum.motion, camera
),
cam_cfg.motion,
)
request.app.config_publisher.publish_update(
CameraConfigUpdateTopic(
CameraConfigUpdateEnum.objects, camera
),
cam_cfg.objects,
)
request.app.config_publisher.publish_update(
CameraConfigUpdateTopic(
CameraConfigUpdateEnum.refresh, camera
),
cam_cfg,
)
return JSONResponse(
content={"success": True, "message": "Config applied in-memory"},
status_code=200,

View File

@ -14,6 +14,7 @@ from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber,
)
from frigate.const import REPLAY_CAMERA_PREFIX
from frigate.models import Regions
from frigate.util.builtin import empty_and_close_queue
from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory
@ -50,6 +51,7 @@ class CameraMaintainer(threading.Thread):
[
CameraConfigUpdateEnum.add,
CameraConfigUpdateEnum.remove,
CameraConfigUpdateEnum.refresh,
],
)
self.shm_count = self.__calculate_shm_frame_count()
@ -202,6 +204,25 @@ class CameraMaintainer(threading.Thread):
capture_process.terminate()
capture_process.join()
def __unlink_camera_frame_slots(self, camera: str) -> None:
"""Drop the camera's per-frame YUV SHM segments from this
process's frame_manager and unlink them at the OS level.
Safe to call after the camera's capture/processor subprocesses
have been joined: they no longer hold mappings, so unlink frees
the segments immediately. Other long-lived processes that opened
these slots will continue using their existing mappings until
they call frame_manager.get with a shape that no longer fits
(the get path drops and reopens stale refs).
"""
prefix = f"{camera}_frame"
names = [n for n in list(self.frame_manager.shm_store) if n.startswith(prefix)]
for name in names:
try:
self.frame_manager.delete(name)
except Exception as exc:
logger.debug("Could not unlink SHM %s: %s", name, exc)
def __stop_camera_process(self, camera: str) -> None:
camera_process = self.camera_processes.get(camera)
if camera_process is not None:
@ -253,12 +274,45 @@ class CameraMaintainer(threading.Thread):
for camera in updated_cameras:
self.__stop_camera_capture_process(camera)
self.__stop_camera_process(camera)
self.__unlink_camera_frame_slots(camera)
self.capture_processes.pop(camera, None)
self.camera_processes.pop(camera, None)
self.camera_stop_events.pop(camera, None)
self.region_grids.pop(camera, None)
self.camera_metrics.pop(camera, None)
self.ptz_metrics.pop(camera, None)
elif update_type == CameraConfigUpdateEnum.refresh.name:
# Recycle replay cameras so detect width/height/fps
# propagate through ffmpeg args, SHM sizing, and the
# region grid. A detect change on a regular camera still
# requires a full restart.
for camera in updated_cameras:
if not camera.startswith(REPLAY_CAMERA_PREFIX):
continue
new_config = self.update_subscriber.camera_configs.get(camera)
if new_config is None:
# remove arrived in the same batch
continue
if (
camera not in self.camera_processes
and camera not in self.capture_processes
):
continue
# rebuild ffmpeg cmds on the shared config so the
# new subprocesses spawn with current args
new_config.recreate_ffmpeg_cmds()
self.__stop_camera_capture_process(camera)
self.__stop_camera_process(camera)
self.__unlink_camera_frame_slots(camera)
self.capture_processes.pop(camera, None)
self.camera_processes.pop(camera, None)
self.__start_camera_processor(camera, new_config, runtime=True)
self.__start_camera_capture(camera, new_config, runtime=True)
# ensure the capture processes are done
for camera in self.capture_processes.keys():

View File

@ -45,6 +45,7 @@ class CameraState:
self.frame_cache: dict[float, dict[str, Any]] = {}
self.zone_objects: defaultdict[str, list[Any]] = defaultdict(list)
self._current_frame = np.zeros(self.camera_config.frame_shape_yuv, np.uint8)
self._last_frame_shape: tuple[int, int] = self.camera_config.frame_shape_yuv
self.current_frame_lock = threading.Lock()
self.current_frame_time = 0.0
self.motion_boxes: list[tuple[int, int, int, int]] = []
@ -303,6 +304,42 @@ class CameraState:
def on(self, event_type: str, callback: Callable[..., Any]) -> None:
self.callbacks[event_type].append(callback)
def _discard_stale_resolution_state(
self, current_detections: dict[str, dict[str, Any]]
) -> bool:
"""Drop tracked state when the camera's detect resolution has
changed, and signal the caller to skip this batch if it contains
out-of-bounds boxes from the pre-recycle detect process.
Returns True when the batch should be skipped entirely.
"""
# detect resolution changed — drop tracked state so old-grid
# boxes don't leak through end-callbacks
current_shape = self.camera_config.frame_shape_yuv
if current_shape != self._last_frame_shape:
logger.debug(
f"{self.name}: detect resolution changed {self._last_frame_shape} -> {current_shape}, dropping tracked state"
)
with self.current_frame_lock:
self.tracked_objects.clear()
self.motion_boxes = []
self.regions = []
self._last_frame_shape = current_shape
# drop in-flight batches from the pre-recycle detect process
# whose boxes exceed the current detect resolution
detect = self.camera_config.detect
if detect.width is not None and detect.height is not None:
for obj in current_detections.values():
box = obj.get("box")
if box and (box[2] > detect.width or box[3] > detect.height):
logger.debug(
f"{self.name}: dropping stale-resolution detection batch (box {box} exceeds {detect.width}x{detect.height})"
)
return True
return False
def update(
self,
frame_name: str,
@ -311,6 +348,9 @@ class CameraState:
motion_boxes: list[tuple[int, int, int, int]],
regions: list[tuple[int, int, int, int]],
) -> None:
if self._discard_stale_resolution_state(current_detections):
return
current_frame = self.frame_manager.get(
frame_name, self.camera_config.frame_shape_yuv
)
@ -332,14 +372,18 @@ class CameraState:
current_detections[id],
)
# add initial frame to frame cache
logger.debug(
f"{self.name}: New object, adding {frame_time} to frame cache for {id}"
)
self.frame_cache[frame_time] = {
"frame": np.copy(current_frame), # type: ignore[arg-type]
"object_id": id,
}
# Skip caching when the frame buffer isn't readable — e.g.
# frame_manager.get returned None because the SHM segment was
# unlinked or hasn't been recreated yet during a camera
# add/remove cycle.
if current_frame is not None:
logger.debug(
f"{self.name}: New object, adding {frame_time} to frame cache for {id}"
)
self.frame_cache[frame_time] = {
"frame": np.copy(current_frame),
"object_id": id,
}
# save initial thumbnail data and best object
thumbnail_data = {

View File

@ -26,6 +26,7 @@ class CameraConfigUpdateEnum(str, Enum):
object_genai = "object_genai"
onvif = "onvif"
record = "record"
refresh = "refresh" # signals the camera maintainer to recycle the camera process
remove = "remove" # for removing a camera
review = "review"
review_genai = "review_genai"
@ -84,8 +85,8 @@ class CameraConfigUpdateSubscriber:
self, camera: str, update_type: CameraConfigUpdateEnum, updated_config: Any
) -> None:
if update_type == CameraConfigUpdateEnum.add:
self.config.cameras[camera] = updated_config
self.camera_configs[camera] = updated_config
shared = self.config.cameras.setdefault(camera, updated_config)
self.camera_configs[camera] = shared
return
elif update_type == CameraConfigUpdateEnum.remove:
self.config.cameras.pop(camera, None)

View File

@ -0,0 +1,79 @@
"""Tests for CameraMaintainer SHM cleanup on camera remove.

Regression coverage for the case where a camera is removed and then a
new camera is added with the same name. Without unlinking the per-frame
YUV SHM slots, the maintainer's frame_manager.create call hits
FileExistsError and falls back to reopening the existing segment at the
*old* size, which the new ffmpeg process then writes mismatched-size
frames into.
"""

import unittest
from unittest.mock import MagicMock, patch

from frigate.camera.maintainer import CameraMaintainer


class TestMaintainerUnlinkFrameSlotsOnRemove(unittest.TestCase):
    def _make_maintainer(self) -> CameraMaintainer:
        """Build a maintainer without invoking __init__ (avoids needing real
        FrigateConfig, queues, multiprocessing manager, etc.). We're only
        exercising the SHM-cleanup helper, so the surrounding init is
        irrelevant."""
        maintainer = CameraMaintainer.__new__(CameraMaintainer)
        maintainer.frame_manager = MagicMock()
        return maintainer

    def test_unlinks_only_segments_with_matching_prefix(self) -> None:
        maintainer = self._make_maintainer()
        maintainer.frame_manager.shm_store = {
            "front_frame0": object(),
            "front_frame1": object(),
            "front_frame2": object(),
            # Different camera; must not be touched.
            "side_frame0": object(),
            # Detector input/output buffers are sized by the model and
            # cached by the long-lived DetectorRunner; they must not be
            # touched even when their owning camera is removed.
            "front": object(),
            "out-front": object(),
        }

        # __name-mangled access from outside the class.
        maintainer._CameraMaintainer__unlink_camera_frame_slots("front")

        deleted = [c.args[0] for c in maintainer.frame_manager.delete.call_args_list]
        self.assertEqual(
            sorted(deleted),
            ["front_frame0", "front_frame1", "front_frame2"],
        )

    def test_handles_camera_with_no_slots(self) -> None:
        """Cameras that were removed before any frame slot was ever
        created (e.g. cancelled during preparing_clip) should be a no-op."""
        maintainer = self._make_maintainer()
        maintainer.frame_manager.shm_store = {"other_frame0": object()}

        maintainer._CameraMaintainer__unlink_camera_frame_slots("front")

        maintainer.frame_manager.delete.assert_not_called()

    def test_swallows_delete_errors(self) -> None:
        """Unlink failures shouldn't abort the remove loop (best-effort)."""
        maintainer = self._make_maintainer()
        maintainer.frame_manager.shm_store = {
            "front_frame0": object(),
            "front_frame1": object(),
        }
        maintainer.frame_manager.delete.side_effect = OSError("simulated")

        # Both slots are attempted; the OSError on the first doesn't
        # prevent the second from being tried.
        with patch("frigate.camera.maintainer.logger"):
            maintainer._CameraMaintainer__unlink_camera_frame_slots("front")

        self.assertEqual(maintainer.frame_manager.delete.call_count, 2)


if __name__ == "__main__":
    unittest.main()

View File

@ -0,0 +1,156 @@
"""Tests for SharedMemoryFrameManager cache invalidation.

Covers the case where a SHM segment is unlinked and recreated at a
different size across a camera add/remove cycle while a long-lived
in-process cache (e.g. TrackedObjectProcessor) still holds a ref to
the old, smaller segment.
"""

import unittest
from types import SimpleNamespace
from unittest.mock import patch

import numpy as np

from frigate.util.image import SharedMemoryFrameManager


def _fake_shm(size: int) -> SimpleNamespace:
    """A minimal stand-in for UntrackedSharedMemory with .size and .buf."""
    return SimpleNamespace(size=size, buf=bytearray(size), close=lambda: None)


class TestSharedMemoryFrameManagerGet(unittest.TestCase):
    def test_get_reopens_when_cached_segment_is_smaller_than_shape(self) -> None:
        """A cached ref to an older smaller segment must be dropped and the
        current (correctly sized) segment reopened. Without this, np.ndarray
        would raise "buffer is too small for requested array" when the
        in-memory cache pointed at an old SHM after a same-name resize."""
        manager = SharedMemoryFrameManager()
        small = _fake_shm(size=100)
        current = _fake_shm(size=2_500)
        manager.shm_store["cam_frame0"] = small

        with patch("frigate.util.image.UntrackedSharedMemory", return_value=current):
            arr = manager.get("cam_frame0", (50, 50))

        self.assertIsNotNone(arr)
        self.assertEqual(arr.shape, (50, 50))
        self.assertIs(manager.shm_store["cam_frame0"], current)

    def test_get_reopens_when_cached_segment_is_larger_than_shape(self) -> None:
        """Symmetric to the smaller-cache case: when detect resolution drops,
        the SHM is unlinked and recreated at a smaller size. A cached ref to
        the old, larger segment still satisfies any size check but points at
        an orphaned inode whose stale bytes get reinterpreted at the new
        shape, producing miscolored, distorted YUV frames downstream. Drop
        the cache so we reopen by name and bind to the current segment."""
        manager = SharedMemoryFrameManager()
        old_large = _fake_shm(size=10_000)
        current = _fake_shm(size=2_500)
        manager.shm_store["cam_frame0"] = old_large

        with patch("frigate.util.image.UntrackedSharedMemory", return_value=current):
            arr = manager.get("cam_frame0", (50, 50))

        self.assertIsNotNone(arr)
        self.assertEqual(arr.shape, (50, 50))
        self.assertIs(manager.shm_store["cam_frame0"], current)

    def test_get_keeps_cached_segment_when_size_matches(self) -> None:
        """Don't pay the reopen cost when the cached ref is the right size."""
        manager = SharedMemoryFrameManager()
        cached = _fake_shm(size=2_500)
        manager.shm_store["cam_frame0"] = cached

        with patch("frigate.util.image.UntrackedSharedMemory") as untracked_shm_cls:
            arr = manager.get("cam_frame0", (50, 50))
            untracked_shm_cls.assert_not_called()

        self.assertIsNotNone(arr)
        self.assertIs(manager.shm_store["cam_frame0"], cached)

    def test_get_opens_fresh_when_no_cache_entry(self) -> None:
        manager = SharedMemoryFrameManager()
        fresh = _fake_shm(size=2_500)

        with patch("frigate.util.image.UntrackedSharedMemory", return_value=fresh):
            arr = manager.get("cam_frame0", (50, 50))

        self.assertIsNotNone(arr)
        self.assertIs(manager.shm_store["cam_frame0"], fresh)

    def test_get_returns_none_when_segment_missing(self) -> None:
        manager = SharedMemoryFrameManager()

        with patch(
            "frigate.util.image.UntrackedSharedMemory",
            side_effect=FileNotFoundError,
        ):
            arr = manager.get("cam_frame0", (50, 50))

        self.assertIsNone(arr)

    def test_get_returns_none_when_reopened_segment_is_still_too_small(self) -> None:
        """Race during a same-name SHM recreate: cache is stale, we reopen
        by name, but the maintainer hasn't allocated the new segment yet, so
        the reopened ref is also too small. Skip the frame (return None)
        rather than crash on np.ndarray."""
        manager = SharedMemoryFrameManager()
        small_cached = _fake_shm(size=100)
        still_small_after_reopen = _fake_shm(size=100)
        manager.shm_store["cam_frame0"] = small_cached

        with patch(
            "frigate.util.image.UntrackedSharedMemory",
            return_value=still_small_after_reopen,
        ):
            arr = manager.get("cam_frame0", (50, 50))

        self.assertIsNone(arr)
        # Don't cache the too-small reopened ref; the next call will re-open
        # once the maintainer has finished recreating the segment.
        self.assertNotIn("cam_frame0", manager.shm_store)

    def test_get_handles_n_dimensional_shape(self) -> None:
        """np.prod must be used (not raw multiplication) for tuple shapes."""
        manager = SharedMemoryFrameManager()
        # YUV-shaped frame: (height * 3/2, width) for 1920x1080 = 3,110,400
        big_enough = _fake_shm(size=3_110_400)
        manager.shm_store["cam_frame0"] = big_enough

        with patch("frigate.util.image.UntrackedSharedMemory") as untracked_shm_cls:
            arr = manager.get("cam_frame0", (1620, 1920))
            untracked_shm_cls.assert_not_called()

        self.assertIsNotNone(arr)
        self.assertEqual(arr.shape, (1620, 1920))


class TestSharedMemoryFrameManagerGetRecreatesLargerSegment(unittest.TestCase):
    """End-to-end-style: simulates the full unlink-and-recreate cycle."""

    def test_segment_grows_then_get_succeeds(self) -> None:
        manager = SharedMemoryFrameManager()

        # Phase 1: existing camera at 320x240 YUV: 320 * 240 * 1.5 = 115_200
        small = _fake_shm(size=115_200)
        manager.shm_store["cam_frame0"] = small
        arr_small = np.ndarray((360, 320), dtype=np.uint8, buffer=small.buf)
        self.assertEqual(arr_small.shape, (360, 320))

        # Phase 2: restart at 1920x1080 with a new, larger SHM segment.
        large = _fake_shm(size=3_110_400)
        with patch("frigate.util.image.UntrackedSharedMemory", return_value=large):
            arr_large = manager.get("cam_frame0", (1620, 1920))

        self.assertIsNotNone(arr_large)
        self.assertEqual(arr_large.shape, (1620, 1920))


if __name__ == "__main__":
    unittest.main()

View File

@ -788,6 +788,34 @@ def apply_section_update(camera_config, section: str, update: dict) -> Optional[
)
camera_config.objects = new_objects
elif section == "detect":
# apply detect first so frame_shape reflects the new resolution
# before we rebuild mask-dependent runtime configs below
merged = deep_merge(current.model_dump(), update, override=True)
camera_config.detect = current.__class__.model_validate(merged)
new_frame_shape = camera_config.frame_shape
# rebuild motion's rasterized_mask at the new frame_shape
if camera_config.motion is not None:
camera_config.motion = RuntimeMotionConfig(
frame_shape=new_frame_shape,
**camera_config.motion.model_dump(exclude_unset=True),
)
# rebuild per-object filter masks at the new frame_shape
for obj_name, filt in camera_config.objects.filters.items():
merged_mask = dict(filt.mask)
if camera_config.objects.mask:
for gid, gmask in camera_config.objects.mask.items():
merged_mask[f"global_{gid}"] = gmask
camera_config.objects.filters[obj_name] = RuntimeFilterConfig(
frame_shape=new_frame_shape,
mask=merged_mask,
**filt.model_dump(exclude_unset=True, exclude={"mask", "raw_mask"}),
)
else:
merged = deep_merge(current.model_dump(), update, override=True)
setattr(camera_config, section, current.__class__.model_validate(merged))

View File

@ -1089,10 +1089,25 @@ class SharedMemoryFrameManager(FrameManager):
def get(self, name: str, shape) -> Optional[np.ndarray]:
try:
if name in self.shm_store:
shm = self.shm_store[name]
else:
required = int(np.prod(shape))
shm = self.shm_store.get(name)
if shm is not None and shm.size != required:
# stale cached ref from a same-name recreate — drop and reopen
try:
shm.close()
except Exception:
pass
self.shm_store.pop(name, None)
shm = None
if shm is None:
shm = UntrackedSharedMemory(name=name)
if shm.size != required:
# mid-recreate: OS segment doesn't match shape yet; skip
try:
shm.close()
except Exception:
pass
return None
self.shm_store[name] = shm
return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)
except FileNotFoundError:

View File

@ -72,6 +72,25 @@ const detect: SectionConfigOverrides = {
"max_disappeared",
],
},
replay: {
restartRequired: [],
fieldOrder: ["width", "height", "fps"],
fieldGroups: {
resolution: ["width", "height", "fps"],
},
hiddenFields: [
"enabled",
"enabled_in_config",
"min_initialized",
"max_disappeared",
"annotation_offset",
"stationary",
"interval",
"threshold",
"max_frames",
],
advancedFields: [],
},
};
export default detect;

View File

@ -1253,7 +1253,12 @@ export function ConfigSection({
<CollapsibleTrigger asChild>
<div className="flex cursor-pointer items-center justify-between">
<div className="flex items-center gap-3">
<Heading as="h4">{title}</Heading>
<Heading
as="h4"
className={level === "replay" ? "text-base" : undefined}
>
{title}
</Heading>
{showOverrideIndicator &&
effectiveLevel === "camera" &&
(profileOverridesSection || isOverridden) &&

View File

@ -354,6 +354,18 @@ export default function Replay() {
</div>
) : (
<div className="space-y-6">
<ConfigSectionTemplate
sectionKey="detect"
level="replay"
cameraName={status.replay_camera ?? undefined}
skipSave
noStickyButtons
requiresRestart={false}
collapsible
defaultCollapsed={false}
showTitle
showOverrideIndicator={false}
/>
<ConfigSectionTemplate
sectionKey="motion"
level="replay"