Compare commits

...

13 Commits

Author SHA1 Message Date
Josh Hawkins
3bfebc1c07 fix autotracking calibration to support new config updater function 2025-06-11 12:10:36 -05:00
Josh Hawkins
de310f0484 Add ability to update config via json body to config/set endpoint
Additionally, update the config in a single rather than multiple calls for each updated key
2025-06-11 12:10:01 -05:00
Nicolas Mowen
d6dda7a3df Use exact string so similar camera names don't interfere 2025-06-11 11:07:58 -06:00
Nicolas Mowen
ebd79f123f Don't enable audio if no cameras have audio transcription 2025-06-11 10:26:00 -06:00
Nicolas Mowen
e3e1728f91 Cleanup 2025-06-11 09:08:04 -06:00
Nicolas Mowen
86b5e0f9ae Cleanup for updating the cameras config 2025-06-11 09:06:42 -06:00
Nicolas Mowen
1d0e9829f6 Get camera correctly created 2025-06-11 08:37:49 -06:00
Nicolas Mowen
301c01dbf2 Use ZMQ for signaling object detectoin is completed 2025-06-11 08:31:01 -06:00
Nicolas Mowen
225010c570 Create out SHM 2025-06-11 08:10:34 -06:00
Nicolas Mowen
02144402e5 Cleanup 2025-06-11 07:42:37 -06:00
Nicolas Mowen
e871c5178a Correctly handle indexed entries 2025-06-11 07:35:27 -06:00
Nicolas Mowen
67cd746135 Improve typing 2025-06-11 07:18:22 -06:00
Nicolas Mowen
785e655ff5 Simplify config updates 2025-06-11 06:25:14 -06:00
19 changed files with 292 additions and 202 deletions

View File

