Compare commits

...

13 Commits

Author SHA1 Message Date
Josh Hawkins
3bfebc1c07 fix autotracking calibration to support new config updater function 2025-06-11 12:10:36 -05:00
Josh Hawkins
de310f0484 Add ability to update config via json body to config/set endpoint
Additionally, update the config in a single rather than multiple calls for each updated key
2025-06-11 12:10:01 -05:00
Nicolas Mowen
d6dda7a3df Use exact string so similar camera names don't interfere 2025-06-11 11:07:58 -06:00
Nicolas Mowen
ebd79f123f Don't enable audio if no cameras have audio transcription 2025-06-11 10:26:00 -06:00
Nicolas Mowen
e3e1728f91 Cleanup 2025-06-11 09:08:04 -06:00
Nicolas Mowen
86b5e0f9ae Cleanup for updating the cameras config 2025-06-11 09:06:42 -06:00
Nicolas Mowen
1d0e9829f6 Get camera correctly created 2025-06-11 08:37:49 -06:00
Nicolas Mowen
301c01dbf2 Use ZMQ for signaling object detectoin is completed 2025-06-11 08:31:01 -06:00
Nicolas Mowen
225010c570 Create out SHM 2025-06-11 08:10:34 -06:00
Nicolas Mowen
02144402e5 Cleanup 2025-06-11 07:42:37 -06:00
Nicolas Mowen
e871c5178a Correctly handle indexed entries 2025-06-11 07:35:27 -06:00
Nicolas Mowen
67cd746135 Improve typing 2025-06-11 07:18:22 -06:00
Nicolas Mowen
785e655ff5 Simplify config updates 2025-06-11 06:25:14 -06:00
19 changed files with 292 additions and 202 deletions

View File

@ -6,6 +6,7 @@ import json
import logging import logging
import os import os
import traceback import traceback
import urllib
from datetime import datetime, timedelta from datetime import datetime, timedelta
from functools import reduce from functools import reduce
from io import StringIO from io import StringIO
@ -36,8 +37,10 @@ from frigate.models import Event, Timeline
from frigate.stats.prometheus import get_metrics, update_metrics from frigate.stats.prometheus import get_metrics, update_metrics
from frigate.util.builtin import ( from frigate.util.builtin import (
clean_camera_user_pass, clean_camera_user_pass,
flatten_config_data,
get_tz_modifiers, get_tz_modifiers,
update_yaml_from_url, process_config_query_string,
update_yaml_file_bulk,
) )
from frigate.util.config import find_config_file from frigate.util.config import find_config_file
from frigate.util.services import ( from frigate.util.services import (
@ -358,14 +361,37 @@ def config_set(request: Request, body: AppConfigSetBody):
with open(config_file, "r") as f: with open(config_file, "r") as f:
old_raw_config = f.read() old_raw_config = f.read()
f.close()
try: try:
update_yaml_from_url(config_file, str(request.url)) updates = {}
# process query string parameters (takes precedence over body.config_data)
parsed_url = urllib.parse.urlparse(str(request.url))
query_string = urllib.parse.parse_qs(parsed_url.query, keep_blank_values=True)
# Filter out empty keys but keep blank values for non-empty keys
query_string = {k: v for k, v in query_string.items() if k}
if query_string:
updates = process_config_query_string(query_string)
elif body.config_data:
updates = flatten_config_data(body.config_data)
if not updates:
return JSONResponse(
content=(
{"success": False, "message": "No configuration data provided"}
),
status_code=400,
)
# apply all updates in a single operation
update_yaml_file_bulk(config_file, updates)
# validate the updated config
with open(config_file, "r") as f: with open(config_file, "r") as f:
new_raw_config = f.read() new_raw_config = f.read()
f.close()
# Validate the config schema
try: try:
config = FrigateConfig.parse(new_raw_config) config = FrigateConfig.parse(new_raw_config)
except Exception: except Exception:
@ -390,17 +416,23 @@ def config_set(request: Request, body: AppConfigSetBody):
) )
if body.requires_restart == 0 or body.update_topic: if body.requires_restart == 0 or body.update_topic:
old_config: FrigateConfig = request.app.frigate_config
request.app.frigate_config = config request.app.frigate_config = config
if body.update_topic: if body.update_topic and body.update_topic.startswith("config/cameras/"):
if body.update_topic.startswith("config/cameras/"): _, _, camera, field = body.update_topic.split("/")
_, _, camera, field = body.update_topic.split("/")
if field == "add":
settings = config.cameras[camera]
elif field == "remove":
settings = old_config.cameras[camera]
else:
settings = config.get_nested_object(body.update_topic) settings = config.get_nested_object(body.update_topic)
request.app.config_publisher.publish_update(
CameraConfigUpdateTopic(CameraConfigUpdateEnum[field], camera), request.app.config_publisher.publish_update(
settings, CameraConfigUpdateTopic(CameraConfigUpdateEnum[field], camera),
) settings,
)
return JSONResponse( return JSONResponse(
content=( content=(

View File

@ -1,4 +1,4 @@
from typing import Optional from typing import Any, Dict, Optional
from pydantic import BaseModel from pydantic import BaseModel
@ -6,6 +6,7 @@ from pydantic import BaseModel
class AppConfigSetBody(BaseModel): class AppConfigSetBody(BaseModel):
requires_restart: int = 1 requires_restart: int = 1
update_topic: str | None = None update_topic: str | None = None
config_data: Optional[Dict[str, Any]] = None
class AppPutPasswordBody(BaseModel): class AppPutPasswordBody(BaseModel):

View File

@ -82,7 +82,6 @@ class FrigateApp:
self.stop_event: MpEvent = mp.Event() self.stop_event: MpEvent = mp.Event()
self.detection_queue: Queue = mp.Queue() self.detection_queue: Queue = mp.Queue()
self.detectors: dict[str, ObjectDetectProcess] = {} self.detectors: dict[str, ObjectDetectProcess] = {}
self.detection_out_events: dict[str, MpEvent] = {}
self.detection_shms: list[mp.shared_memory.SharedMemory] = [] self.detection_shms: list[mp.shared_memory.SharedMemory] = []
self.log_queue: Queue = mp.Queue() self.log_queue: Queue = mp.Queue()
self.camera_metrics: dict[str, CameraMetrics] = {} self.camera_metrics: dict[str, CameraMetrics] = {}
@ -363,8 +362,6 @@ class FrigateApp:
def start_detectors(self) -> None: def start_detectors(self) -> None:
for name in self.config.cameras.keys(): for name in self.config.cameras.keys():
self.detection_out_events[name] = mp.Event()
try: try:
largest_frame = max( largest_frame = max(
[ [
@ -396,7 +393,7 @@ class FrigateApp:
self.detectors[name] = ObjectDetectProcess( self.detectors[name] = ObjectDetectProcess(
name, name,
self.detection_queue, self.detection_queue,
self.detection_out_events, list(self.config.cameras.keys()),
detector_config, detector_config,
) )
@ -435,7 +432,6 @@ class FrigateApp:
self.camera_maintainer = CameraMaintainer( self.camera_maintainer = CameraMaintainer(
self.config, self.config,
self.detection_queue, self.detection_queue,
self.detection_out_events,
self.detected_frames_queue, self.detected_frames_queue,
self.camera_metrics, self.camera_metrics,
self.ptz_metrics, self.ptz_metrics,

View File

@ -3,7 +3,7 @@
from collections import Counter from collections import Counter
from typing import Any, Callable from typing import Any, Callable
from frigate.config.config import FrigateConfig from frigate.config import CameraConfig, FrigateConfig
class CameraActivityManager: class CameraActivityManager:
@ -23,26 +23,33 @@ class CameraActivityManager:
if not camera_config.enabled_in_config: if not camera_config.enabled_in_config:
continue continue
self.last_camera_activity[camera_config.name] = {} self.__init_camera(camera_config)
self.camera_all_object_counts[camera_config.name] = Counter()
self.camera_active_object_counts[camera_config.name] = Counter()
for zone, zone_config in camera_config.zones.items(): def __init_camera(self, camera_config: CameraConfig) -> None:
if zone not in self.all_zone_labels: self.last_camera_activity[camera_config.name] = {}
self.zone_all_object_counts[zone] = Counter() self.camera_all_object_counts[camera_config.name] = Counter()
self.zone_active_object_counts[zone] = Counter() self.camera_active_object_counts[camera_config.name] = Counter()
self.all_zone_labels[zone] = set()
self.all_zone_labels[zone].update( for zone, zone_config in camera_config.zones.items():
zone_config.objects if zone not in self.all_zone_labels:
if zone_config.objects self.zone_all_object_counts[zone] = Counter()
else camera_config.objects.track self.zone_active_object_counts[zone] = Counter()
) self.all_zone_labels[zone] = set()
self.all_zone_labels[zone].update(
zone_config.objects
if zone_config.objects
else camera_config.objects.track
)
def update_activity(self, new_activity: dict[str, dict[str, Any]]) -> None: def update_activity(self, new_activity: dict[str, dict[str, Any]]) -> None:
all_objects: list[dict[str, Any]] = [] all_objects: list[dict[str, Any]] = []
for camera in new_activity.keys(): for camera in new_activity.keys():
# handle cameras that were added dynamically
if camera not in self.camera_all_object_counts:
self.__init_camera(self.config.cameras[camera])
new_objects = new_activity[camera].get("objects", []) new_objects = new_activity[camera].get("objects", [])
all_objects.extend(new_objects) all_objects.extend(new_objects)

View File

@ -1,7 +1,6 @@
"""Create and maintain camera processes / management.""" """Create and maintain camera processes / management."""
import logging import logging
import multiprocessing as mp
import os import os
import shutil import shutil
import threading import threading
@ -11,12 +10,15 @@ from multiprocessing.synchronize import Event as MpEvent
from frigate.camera import CameraMetrics, PTZMetrics from frigate.camera import CameraMetrics, PTZMetrics
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.config.camera import CameraConfig from frigate.config.camera import CameraConfig
from frigate.config.updater import GlobalConfigUpdateEnum, GlobalConfigUpdateSubscriber from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber,
)
from frigate.const import SHM_FRAMES_VAR from frigate.const import SHM_FRAMES_VAR
from frigate.models import Regions from frigate.models import Regions
from frigate.util import Process as FrigateProcess from frigate.util import Process as FrigateProcess
from frigate.util.builtin import empty_and_close_queue from frigate.util.builtin import empty_and_close_queue
from frigate.util.image import SharedMemoryFrameManager from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory
from frigate.util.object import get_camera_regions_grid from frigate.util.object import get_camera_regions_grid
from frigate.video import capture_camera, track_camera from frigate.video import capture_camera, track_camera
@ -28,7 +30,6 @@ class CameraMaintainer(threading.Thread):
self, self,
config: FrigateConfig, config: FrigateConfig,
detection_queue: Queue, detection_queue: Queue,
detection_out_events: dict[str, MpEvent],
detected_frames_queue: Queue, detected_frames_queue: Queue,
camera_metrics: dict[str, CameraMetrics], camera_metrics: dict[str, CameraMetrics],
ptz_metrics: dict[str, PTZMetrics], ptz_metrics: dict[str, PTZMetrics],
@ -37,19 +38,19 @@ class CameraMaintainer(threading.Thread):
super().__init__(name="camera_processor") super().__init__(name="camera_processor")
self.config = config self.config = config
self.detection_queue = detection_queue self.detection_queue = detection_queue
self.detection_out_events = detection_out_events
self.detected_frames_queue = detected_frames_queue self.detected_frames_queue = detected_frames_queue
self.stop_event = stop_event self.stop_event = stop_event
self.camera_metrics = camera_metrics self.camera_metrics = camera_metrics
self.ptz_metrics = ptz_metrics self.ptz_metrics = ptz_metrics
self.frame_manager = SharedMemoryFrameManager() self.frame_manager = SharedMemoryFrameManager()
self.region_grids: dict[str, list[list[dict[str, int]]]] = {} self.region_grids: dict[str, list[list[dict[str, int]]]] = {}
self.update_subscriber = GlobalConfigUpdateSubscriber( self.update_subscriber = CameraConfigUpdateSubscriber(
self.config,
{},
[ [
GlobalConfigUpdateEnum.add_camera, CameraConfigUpdateEnum.add,
GlobalConfigUpdateEnum.debug_camera, CameraConfigUpdateEnum.remove,
GlobalConfigUpdateEnum.remove_camera, ],
]
) )
self.shm_count = self.__calculate_shm_frame_count() self.shm_count = self.__calculate_shm_frame_count()
@ -124,8 +125,6 @@ class CameraMaintainer(threading.Thread):
return return
if runtime: if runtime:
# TODO we have to send a ZMQ message to the object detector with the same out event
self.detection_out_events[name] = mp.Event()
self.camera_metrics[name] = CameraMetrics() self.camera_metrics[name] = CameraMetrics()
self.ptz_metrics[name] = PTZMetrics(autotracker_enabled=False) self.ptz_metrics[name] = PTZMetrics(autotracker_enabled=False)
self.region_grids[name] = get_camera_regions_grid( self.region_grids[name] = get_camera_regions_grid(
@ -134,6 +133,24 @@ class CameraMaintainer(threading.Thread):
max(self.config.model.width, self.config.model.height), max(self.config.model.width, self.config.model.height),
) )
try:
largest_frame = max(
[
det.model.height * det.model.width * 3
if det.model is not None
else 320
for det in self.config.detectors.values()
]
)
UntrackedSharedMemory(name=f"out-{name}", create=True, size=20 * 6 * 4)
UntrackedSharedMemory(
name=name,
create=True,
size=largest_frame,
)
except FileExistsError:
pass
camera_process = FrigateProcess( camera_process = FrigateProcess(
target=track_camera, target=track_camera,
name=f"camera_processor:{name}", name=f"camera_processor:{name}",
@ -143,7 +160,6 @@ class CameraMaintainer(threading.Thread):
self.config.model, self.config.model,
self.config.model.merged_labelmap, self.config.model.merged_labelmap,
self.detection_queue, self.detection_queue,
self.detection_out_events[name],
self.detected_frames_queue, self.detected_frames_queue,
self.camera_metrics[name], self.camera_metrics[name],
self.ptz_metrics[name], self.ptz_metrics[name],
@ -155,13 +171,15 @@ class CameraMaintainer(threading.Thread):
camera_process.start() camera_process.start()
logger.info(f"Camera processor started for {config.name}: {camera_process.pid}") logger.info(f"Camera processor started for {config.name}: {camera_process.pid}")
def __start_camera_capture(self, name: str, config: CameraConfig) -> None: def __start_camera_capture(
if not self.config.cameras[name].enabled_in_config: self, name: str, config: CameraConfig, runtime: bool = False
) -> None:
if not config.enabled_in_config:
logger.info(f"Capture process not started for disabled camera {name}") logger.info(f"Capture process not started for disabled camera {name}")
return return
# pre-create shms # pre-create shms
for i in range(self.shm_count): for i in range(10 if runtime else self.shm_count):
frame_size = config.frame_shape_yuv[0] * config.frame_shape_yuv[1] frame_size = config.frame_shape_yuv[0] * config.frame_shape_yuv[1]
self.frame_manager.create(f"{config.name}_frame{i}", frame_size) self.frame_manager.create(f"{config.name}_frame{i}", frame_size)
@ -203,20 +221,20 @@ class CameraMaintainer(threading.Thread):
while not self.stop_event.wait(1): while not self.stop_event.wait(1):
updates = self.update_subscriber.check_for_updates() updates = self.update_subscriber.check_for_updates()
for update_type, update_config in updates: for update_type, updated_cameras in updates.items():
if update_type == GlobalConfigUpdateEnum.add_camera: if update_type == CameraConfigUpdateEnum.add.name:
self.__start_camera_processor( for camera in updated_cameras:
update_config.name, update_config, runtime=True self.__start_camera_processor(
) camera,
self.__start_camera_capture(update_config.name, update_config) self.update_subscriber.camera_configs[camera],
elif update_type == GlobalConfigUpdateEnum.debug_camera: runtime=True,
pass )
elif update_type == GlobalConfigUpdateEnum.remove_camera: self.__start_camera_capture(
camera = update_config.name camera, self.update_subscriber.camera_configs[camera]
)
if camera: elif update_type == CameraConfigUpdateEnum.remove.name:
self.__stop_camera_capture_process(camera) self.__stop_camera_capture_process(camera)
self.__stop_camera_process(camera) self.__stop_camera_process(camera)
# ensure the capture processes are done # ensure the capture processes are done
for camera in self.camera_metrics.keys(): for camera in self.camera_metrics.keys():
@ -226,4 +244,5 @@ class CameraMaintainer(threading.Thread):
for camera in self.camera_metrics.keys(): for camera in self.camera_metrics.keys():
self.__stop_camera_process(camera) self.__stop_camera_process(camera)
self.update_subscriber.stop()
self.frame_manager.cleanup() self.frame_manager.cleanup()

View File

@ -0,0 +1,21 @@
"""Facilitates communication between processes for object detection signals."""
from .zmq_proxy import Publisher, Subscriber
class ObjectDetectorPublisher(Publisher):
"""Publishes signal for object detection to different processes."""
topic_base = "object_detector/"
class ObjectDetectorSubscriber(Subscriber):
"""Simplifies receiving a signal for object detection."""
topic_base = "object_detector/"
def __init__(self, topic: str) -> None:
super().__init__(topic)
def check_for_update(self):
return super().check_for_update(timeout=5)

View File

@ -81,7 +81,7 @@ class WebPushClient(Communicator): # type: ignore[misc]
"config/notifications", exact=True "config/notifications", exact=True
) )
self.config_subscriber = CameraConfigUpdateSubscriber( self.config_subscriber = CameraConfigUpdateSubscriber(
self.config.cameras, [CameraConfigUpdateEnum.notifications] self.config, self.config.cameras, [CameraConfigUpdateEnum.notifications]
) )
def subscribe(self, receiver: Callable) -> None: def subscribe(self, receiver: Callable) -> None:
@ -170,7 +170,12 @@ class WebPushClient(Communicator): # type: ignore[misc]
if updated_notification_config: if updated_notification_config:
self.config.notifications = updated_notification_config self.config.notifications = updated_notification_config
self.config_subscriber.check_for_updates() updates = self.config_subscriber.check_for_updates()
if "add" in updates:
for camera in updates["add"]:
self.suspended_cameras[camera] = 0
self.last_camera_notification_time[camera] = 0
if topic == "reviews": if topic == "reviews":
decoded = json.loads(payload) decoded = json.loads(payload)

View File

@ -5,12 +5,13 @@ from enum import Enum
from typing import Any from typing import Any
from frigate.comms.config_updater import ConfigPublisher, ConfigSubscriber from frigate.comms.config_updater import ConfigPublisher, ConfigSubscriber
from frigate.config import CameraConfig from frigate.config import CameraConfig, FrigateConfig
class CameraConfigUpdateEnum(str, Enum): class CameraConfigUpdateEnum(str, Enum):
"""Supported camera config update types.""" """Supported camera config update types."""
add = "add" # for adding a camera
audio = "audio" audio = "audio"
audio_transcription = "audio_transcription" audio_transcription = "audio_transcription"
birdseye = "birdseye" birdseye = "birdseye"
@ -20,6 +21,7 @@ class CameraConfigUpdateEnum(str, Enum):
notifications = "notifications" notifications = "notifications"
objects = "objects" objects = "objects"
record = "record" record = "record"
remove = "remove" # for removing a camera
review = "review" review = "review"
snapshots = "snapshots" snapshots = "snapshots"
zones = "zones" zones = "zones"
@ -49,9 +51,11 @@ class CameraConfigUpdatePublisher:
class CameraConfigUpdateSubscriber: class CameraConfigUpdateSubscriber:
def __init__( def __init__(
self, self,
config: FrigateConfig | None,
camera_configs: dict[str, CameraConfig], camera_configs: dict[str, CameraConfig],
topics: list[CameraConfigUpdateEnum], topics: list[CameraConfigUpdateEnum],
): ):
self.config = config
self.camera_configs = camera_configs self.camera_configs = camera_configs
self.topics = topics self.topics = topics
@ -68,14 +72,23 @@ class CameraConfigUpdateSubscriber:
def __update_config( def __update_config(
self, camera: str, update_type: CameraConfigUpdateEnum, updated_config: Any self, camera: str, update_type: CameraConfigUpdateEnum, updated_config: Any
) -> None: ) -> None:
config = self.camera_configs[camera] if update_type == CameraConfigUpdateEnum.add:
self.config.cameras[camera] = updated_config
self.camera_configs[camera] = updated_config
return
elif update_type == CameraConfigUpdateEnum.remove:
self.config.cameras.pop(camera)
self.camera_configs.pop(camera)
return
config = self.camera_configs.get(camera)
if not config: if not config:
return return
if update_type == CameraConfigUpdateEnum.audio: if update_type == CameraConfigUpdateEnum.audio:
config.audio = updated_config config.audio = updated_config
if update_type == CameraConfigUpdateEnum.audio_transcription: elif update_type == CameraConfigUpdateEnum.audio_transcription:
config.audio_transcription = updated_config config.audio_transcription = updated_config
elif update_type == CameraConfigUpdateEnum.birdseye: elif update_type == CameraConfigUpdateEnum.birdseye:
config.birdseye = updated_config config.birdseye = updated_config

View File

@ -1,69 +0,0 @@
"""Convenience classes for updating global configurations dynamically."""
from dataclasses import dataclass
from enum import Enum
from typing import Any
from frigate.comms.config_updater import ConfigPublisher, ConfigSubscriber
from frigate.config.camera import CameraConfig
class GlobalConfigUpdateEnum(str, Enum):
"""Supported global config update types."""
add_camera = "add_camera"
debug_camera = "debug_camera"
remove_camera = "remove_camera"
@dataclass
class GlobalConfigUpdateTopic:
update_type: GlobalConfigUpdateEnum
@property
def topic(self) -> str:
return f"config/{self.update_type.name}"
class GlobalConfigUpdatePublisher:
def __init__(self):
self.publisher = ConfigPublisher()
def publish_update(self, topic: GlobalConfigUpdateTopic, config: Any) -> None:
self.publisher.publish(topic.topic, config)
def stop(self) -> None:
self.publisher.stop()
class GlobalConfigUpdateSubscriber:
def __init__(
self,
topics: list[GlobalConfigUpdateEnum],
):
self.topics = topics
self.subscriber = ConfigSubscriber(
"config/",
exact=False,
)
def check_for_updates(self) -> list[tuple[GlobalConfigUpdateEnum, CameraConfig]]:
updated_topics: list[tuple[GlobalConfigUpdateEnum, CameraConfig]] = []
# get all updates available
while True:
update_topic, update_config = self.subscriber.check_for_update()
if update_topic is None or update_config is None:
break
_, raw_type = update_topic.split("/")
update_type = GlobalConfigUpdateEnum[raw_type]
if update_type in self.topics:
updated_topics.append((update_type, update_config))
return updated_topics
def stop(self) -> None:
self.subscriber.stop()

View File

@ -29,6 +29,10 @@ from frigate.comms.recordings_updater import (
) )
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.config.camera.camera import CameraTypeEnum from frigate.config.camera.camera import CameraTypeEnum
from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber,
)
from frigate.const import ( from frigate.const import (
CLIPS_DIR, CLIPS_DIR,
UPDATE_EVENT_DESCRIPTION, UPDATE_EVENT_DESCRIPTION,
@ -87,6 +91,11 @@ class EmbeddingMaintainer(threading.Thread):
self.config = config self.config = config
self.metrics = metrics self.metrics = metrics
self.embeddings = None self.embeddings = None
self.config_updater = CameraConfigUpdateSubscriber(
self.config,
self.config.cameras,
[CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.remove],
)
if config.semantic_search.enabled: if config.semantic_search.enabled:
self.embeddings = Embeddings(config, db, metrics) self.embeddings = Embeddings(config, db, metrics)
@ -198,6 +207,7 @@ class EmbeddingMaintainer(threading.Thread):
def run(self) -> None: def run(self) -> None:
"""Maintain a SQLite-vec database for semantic search.""" """Maintain a SQLite-vec database for semantic search."""
while not self.stop_event.is_set(): while not self.stop_event.is_set():
self.config_updater.check_for_updates()
self._process_requests() self._process_requests()
self._process_updates() self._process_updates()
self._process_recordings_updates() self._process_recordings_updates()
@ -206,6 +216,7 @@ class EmbeddingMaintainer(threading.Thread):
self._process_finalized() self._process_finalized()
self._process_event_metadata() self._process_event_metadata()
self.config_updater.stop()
self.event_subscriber.stop() self.event_subscriber.stop()
self.event_end_subscriber.stop() self.event_end_subscriber.stop()
self.recordings_subscriber.stop() self.recordings_subscriber.stop()

View File

@ -90,10 +90,19 @@ class AudioProcessor(util.Process):
self.camera_metrics = camera_metrics self.camera_metrics = camera_metrics
self.cameras = cameras self.cameras = cameras
self.config = config self.config = config
self.transcription_model_runner = AudioTranscriptionModelRunner(
self.config.audio_transcription.device, if any(
self.config.audio_transcription.model_size, [
) conf.audio_transcription.enabled_in_config
for conf in config.cameras.values()
]
):
self.transcription_model_runner = AudioTranscriptionModelRunner(
self.config.audio_transcription.device,
self.config.audio_transcription.model_size,
)
else:
self.transcription_model_runner = None
def run(self) -> None: def run(self) -> None:
audio_threads: list[AudioEventMaintainer] = [] audio_threads: list[AudioEventMaintainer] = []
@ -138,7 +147,7 @@ class AudioEventMaintainer(threading.Thread):
camera: CameraConfig, camera: CameraConfig,
config: FrigateConfig, config: FrigateConfig,
camera_metrics: dict[str, CameraMetrics], camera_metrics: dict[str, CameraMetrics],
audio_transcription_model_runner: AudioTranscriptionModelRunner, audio_transcription_model_runner: AudioTranscriptionModelRunner | None,
stop_event: threading.Event, stop_event: threading.Event,
) -> None: ) -> None:
super().__init__(name=f"{camera.name}_audio_event_processor") super().__init__(name=f"{camera.name}_audio_event_processor")
@ -162,6 +171,7 @@ class AudioEventMaintainer(threading.Thread):
# create communication for audio detections # create communication for audio detections
self.requestor = InterProcessRequestor() self.requestor = InterProcessRequestor()
self.config_subscriber = CameraConfigUpdateSubscriber( self.config_subscriber = CameraConfigUpdateSubscriber(
None,
{self.camera_config.name: self.camera_config}, {self.camera_config.name: self.camera_config},
[ [
CameraConfigUpdateEnum.audio, CameraConfigUpdateEnum.audio,

View File

@ -13,6 +13,10 @@ import numpy as np
from setproctitle import setproctitle from setproctitle import setproctitle
import frigate.util as util import frigate.util as util
from frigate.comms.object_detector_signaler import (
ObjectDetectorPublisher,
ObjectDetectorSubscriber,
)
from frigate.detectors import create_detector from frigate.detectors import create_detector
from frigate.detectors.detector_config import ( from frigate.detectors.detector_config import (
BaseDetectorConfig, BaseDetectorConfig,
@ -89,7 +93,7 @@ class LocalObjectDetector(ObjectDetector):
def run_detector( def run_detector(
name: str, name: str,
detection_queue: Queue, detection_queue: Queue,
out_events: dict[str, MpEvent], cameras: list[str],
avg_speed: Value, avg_speed: Value,
start: Value, start: Value,
detector_config: BaseDetectorConfig, detector_config: BaseDetectorConfig,
@ -108,15 +112,19 @@ def run_detector(
signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal) signal.signal(signal.SIGINT, receiveSignal)
frame_manager = SharedMemoryFrameManager() def create_output_shm(name: str):
object_detector = LocalObjectDetector(detector_config=detector_config)
outputs = {}
for name in out_events.keys():
out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False) out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False)
out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf)
outputs[name] = {"shm": out_shm, "np": out_np} outputs[name] = {"shm": out_shm, "np": out_np}
frame_manager = SharedMemoryFrameManager()
object_detector = LocalObjectDetector(detector_config=detector_config)
detector_publisher = ObjectDetectorPublisher()
outputs = {}
for name in cameras:
create_output_shm(name)
while not stop_event.is_set(): while not stop_event.is_set():
try: try:
connection_id = detection_queue.get(timeout=1) connection_id = detection_queue.get(timeout=1)
@ -136,12 +144,18 @@ def run_detector(
detections = object_detector.detect_raw(input_frame) detections = object_detector.detect_raw(input_frame)
duration = datetime.datetime.now().timestamp() - start.value duration = datetime.datetime.now().timestamp() - start.value
frame_manager.close(connection_id) frame_manager.close(connection_id)
if connection_id not in outputs:
create_output_shm(connection_id)
outputs[connection_id]["np"][:] = detections[:] outputs[connection_id]["np"][:] = detections[:]
out_events[connection_id].set() signal_id = f"{connection_id}/update"
detector_publisher.publish(signal_id, signal_id)
start.value = 0.0 start.value = 0.0
avg_speed.value = (avg_speed.value * 9 + duration) / 10 avg_speed.value = (avg_speed.value * 9 + duration) / 10
detector_publisher.stop()
logger.info("Exited detection process...") logger.info("Exited detection process...")
@ -150,11 +164,11 @@ class ObjectDetectProcess:
self, self,
name: str, name: str,
detection_queue: Queue, detection_queue: Queue,
out_events: dict[str, MpEvent], cameras: list[str],
detector_config: BaseDetectorConfig, detector_config: BaseDetectorConfig,
): ):
self.name = name self.name = name
self.out_events = out_events self.cameras = cameras
self.detection_queue = detection_queue self.detection_queue = detection_queue
self.avg_inference_speed = Value("d", 0.01) self.avg_inference_speed = Value("d", 0.01)
self.detection_start = Value("d", 0.0) self.detection_start = Value("d", 0.0)
@ -176,7 +190,6 @@ class ObjectDetectProcess:
logging.info("Detection process has exited...") logging.info("Detection process has exited...")
def start_or_restart(self): def start_or_restart(self):
# TODO have to create a separate ZMQ listener for the MP.Events to be sent here
self.detection_start.value = 0.0 self.detection_start.value = 0.0
if (self.detect_process is not None) and self.detect_process.is_alive(): if (self.detect_process is not None) and self.detect_process.is_alive():
self.stop() self.stop()
@ -186,7 +199,7 @@ class ObjectDetectProcess:
args=( args=(
self.name, self.name,
self.detection_queue, self.detection_queue,
self.out_events, self.cameras,
self.avg_inference_speed, self.avg_inference_speed,
self.detection_start, self.detection_start,
self.detector_config, self.detector_config,
@ -202,7 +215,6 @@ class RemoteObjectDetector:
name: str, name: str,
labels: dict[int, str], labels: dict[int, str],
detection_queue: Queue, detection_queue: Queue,
event: MpEvent,
model_config: ModelConfig, model_config: ModelConfig,
stop_event: MpEvent, stop_event: MpEvent,
): ):
@ -210,7 +222,6 @@ class RemoteObjectDetector:
self.name = name self.name = name
self.fps = EventsPerSecond() self.fps = EventsPerSecond()
self.detection_queue = detection_queue self.detection_queue = detection_queue
self.event = event
self.stop_event = stop_event self.stop_event = stop_event
self.shm = UntrackedSharedMemory(name=self.name, create=False) self.shm = UntrackedSharedMemory(name=self.name, create=False)
self.np_shm = np.ndarray( self.np_shm = np.ndarray(
@ -220,6 +231,7 @@ class RemoteObjectDetector:
) )
self.out_shm = UntrackedSharedMemory(name=f"out-{self.name}", create=False) self.out_shm = UntrackedSharedMemory(name=f"out-{self.name}", create=False)
self.out_np_shm = np.ndarray((20, 6), dtype=np.float32, buffer=self.out_shm.buf) self.out_np_shm = np.ndarray((20, 6), dtype=np.float32, buffer=self.out_shm.buf)
self.detector_subscriber = ObjectDetectorSubscriber(f"{name}/update")
def detect(self, tensor_input, threshold=0.4): def detect(self, tensor_input, threshold=0.4):
detections = [] detections = []
@ -229,9 +241,8 @@ class RemoteObjectDetector:
# copy input to shared memory # copy input to shared memory
self.np_shm[:] = tensor_input[:] self.np_shm[:] = tensor_input[:]
self.event.clear()
self.detection_queue.put(self.name) self.detection_queue.put(self.name)
result = self.event.wait(timeout=5.0) result = self.detector_subscriber.check_for_update()
# if it timed out # if it timed out
if result is None: if result is None:
@ -247,5 +258,6 @@ class RemoteObjectDetector:
return detections return detections
def cleanup(self): def cleanup(self):
self.detector_subscriber.stop()
self.shm.unlink() self.shm.unlink()
self.out_shm.unlink() self.out_shm.unlink()

View File

@ -103,8 +103,10 @@ def output_frames(
detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video) detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video)
config_subscriber = CameraConfigUpdateSubscriber( config_subscriber = CameraConfigUpdateSubscriber(
config,
config.cameras, config.cameras,
[ [
CameraConfigUpdateEnum.add,
CameraConfigUpdateEnum.birdseye, CameraConfigUpdateEnum.birdseye,
CameraConfigUpdateEnum.enabled, CameraConfigUpdateEnum.enabled,
CameraConfigUpdateEnum.record, CameraConfigUpdateEnum.record,
@ -135,7 +137,15 @@ def output_frames(
while not stop_event.is_set(): while not stop_event.is_set():
# check if there is an updated config # check if there is an updated config
config_subscriber.check_for_updates() updates = config_subscriber.check_for_updates()
if "add" in updates:
for camera in updates["add"]:
jsmpeg_cameras[camera] = JsmpegCamera(
cam_config, stop_event, websocket_server
)
preview_recorders[camera] = PreviewRecorder(cam_config)
preview_write_times[camera] = 0
(topic, data) = detection_subscriber.check_for_update(timeout=1) (topic, data) = detection_subscriber.check_for_update(timeout=1)
now = datetime.datetime.now().timestamp() now = datetime.datetime.now().timestamp()

View File

@ -31,7 +31,7 @@ from frigate.const import (
) )
from frigate.ptz.onvif import OnvifController from frigate.ptz.onvif import OnvifController
from frigate.track.tracked_object import TrackedObject from frigate.track.tracked_object import TrackedObject
from frigate.util.builtin import update_yaml_file from frigate.util.builtin import update_yaml_file_bulk
from frigate.util.config import find_config_file from frigate.util.config import find_config_file
from frigate.util.image import SharedMemoryFrameManager, intersection_over_union from frigate.util.image import SharedMemoryFrameManager, intersection_over_union
@ -348,10 +348,13 @@ class PtzAutoTracker:
f"{camera}: Writing new config with autotracker motion coefficients: {self.config.cameras[camera].onvif.autotracking.movement_weights}" f"{camera}: Writing new config with autotracker motion coefficients: {self.config.cameras[camera].onvif.autotracking.movement_weights}"
) )
update_yaml_file( update_yaml_file_bulk(
config_file, config_file,
["cameras", camera, "onvif", "autotracking", "movement_weights"], {
self.config.cameras[camera].onvif.autotracking.movement_weights, f"cameras.{camera}.onvif.autotracking.movement_weights": self.config.cameras[
camera
].onvif.autotracking.movement_weights
},
) )
async def _calibrate_camera(self, camera): async def _calibrate_camera(self, camera):

View File

@ -75,7 +75,9 @@ class RecordingMaintainer(threading.Thread):
# create communication for retained recordings # create communication for retained recordings
self.requestor = InterProcessRequestor() self.requestor = InterProcessRequestor()
self.config_subscriber = CameraConfigUpdateSubscriber( self.config_subscriber = CameraConfigUpdateSubscriber(
self.config.cameras, [CameraConfigUpdateEnum.record] self.config,
self.config.cameras,
[CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.record],
) )
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all) self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all)
self.recordings_publisher = RecordingsDataPublisher( self.recordings_publisher = RecordingsDataPublisher(

View File

@ -154,10 +154,13 @@ class ReviewSegmentMaintainer(threading.Thread):
# create communication for review segments # create communication for review segments
self.requestor = InterProcessRequestor() self.requestor = InterProcessRequestor()
self.config_subscriber = CameraConfigUpdateSubscriber( self.config_subscriber = CameraConfigUpdateSubscriber(
config,
config.cameras, config.cameras,
[ [
CameraConfigUpdateEnum.add,
CameraConfigUpdateEnum.enabled, CameraConfigUpdateEnum.enabled,
CameraConfigUpdateEnum.record, CameraConfigUpdateEnum.record,
CameraConfigUpdateEnum.remove,
CameraConfigUpdateEnum.review, CameraConfigUpdateEnum.review,
], ],
) )

View File

@ -32,7 +32,6 @@ from frigate.config.camera.updater import (
CameraConfigUpdateEnum, CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber, CameraConfigUpdateSubscriber,
) )
from frigate.config.updater import GlobalConfigUpdateEnum, GlobalConfigUpdateSubscriber
from frigate.const import FAST_QUEUE_TIMEOUT, UPDATE_CAMERA_ACTIVITY from frigate.const import FAST_QUEUE_TIMEOUT, UPDATE_CAMERA_ACTIVITY
from frigate.events.types import EventStateEnum, EventTypeEnum from frigate.events.types import EventStateEnum, EventTypeEnum
from frigate.models import Event, Timeline from frigate.models import Event, Timeline
@ -67,12 +66,15 @@ class TrackedObjectProcessor(threading.Thread):
self.last_motion_detected: dict[str, float] = {} self.last_motion_detected: dict[str, float] = {}
self.ptz_autotracker_thread = ptz_autotracker_thread self.ptz_autotracker_thread = ptz_autotracker_thread
self.global_config_subscriber = GlobalConfigUpdateSubscriber(
[GlobalConfigUpdateEnum.add_camera, GlobalConfigUpdateEnum.remove_camera]
)
self.camera_config_subscriber = CameraConfigUpdateSubscriber( self.camera_config_subscriber = CameraConfigUpdateSubscriber(
self.config,
self.config.cameras, self.config.cameras,
[CameraConfigUpdateEnum.enabled, CameraConfigUpdateEnum.zones], [
CameraConfigUpdateEnum.add,
CameraConfigUpdateEnum.enabled,
CameraConfigUpdateEnum.remove,
CameraConfigUpdateEnum.zones,
],
) )
self.requestor = InterProcessRequestor() self.requestor = InterProcessRequestor()
@ -590,16 +592,6 @@ class TrackedObjectProcessor(threading.Thread):
def run(self): def run(self):
while not self.stop_event.is_set(): while not self.stop_event.is_set():
# check for global config updates
for topic, payload in self.global_config_subscriber.check_for_updates():
if topic == GlobalConfigUpdateEnum.add_camera:
self.create_camera_state(payload["camera"])
elif topic == GlobalConfigUpdateEnum.remove_camera:
camera = payload["camera"]
camera_state = self.camera_states[camera]
camera_state.shutdown()
del self.camera_states[camera]
# check for config updates # check for config updates
updated_topics = self.camera_config_subscriber.check_for_updates() updated_topics = self.camera_config_subscriber.check_for_updates()
@ -609,6 +601,17 @@ class TrackedObjectProcessor(threading.Thread):
self.camera_states[camera].prev_enabled = self.config.cameras[ self.camera_states[camera].prev_enabled = self.config.cameras[
camera camera
].enabled ].enabled
elif "add" in updated_topics:
for camera in updated_topics["add"]:
self.config.cameras[camera] = (
self.camera_config_subscriber.camera_configs[camera]
)
self.create_camera_state(camera)
elif "remove" in updated_topics:
for camera in updated_topics["remove"]:
camera_state = self.camera_states[camera]
camera_state.shutdown()
self.camera_states.pop(camera)
# manage camera disabled state # manage camera disabled state
for camera, config in self.config.cameras.items(): for camera, config in self.config.cameras.items():

View File

@ -14,7 +14,7 @@ import urllib.parse
from collections.abc import Mapping from collections.abc import Mapping
from multiprocessing.sharedctypes import Synchronized from multiprocessing.sharedctypes import Synchronized
from pathlib import Path from pathlib import Path
from typing import Any, Optional, Tuple, Union from typing import Any, Dict, Optional, Tuple, Union
from zoneinfo import ZoneInfoNotFoundError from zoneinfo import ZoneInfoNotFoundError
import numpy as np import numpy as np
@ -184,25 +184,12 @@ def create_mask(frame_shape, mask):
mask_img[:] = 255 mask_img[:] = 255
def update_yaml_from_url(file_path, url): def process_config_query_string(query_string: Dict[str, list]) -> Dict[str, Any]:
parsed_url = urllib.parse.urlparse(url) updates = {}
query_string = urllib.parse.parse_qs(parsed_url.query, keep_blank_values=True)
# Filter out empty keys but keep blank values for non-empty keys
query_string = {k: v for k, v in query_string.items() if k}
for key_path_str, new_value_list in query_string.items(): for key_path_str, new_value_list in query_string.items():
key_path = key_path_str.split(".") # use the string key as-is for updates dictionary
for i in range(len(key_path)):
try:
index = int(key_path[i])
key_path[i] = (key_path[i - 1], index)
key_path.pop(i - 1)
except ValueError:
pass
if len(new_value_list) > 1: if len(new_value_list) > 1:
update_yaml_file(file_path, key_path, new_value_list) updates[key_path_str] = new_value_list
else: else:
value = new_value_list[0] value = new_value_list[0]
try: try:
@ -210,10 +197,24 @@ def update_yaml_from_url(file_path, url):
value = ast.literal_eval(value) if "," not in value else value value = ast.literal_eval(value) if "," not in value else value
except (ValueError, SyntaxError): except (ValueError, SyntaxError):
pass pass
update_yaml_file(file_path, key_path, value) updates[key_path_str] = value
return updates
def update_yaml_file(file_path, key_path, new_value): def flatten_config_data(
config_data: Dict[str, Any], parent_key: str = ""
) -> Dict[str, Any]:
items = []
for key, value in config_data.items():
new_key = f"{parent_key}.{key}" if parent_key else key
if isinstance(value, dict):
items.extend(flatten_config_data(value, new_key).items())
else:
items.append((new_key, value))
return dict(items)
def update_yaml_file_bulk(file_path: str, updates: Dict[str, Any]):
yaml = YAML() yaml = YAML()
yaml.indent(mapping=2, sequence=4, offset=2) yaml.indent(mapping=2, sequence=4, offset=2)
@ -226,7 +227,17 @@ def update_yaml_file(file_path, key_path, new_value):
) )
return return
data = update_yaml(data, key_path, new_value) # Apply all updates
for key_path_str, new_value in updates.items():
key_path = key_path_str.split(".")
for i in range(len(key_path)):
try:
index = int(key_path[i])
key_path[i] = (key_path[i - 1], index)
key_path.pop(i - 1)
except ValueError:
pass
data = update_yaml(data, key_path, new_value)
try: try:
with open(file_path, "w") as f: with open(file_path, "w") as f:

View File

@ -116,7 +116,7 @@ def capture_frames(
skipped_eps = EventsPerSecond() skipped_eps = EventsPerSecond()
skipped_eps.start() skipped_eps.start()
config_subscriber = CameraConfigUpdateSubscriber( config_subscriber = CameraConfigUpdateSubscriber(
{config.name: config}, [CameraConfigUpdateEnum.enabled] None, {config.name: config}, [CameraConfigUpdateEnum.enabled]
) )
def get_enabled_state(): def get_enabled_state():
@ -196,7 +196,7 @@ class CameraWatchdog(threading.Thread):
self.sleeptime = self.config.ffmpeg.retry_interval self.sleeptime = self.config.ffmpeg.retry_interval
self.config_subscriber = CameraConfigUpdateSubscriber( self.config_subscriber = CameraConfigUpdateSubscriber(
{config.name: config}, [CameraConfigUpdateEnum.enabled] None, {config.name: config}, [CameraConfigUpdateEnum.enabled]
) )
self.was_enabled = self.config.enabled self.was_enabled = self.config.enabled
@ -473,7 +473,6 @@ def track_camera(
model_config: ModelConfig, model_config: ModelConfig,
labelmap: dict[int, str], labelmap: dict[int, str],
detection_queue: Queue, detection_queue: Queue,
result_connection: MpEvent,
detected_objects_queue, detected_objects_queue,
camera_metrics: CameraMetrics, camera_metrics: CameraMetrics,
ptz_metrics: PTZMetrics, ptz_metrics: PTZMetrics,
@ -503,7 +502,7 @@ def track_camera(
ptz_metrics=ptz_metrics, ptz_metrics=ptz_metrics,
) )
object_detector = RemoteObjectDetector( object_detector = RemoteObjectDetector(
name, labelmap, detection_queue, result_connection, model_config, stop_event name, labelmap, detection_queue, model_config, stop_event
) )
object_tracker = NorfairTracker(config, ptz_metrics) object_tracker = NorfairTracker(config, ptz_metrics)
@ -597,6 +596,7 @@ def process_frames(
): ):
next_region_update = get_tomorrow_at_time(2) next_region_update = get_tomorrow_at_time(2)
config_subscriber = CameraConfigUpdateSubscriber( config_subscriber = CameraConfigUpdateSubscriber(
None,
{camera_name: camera_config}, {camera_name: camera_config},
[ [
CameraConfigUpdateEnum.detect, CameraConfigUpdateEnum.detect,