@ -6,6 +6,7 @@ import json
import logging
import os
import traceback
import urllib
from datetime import datetime, timedelta
from functools import reduce
from io import StringIO
@ -36,8 +37,10 @@ from frigate.models import Event, Timeline
from frigate.stats.prometheus import get_metrics, update_metrics
from frigate.util.builtin import (
clean_camera_user_pass,
flatten_config_data,
get_tz_modifiers,
update_yaml_from_url,
process_config_query_string,
update_yaml_file_bulk,
)
from frigate.util.config import find_config_file
from frigate.util.services import (
@ -358,14 +361,37 @@ def config_set(request: Request, body: AppConfigSetBody):
with open(config_file, "r") as f:
old_raw_config = f.read()
f.close()
try:
update_yaml_from_url(config_file, str(request.url))
updates = {}
# process query string parameters (takes precedence over body.config_data)
parsed_url = urllib.parse.urlparse(str(request.url))
query_string = urllib.parse.parse_qs(parsed_url.query, keep_blank_values=True)
# Filter out empty keys but keep blank values for non-empty keys
query_string = {k: v for k, v in query_string.items() if k}
if query_string:
updates = process_config_query_string(query_string)
elif body.config_data:
updates = flatten_config_data(body.config_data)
if not updates:
return JSONResponse(
content=(
{"success": False, "message": "No configuration data provided"}
),
status_code=400,
)
# apply all updates in a single operation
update_yaml_file_bulk(config_file, updates)
# validate the updated config
with open(config_file, "r") as f:
new_raw_config = f.read()
f.close()
# Validate the config schema
try:
config = FrigateConfig.parse(new_raw_config)
except Exception:
@ -390,17 +416,23 @@ def config_set(request: Request, body: AppConfigSetBody):
)
if body.requires_restart == 0 or body.update_topic:
old_config: FrigateConfig = request.app.frigate_config
request.app.frigate_config = config
if body.update_topic:
if body.update_topic.startswith("config/cameras/"):
_, _, camera, field = body.update_topic.split("/")
if body.update_topic and body.update_topic.startswith("config/cameras/"):
_, _, camera, field = body.update_topic.split("/")
if field == "add":
settings = config.cameras[camera]
elif field == "remove":
settings = old_config.cameras[camera]
else:
settings = config.get_nested_object(body.update_topic)
request.app.config_publisher.publish_update(
CameraConfigUpdateTopic(CameraConfigUpdateEnum[field], camera),
settings,
)
request.app.config_publisher.publish_update(
CameraConfigUpdateTopic(CameraConfigUpdateEnum[field], camera),
settings,
)
return JSONResponse(
content=(

View File

@ -1,4 +1,4 @@
from typing import Optional
from typing import Any, Dict, Optional
from pydantic import BaseModel
@ -6,6 +6,7 @@ from pydantic import BaseModel
class AppConfigSetBody(BaseModel):
requires_restart: int = 1
update_topic: str | None = None
config_data: Optional[Dict[str, Any]] = None
class AppPutPasswordBody(BaseModel):

View File

@ -82,7 +82,6 @@ class FrigateApp:
self.stop_event: MpEvent = mp.Event()
self.detection_queue: Queue = mp.Queue()
self.detectors: dict[str, ObjectDetectProcess] = {}
self.detection_out_events: dict[str, MpEvent] = {}
self.detection_shms: list[mp.shared_memory.SharedMemory] = []
self.log_queue: Queue = mp.Queue()
self.camera_metrics: dict[str, CameraMetrics] = {}
@ -363,8 +362,6 @@ class FrigateApp:
def start_detectors(self) -> None:
for name in self.config.cameras.keys():
self.detection_out_events[name] = mp.Event()
try:
largest_frame = max(
[
@ -396,7 +393,7 @@ class FrigateApp:
self.detectors[name] = ObjectDetectProcess(
name,
self.detection_queue,
self.detection_out_events,
list(self.config.cameras.keys()),
detector_config,
)
@ -435,7 +432,6 @@ class FrigateApp:
self.camera_maintainer = CameraMaintainer(
self.config,
self.detection_queue,
self.detection_out_events,
self.detected_frames_queue,
self.camera_metrics,
self.ptz_metrics,

View File

@ -3,7 +3,7 @@
from collections import Counter
from typing import Any, Callable
from frigate.config.config import FrigateConfig
from frigate.config import CameraConfig, FrigateConfig
class CameraActivityManager:
@ -23,26 +23,33 @@ class CameraActivityManager:
if not camera_config.enabled_in_config:
continue
self.last_camera_activity[camera_config.name] = {}
self.camera_all_object_counts[camera_config.name] = Counter()
self.camera_active_object_counts[camera_config.name] = Counter()
self.__init_camera(camera_config)
for zone, zone_config in camera_config.zones.items():
if zone not in self.all_zone_labels:
self.zone_all_object_counts[zone] = Counter()
self.zone_active_object_counts[zone] = Counter()
self.all_zone_labels[zone] = set()
def __init_camera(self, camera_config: CameraConfig) -> None:
self.last_camera_activity[camera_config.name] = {}
self.camera_all_object_counts[camera_config.name] = Counter()
self.camera_active_object_counts[camera_config.name] = Counter()
self.all_zone_labels[zone].update(
zone_config.objects
if zone_config.objects
else camera_config.objects.track
)
for zone, zone_config in camera_config.zones.items():
if zone not in self.all_zone_labels:
self.zone_all_object_counts[zone] = Counter()
self.zone_active_object_counts[zone] = Counter()
self.all_zone_labels[zone] = set()
self.all_zone_labels[zone].update(
zone_config.objects
if zone_config.objects
else camera_config.objects.track
)
def update_activity(self, new_activity: dict[str, dict[str, Any]]) -> None:
all_objects: list[dict[str, Any]] = []
for camera in new_activity.keys():
# handle cameras that were added dynamically
if camera not in self.camera_all_object_counts:
self.__init_camera(self.config.cameras[camera])
new_objects = new_activity[camera].get("objects", [])
all_objects.extend(new_objects)

View File

@ -1,7 +1,6 @@
"""Create and maintain camera processes / management."""
import logging
import multiprocessing as mp
import os
import shutil
import threading
@ -11,12 +10,15 @@ from multiprocessing.synchronize import Event as MpEvent
from frigate.camera import CameraMetrics, PTZMetrics
from frigate.config import FrigateConfig
from frigate.config.camera import CameraConfig
from frigate.config.updater import GlobalConfigUpdateEnum, GlobalConfigUpdateSubscriber
from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber,
)
from frigate.const import SHM_FRAMES_VAR
from frigate.models import Regions
from frigate.util import Process as FrigateProcess
from frigate.util.builtin import empty_and_close_queue
from frigate.util.image import SharedMemoryFrameManager
from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory
from frigate.util.object import get_camera_regions_grid
from frigate.video import capture_camera, track_camera
@ -28,7 +30,6 @@ class CameraMaintainer(threading.Thread):
self,
config: FrigateConfig,
detection_queue: Queue,
detection_out_events: dict[str, MpEvent],
detected_frames_queue: Queue,
camera_metrics: dict[str, CameraMetrics],
ptz_metrics: dict[str, PTZMetrics],
@ -37,19 +38,19 @@ class CameraMaintainer(threading.Thread):
super().__init__(name="camera_processor")
self.config = config
self.detection_queue = detection_queue
self.detection_out_events = detection_out_events
self.detected_frames_queue = detected_frames_queue
self.stop_event = stop_event
self.camera_metrics = camera_metrics
self.ptz_metrics = ptz_metrics
self.frame_manager = SharedMemoryFrameManager()
self.region_grids: dict[str, list[list[dict[str, int]]]] = {}
self.update_subscriber = GlobalConfigUpdateSubscriber(
self.update_subscriber = CameraConfigUpdateSubscriber(
self.config,
{},
[
GlobalConfigUpdateEnum.add_camera,
GlobalConfigUpdateEnum.debug_camera,
GlobalConfigUpdateEnum.remove_camera,
]
CameraConfigUpdateEnum.add,
CameraConfigUpdateEnum.remove,
],
)
self.shm_count = self.__calculate_shm_frame_count()
@ -124,8 +125,6 @@ class CameraMaintainer(threading.Thread):
return
if runtime:
# TODO we have to send a ZMQ message to the object detector with the same out event
self.detection_out_events[name] = mp.Event()
self.camera_metrics[name] = CameraMetrics()
self.ptz_metrics[name] = PTZMetrics(autotracker_enabled=False)
self.region_grids[name] = get_camera_regions_grid(
@ -134,6 +133,24 @@ class CameraMaintainer(threading.Thread):
max(self.config.model.width, self.config.model.height),
)
try:
largest_frame = max(
[
det.model.height * det.model.width * 3
if det.model is not None
else 320
for det in self.config.detectors.values()
]
)
UntrackedSharedMemory(name=f"out-{name}", create=True, size=20 * 6 * 4)
UntrackedSharedMemory(
name=name,
create=True,
size=largest_frame,
)
except FileExistsError:
pass
camera_process = FrigateProcess(
target=track_camera,
name=f"camera_processor:{name}",
@ -143,7 +160,6 @@ class CameraMaintainer(threading.Thread):
self.config.model,
self.config.model.merged_labelmap,
self.detection_queue,
self.detection_out_events[name],
self.detected_frames_queue,
self.camera_metrics[name],
self.ptz_metrics[name],
@ -155,13 +171,15 @@ class CameraMaintainer(threading.Thread):
camera_process.start()
logger.info(f"Camera processor started for {config.name}: {camera_process.pid}")
def __start_camera_capture(self, name: str, config: CameraConfig) -> None:
if not self.config.cameras[name].enabled_in_config:
def __start_camera_capture(
self, name: str, config: CameraConfig, runtime: bool = False
) -> None:
if not config.enabled_in_config:
logger.info(f"Capture process not started for disabled camera {name}")
return
# pre-create shms
for i in range(self.shm_count):
for i in range(10 if runtime else self.shm_count):
frame_size = config.frame_shape_yuv[0] * config.frame_shape_yuv[1]
self.frame_manager.create(f"{config.name}_frame{i}", frame_size)
@ -203,20 +221,20 @@ class CameraMaintainer(threading.Thread):
while not self.stop_event.wait(1):
updates = self.update_subscriber.check_for_updates()
for update_type, update_config in updates:
if update_type == GlobalConfigUpdateEnum.add_camera:
self.__start_camera_processor(
update_config.name, update_config, runtime=True
)
self.__start_camera_capture(update_config.name, update_config)
elif update_type == GlobalConfigUpdateEnum.debug_camera:
pass
elif update_type == GlobalConfigUpdateEnum.remove_camera:
camera = update_config.name
if camera:
self.__stop_camera_capture_process(camera)
self.__stop_camera_process(camera)
for update_type, updated_cameras in updates.items():
if update_type == CameraConfigUpdateEnum.add.name:
for camera in updated_cameras:
self.__start_camera_processor(
camera,
self.update_subscriber.camera_configs[camera],
runtime=True,
)
self.__start_camera_capture(
camera, self.update_subscriber.camera_configs[camera]
)
elif update_type == CameraConfigUpdateEnum.remove.name:
self.__stop_camera_capture_process(camera)
self.__stop_camera_process(camera)
# ensure the capture processes are done
for camera in self.camera_metrics.keys():
@ -226,4 +244,5 @@ class CameraMaintainer(threading.Thread):
for camera in self.camera_metrics.keys():
self.__stop_camera_process(camera)
self.update_subscriber.stop()
self.frame_manager.cleanup()

View File

@ -0,0 +1,21 @@
"""Facilitates communication between processes for object detection signals."""
from .zmq_proxy import Publisher, Subscriber
class ObjectDetectorPublisher(Publisher):
"""Publishes signal for object detection to different processes."""
topic_base = "object_detector/"
class ObjectDetectorSubscriber(Subscriber):
"""Simplifies receiving a signal for object detection."""
topic_base = "object_detector/"
def __init__(self, topic: str) -> None:
super().__init__(topic)
def check_for_update(self):
return super().check_for_update(timeout=5)

View File

@ -81,7 +81,7 @@ class WebPushClient(Communicator): # type: ignore[misc]
"config/notifications", exact=True
)
self.config_subscriber = CameraConfigUpdateSubscriber(
self.config.cameras, [CameraConfigUpdateEnum.notifications]
self.config, self.config.cameras, [CameraConfigUpdateEnum.notifications]
)
def subscribe(self, receiver: Callable) -> None:
@ -170,7 +170,12 @@ class WebPushClient(Communicator): # type: ignore[misc]
if updated_notification_config:
self.config.notifications = updated_notification_config
self.config_subscriber.check_for_updates()
updates = self.config_subscriber.check_for_updates()
if "add" in updates:
for camera in updates["add"]:
self.suspended_cameras[camera] = 0
self.last_camera_notification_time[camera] = 0
if topic == "reviews":
decoded = json.loads(payload)

View File

@ -5,12 +5,13 @@ from enum import Enum
from typing import Any
from frigate.comms.config_updater import ConfigPublisher, ConfigSubscriber
from frigate.config import CameraConfig
from frigate.config import CameraConfig, FrigateConfig
class CameraConfigUpdateEnum(str, Enum):
"""Supported camera config update types."""
add = "add" # for adding a camera
audio = "audio"
audio_transcription = "audio_transcription"
birdseye = "birdseye"
@ -20,6 +21,7 @@ class CameraConfigUpdateEnum(str, Enum):
notifications = "notifications"
objects = "objects"
record = "record"
remove = "remove" # for removing a camera
review = "review"
snapshots = "snapshots"
zones = "zones"
@ -49,9 +51,11 @@ class CameraConfigUpdatePublisher:
class CameraConfigUpdateSubscriber:
def __init__(
self,
config: FrigateConfig | None,
camera_configs: dict[str, CameraConfig],
topics: list[CameraConfigUpdateEnum],
):
self.config = config
self.camera_configs = camera_configs
self.topics = topics
@ -68,14 +72,23 @@ class CameraConfigUpdateSubscriber:
def __update_config(
self, camera: str, update_type: CameraConfigUpdateEnum, updated_config: Any
) -> None:
config = self.camera_configs[camera]
if update_type == CameraConfigUpdateEnum.add:
self.config.cameras[camera] = updated_config
self.camera_configs[camera] = updated_config
return
elif update_type == CameraConfigUpdateEnum.remove:
self.config.cameras.pop(camera)
self.camera_configs.pop(camera)
return
config = self.camera_configs.get(camera)
if not config:
return
if update_type == CameraConfigUpdateEnum.audio:
config.audio = updated_config
if update_type == CameraConfigUpdateEnum.audio_transcription:
elif update_type == CameraConfigUpdateEnum.audio_transcription:
config.audio_transcription = updated_config
elif update_type == CameraConfigUpdateEnum.birdseye:
config.birdseye = updated_config

View File

@ -1,69 +0,0 @@
"""Convenience classes for updating global configurations dynamically."""
from dataclasses import dataclass
from enum import Enum
from typing import Any
from frigate.comms.config_updater import ConfigPublisher, ConfigSubscriber
from frigate.config.camera import CameraConfig
class GlobalConfigUpdateEnum(str, Enum):
"""Supported global config update types."""
add_camera = "add_camera"
debug_camera = "debug_camera"
remove_camera = "remove_camera"
@dataclass
class GlobalConfigUpdateTopic:
update_type: GlobalConfigUpdateEnum
@property
def topic(self) -> str:
return f"config/{self.update_type.name}"
class GlobalConfigUpdatePublisher:
def __init__(self):
self.publisher = ConfigPublisher()
def publish_update(self, topic: GlobalConfigUpdateTopic, config: Any) -> None:
self.publisher.publish(topic.topic, config)
def stop(self) -> None:
self.publisher.stop()
class GlobalConfigUpdateSubscriber:
def __init__(
self,
topics: list[GlobalConfigUpdateEnum],
):
self.topics = topics
self.subscriber = ConfigSubscriber(
"config/",
exact=False,
)
def check_for_updates(self) -> list[tuple[GlobalConfigUpdateEnum, CameraConfig]]:
updated_topics: list[tuple[GlobalConfigUpdateEnum, CameraConfig]] = []
# get all updates available
while True:
update_topic, update_config = self.subscriber.check_for_update()
if update_topic is None or update_config is None:
break
_, raw_type = update_topic.split("/")
update_type = GlobalConfigUpdateEnum[raw_type]
if update_type in self.topics:
updated_topics.append((update_type, update_config))
return updated_topics
def stop(self) -> None:
self.subscriber.stop()

View File

@ -29,6 +29,10 @@ from frigate.comms.recordings_updater import (
)
from frigate.config import FrigateConfig
from frigate.config.camera.camera import CameraTypeEnum
from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber,
)
from frigate.const import (
CLIPS_DIR,
UPDATE_EVENT_DESCRIPTION,
@ -87,6 +91,11 @@ class EmbeddingMaintainer(threading.Thread):
self.config = config
self.metrics = metrics
self.embeddings = None
self.config_updater = CameraConfigUpdateSubscriber(
self.config,
self.config.cameras,
[CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.remove],
)
if config.semantic_search.enabled:
self.embeddings = Embeddings(config, db, metrics)
@ -198,6 +207,7 @@ class EmbeddingMaintainer(threading.Thread):
def run(self) -> None:
"""Maintain a SQLite-vec database for semantic search."""
while not self.stop_event.is_set():
self.config_updater.check_for_updates()
self._process_requests()
self._process_updates()
self._process_recordings_updates()
@ -206,6 +216,7 @@ class EmbeddingMaintainer(threading.Thread):
self._process_finalized()
self._process_event_metadata()
self.config_updater.stop()
self.event_subscriber.stop()
self.event_end_subscriber.stop()
self.recordings_subscriber.stop()

View File

@ -90,10 +90,19 @@ class AudioProcessor(util.Process):
self.camera_metrics = camera_metrics
self.cameras = cameras
self.config = config
self.transcription_model_runner = AudioTranscriptionModelRunner(
self.config.audio_transcription.device,
self.config.audio_transcription.model_size,
)
if any(
[
conf.audio_transcription.enabled_in_config
for conf in config.cameras.values()
]
):
self.transcription_model_runner = AudioTranscriptionModelRunner(
self.config.audio_transcription.device,
self.config.audio_transcription.model_size,
)
else:
self.transcription_model_runner = None
def run(self) -> None:
audio_threads: list[AudioEventMaintainer] = []
@ -138,7 +147,7 @@ class AudioEventMaintainer(threading.Thread):
camera: CameraConfig,
config: FrigateConfig,
camera_metrics: dict[str, CameraMetrics],
audio_transcription_model_runner: AudioTranscriptionModelRunner,
audio_transcription_model_runner: AudioTranscriptionModelRunner | None,
stop_event: threading.Event,
) -> None:
super().__init__(name=f"{camera.name}_audio_event_processor")
@ -162,6 +171,7 @@ class AudioEventMaintainer(threading.Thread):
# create communication for audio detections
self.requestor = InterProcessRequestor()
self.config_subscriber = CameraConfigUpdateSubscriber(
None,
{self.camera_config.name: self.camera_config},
[
CameraConfigUpdateEnum.audio,

View File

@ -13,6 +13,10 @@ import numpy as np
from setproctitle import setproctitle
import frigate.util as util
from frigate.comms.object_detector_signaler import (
ObjectDetectorPublisher,
ObjectDetectorSubscriber,
)
from frigate.detectors import create_detector
from frigate.detectors.detector_config import (
BaseDetectorConfig,
@ -89,7 +93,7 @@ class LocalObjectDetector(ObjectDetector):
def run_detector(
name: str,
detection_queue: Queue,
out_events: dict[str, MpEvent],
cameras: list[str],
avg_speed: Value,
start: Value,
detector_config: BaseDetectorConfig,
@ -108,15 +112,19 @@ def run_detector(
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
frame_manager = SharedMemoryFrameManager()
object_detector = LocalObjectDetector(detector_config=detector_config)
outputs = {}
for name in out_events.keys():
def create_output_shm(name: str):
out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False)
out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf)
outputs[name] = {"shm": out_shm, "np": out_np}
frame_manager = SharedMemoryFrameManager()
object_detector = LocalObjectDetector(detector_config=detector_config)
detector_publisher = ObjectDetectorPublisher()
outputs = {}
for name in cameras:
create_output_shm(name)
while not stop_event.is_set():
try:
connection_id = detection_queue.get(timeout=1)
@ -136,12 +144,18 @@ def run_detector(
detections = object_detector.detect_raw(input_frame)
duration = datetime.datetime.now().timestamp() - start.value
frame_manager.close(connection_id)
if connection_id not in outputs:
create_output_shm(connection_id)
outputs[connection_id]["np"][:] = detections[:]
out_events[connection_id].set()
signal_id = f"{connection_id}/update"
detector_publisher.publish(signal_id, signal_id)
start.value = 0.0
avg_speed.value = (avg_speed.value * 9 + duration) / 10
detector_publisher.stop()
logger.info("Exited detection process...")
@ -150,11 +164,11 @@ class ObjectDetectProcess:
self,
name: str,
detection_queue: Queue,
out_events: dict[str, MpEvent],
cameras: list[str],
detector_config: BaseDetectorConfig,
):
self.name = name
self.out_events = out_events
self.cameras = cameras
self.detection_queue = detection_queue
self.avg_inference_speed = Value("d", 0.01)
self.detection_start = Value("d", 0.0)
@ -176,7 +190,6 @@ class ObjectDetectProcess:
logging.info("Detection process has exited...")
def start_or_restart(self):
# TODO have to create a separate ZMQ listener for the MP.Events to be sent here
self.detection_start.value = 0.0
if (self.detect_process is not None) and self.detect_process.is_alive():
self.stop()
@ -186,7 +199,7 @@ class ObjectDetectProcess:
args=(
self.name,
self.detection_queue,
self.out_events,
self.cameras,
self.avg_inference_speed,
self.detection_start,
self.detector_config,
@ -202,7 +215,6 @@ class RemoteObjectDetector:
name: str,
labels: dict[int, str],
detection_queue: Queue,
event: MpEvent,
model_config: ModelConfig,
stop_event: MpEvent,
):
@ -210,7 +222,6 @@ class RemoteObjectDetector:
self.name = name
self.fps = EventsPerSecond()
self.detection_queue = detection_queue
self.event = event
self.stop_event = stop_event
self.shm = UntrackedSharedMemory(name=self.name, create=False)
self.np_shm = np.ndarray(
@ -220,6 +231,7 @@ class RemoteObjectDetector:
)
self.out_shm = UntrackedSharedMemory(name=f"out-{self.name}", create=False)
self.out_np_shm = np.ndarray((20, 6), dtype=np.float32, buffer=self.out_shm.buf)
self.detector_subscriber = ObjectDetectorSubscriber(f"{name}/update")
def detect(self, tensor_input, threshold=0.4):
detections = []
@ -229,9 +241,8 @@ class RemoteObjectDetector:
# copy input to shared memory
self.np_shm[:] = tensor_input[:]
self.event.clear()
self.detection_queue.put(self.name)
result = self.event.wait(timeout=5.0)
result = self.detector_subscriber.check_for_update()
# if it timed out
if result is None:
@ -247,5 +258,6 @@ class RemoteObjectDetector:
return detections
def cleanup(self):
self.detector_subscriber.stop()
self.shm.unlink()
self.out_shm.unlink()

View File

@ -103,8 +103,10 @@ def output_frames(
detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video)
config_subscriber = CameraConfigUpdateSubscriber(
config,
config.cameras,
[
CameraConfigUpdateEnum.add,
CameraConfigUpdateEnum.birdseye,
CameraConfigUpdateEnum.enabled,
CameraConfigUpdateEnum.record,
@ -135,7 +137,15 @@ def output_frames(
while not stop_event.is_set():
# check if there is an updated config
config_subscriber.check_for_updates()
updates = config_subscriber.check_for_updates()
if "add" in updates:
for camera in updates["add"]:
jsmpeg_cameras[camera] = JsmpegCamera(
cam_config, stop_event, websocket_server
)
preview_recorders[camera] = PreviewRecorder(cam_config)
preview_write_times[camera] = 0
(topic, data) = detection_subscriber.check_for_update(timeout=1)
now = datetime.datetime.now().timestamp()

View File

@ -31,7 +31,7 @@ from frigate.const import (
)
from frigate.ptz.onvif import OnvifController
from frigate.track.tracked_object import TrackedObject
from frigate.util.builtin import update_yaml_file
from frigate.util.builtin import update_yaml_file_bulk
from frigate.util.config import find_config_file
from frigate.util.image import SharedMemoryFrameManager, intersection_over_union
@ -348,10 +348,13 @@ class PtzAutoTracker:
f"{camera}: Writing new config with autotracker motion coefficients: {self.config.cameras[camera].onvif.autotracking.movement_weights}"
)
update_yaml_file(
update_yaml_file_bulk(
config_file,
["cameras", camera, "onvif", "autotracking", "movement_weights"],
self.config.cameras[camera].onvif.autotracking.movement_weights,
{
f"cameras.{camera}.onvif.autotracking.movement_weights": self.config.cameras[
camera
].onvif.autotracking.movement_weights
},
)
async def _calibrate_camera(self, camera):

View File

@ -75,7 +75,9 @@ class RecordingMaintainer(threading.Thread):
# create communication for retained recordings
self.requestor = InterProcessRequestor()
self.config_subscriber = CameraConfigUpdateSubscriber(
self.config.cameras, [CameraConfigUpdateEnum.record]
self.config,
self.config.cameras,
[CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.record],
)
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all)
self.recordings_publisher = RecordingsDataPublisher(

View File

@ -154,10 +154,13 @@ class ReviewSegmentMaintainer(threading.Thread):
# create communication for review segments
self.requestor = InterProcessRequestor()
self.config_subscriber = CameraConfigUpdateSubscriber(
config,
config.cameras,
[
CameraConfigUpdateEnum.add,
CameraConfigUpdateEnum.enabled,
CameraConfigUpdateEnum.record,
CameraConfigUpdateEnum.remove,
CameraConfigUpdateEnum.review,
],
)

View File

@ -32,7 +32,6 @@ from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber,
)
from frigate.config.updater import GlobalConfigUpdateEnum, GlobalConfigUpdateSubscriber
from frigate.const import FAST_QUEUE_TIMEOUT, UPDATE_CAMERA_ACTIVITY
from frigate.events.types import EventStateEnum, EventTypeEnum
from frigate.models import Event, Timeline
@ -67,12 +66,15 @@ class TrackedObjectProcessor(threading.Thread):
self.last_motion_detected: dict[str, float] = {}
self.ptz_autotracker_thread = ptz_autotracker_thread
self.global_config_subscriber = GlobalConfigUpdateSubscriber(
[GlobalConfigUpdateEnum.add_camera, GlobalConfigUpdateEnum.remove_camera]
)
self.camera_config_subscriber = CameraConfigUpdateSubscriber(
self.config,
self.config.cameras,
[CameraConfigUpdateEnum.enabled, CameraConfigUpdateEnum.zones],
[
CameraConfigUpdateEnum.add,
CameraConfigUpdateEnum.enabled,
CameraConfigUpdateEnum.remove,
CameraConfigUpdateEnum.zones,
],
)
self.requestor = InterProcessRequestor()
@ -590,16 +592,6 @@ class TrackedObjectProcessor(threading.Thread):
def run(self):
while not self.stop_event.is_set():
# check for global config updates
for topic, payload in self.global_config_subscriber.check_for_updates():
if topic == GlobalConfigUpdateEnum.add_camera:
self.create_camera_state(payload["camera"])
elif topic == GlobalConfigUpdateEnum.remove_camera:
camera = payload["camera"]
camera_state = self.camera_states[camera]
camera_state.shutdown()
del self.camera_states[camera]
# check for config updates
updated_topics = self.camera_config_subscriber.check_for_updates()
@ -609,6 +601,17 @@ class TrackedObjectProcessor(threading.Thread):
self.camera_states[camera].prev_enabled = self.config.cameras[
camera
].enabled
elif "add" in updated_topics:
for camera in updated_topics["add"]:
self.config.cameras[camera] = (
self.camera_config_subscriber.camera_configs[camera]
)
self.create_camera_state(camera)
elif "remove" in updated_topics:
for camera in updated_topics["remove"]:
camera_state = self.camera_states[camera]
camera_state.shutdown()
self.camera_states.pop(camera)
# manage camera disabled state
for camera, config in self.config.cameras.items():

View File

@ -14,7 +14,7 @@ import urllib.parse
from collections.abc import Mapping
from multiprocessing.sharedctypes import Synchronized
from pathlib import Path
from typing import Any, Optional, Tuple, Union
from typing import Any, Dict, Optional, Tuple, Union
from zoneinfo import ZoneInfoNotFoundError
import numpy as np
@ -184,25 +184,12 @@ def create_mask(frame_shape, mask):
mask_img[:] = 255
def update_yaml_from_url(file_path, url):
parsed_url = urllib.parse.urlparse(url)
query_string = urllib.parse.parse_qs(parsed_url.query, keep_blank_values=True)
# Filter out empty keys but keep blank values for non-empty keys
query_string = {k: v for k, v in query_string.items() if k}
def process_config_query_string(query_string: Dict[str, list]) -> Dict[str, Any]:
updates = {}
for key_path_str, new_value_list in query_string.items():
key_path = key_path_str.split(".")
for i in range(len(key_path)):
try:
index = int(key_path[i])
key_path[i] = (key_path[i - 1], index)
key_path.pop(i - 1)
except ValueError:
pass
# use the string key as-is for updates dictionary
if len(new_value_list) > 1:
update_yaml_file(file_path, key_path, new_value_list)
updates[key_path_str] = new_value_list
else:
value = new_value_list[0]
try:
@ -210,10 +197,24 @@ def update_yaml_from_url(file_path, url):
value = ast.literal_eval(value) if "," not in value else value
except (ValueError, SyntaxError):
pass
update_yaml_file(file_path, key_path, value)
updates[key_path_str] = value
return updates
def update_yaml_file(file_path, key_path, new_value):
def flatten_config_data(
config_data: Dict[str, Any], parent_key: str = ""
) -> Dict[str, Any]:
items = []
for key, value in config_data.items():
new_key = f"{parent_key}.{key}" if parent_key else key
if isinstance(value, dict):
items.extend(flatten_config_data(value, new_key).items())
else:
items.append((new_key, value))
return dict(items)
def update_yaml_file_bulk(file_path: str, updates: Dict[str, Any]):
yaml = YAML()
yaml.indent(mapping=2, sequence=4, offset=2)
@ -226,7 +227,17 @@ def update_yaml_file(file_path, key_path, new_value):
)
return
data = update_yaml(data, key_path, new_value)
# Apply all updates
for key_path_str, new_value in updates.items():
key_path = key_path_str.split(".")
for i in range(len(key_path)):
try:
index = int(key_path[i])
key_path[i] = (key_path[i - 1], index)
key_path.pop(i - 1)
except ValueError:
pass
data = update_yaml(data, key_path, new_value)
try:
with open(file_path, "w") as f:

View File

@ -116,7 +116,7 @@ def capture_frames(
skipped_eps = EventsPerSecond()
skipped_eps.start()
config_subscriber = CameraConfigUpdateSubscriber(
{config.name: config}, [CameraConfigUpdateEnum.enabled]
None, {config.name: config}, [CameraConfigUpdateEnum.enabled]
)
def get_enabled_state():
@ -196,7 +196,7 @@ class CameraWatchdog(threading.Thread):
self.sleeptime = self.config.ffmpeg.retry_interval
self.config_subscriber = CameraConfigUpdateSubscriber(
{config.name: config}, [CameraConfigUpdateEnum.enabled]
None, {config.name: config}, [CameraConfigUpdateEnum.enabled]
)
self.was_enabled = self.config.enabled
@ -473,7 +473,6 @@ def track_camera(
model_config: ModelConfig,
labelmap: dict[int, str],
detection_queue: Queue,
result_connection: MpEvent,
detected_objects_queue,
camera_metrics: CameraMetrics,
ptz_metrics: PTZMetrics,
@ -503,7 +502,7 @@ def track_camera(
ptz_metrics=ptz_metrics,
)
object_detector = RemoteObjectDetector(
name, labelmap, detection_queue, result_connection, model_config, stop_event
name, labelmap, detection_queue, model_config, stop_event
)
object_tracker = NorfairTracker(config, ptz_metrics)
@ -597,6 +596,7 @@ def process_frames(
):
next_region_update = get_tomorrow_at_time(2)
config_subscriber = CameraConfigUpdateSubscriber(
None,
{camera_name: camera_config},
[
CameraConfigUpdateEnum.detect,