From 5ef4f2d44008fa988571da341bde17b2f6279013 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Thu, 12 Jun 2025 11:52:14 -0600 Subject: [PATCH] Finalize process --- frigate/app.py | 30 +-- frigate/embeddings/__init__.py | 39 ++-- frigate/output/output.py | 330 +++++++++++++++++---------------- frigate/record/record.py | 60 +++--- 4 files changed, 214 insertions(+), 245 deletions(-) diff --git a/frigate/app.py b/frigate/app.py index 3b7e3ff9c..f2b231ad3 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -14,7 +14,6 @@ import uvicorn from peewee_migrate import Router from playhouse.sqlite_ext import SqliteExtDatabase -import frigate.util as util from frigate.api.auth import hash_password from frigate.api.fastapi_app import create_fastapi_app from frigate.camera import CameraMetrics, PTZMetrics @@ -42,7 +41,7 @@ from frigate.const import ( ) from frigate.data_processing.types import DataProcessorMetrics from frigate.db.sqlitevecq import SqliteVecQueueDatabase -from frigate.embeddings import EmbeddingsContext, manage_embeddings +from frigate.embeddings import EmbeddingProcess, EmbeddingsContext from frigate.events.audio import AudioProcessor from frigate.events.cleanup import EventCleanup from frigate.events.maintainer import EventProcessor @@ -59,12 +58,12 @@ from frigate.models import ( User, ) from frigate.object_detection.base import ObjectDetectProcess -from frigate.output.output import output_frames +from frigate.output.output import OutputProcess from frigate.ptz.autotrack import PtzAutoTrackerThread from frigate.ptz.onvif import OnvifController from frigate.record.cleanup import RecordingCleanup from frigate.record.export import migrate_exports -from frigate.record.record import manage_recordings +from frigate.record.record import RecordProcess from frigate.review.review import ReviewProcess from frigate.stats.emitter import StatsEmitter from frigate.stats.util import stats_init @@ -224,11 +223,7 @@ class FrigateApp: self.processes["go2rtc"] = proc.info["pid"] def init_recording_manager(self) -> None: - recording_process = util.Process( - target=manage_recordings, - name="recording_manager", - args=(self.config,), - ) + recording_process = RecordProcess(self.config) recording_process.daemon = True self.recording_process = recording_process recording_process.start() @@ -255,13 +250,9 @@ class FrigateApp: ): return - embedding_process = util.Process( - target=manage_embeddings, - name="embeddings_manager", - args=( - self.config, - self.embeddings_metrics, - ), + embedding_process = EmbeddingProcess( + self.config, + self.embeddings_metrics, ) embedding_process.daemon = True self.embedding_process = embedding_process @@ -420,12 +411,7 @@ class FrigateApp: self.detected_frames_processor.start() def start_video_output_processor(self) -> None: - output_processor = util.Process( - target=output_frames, - name="output_processor", - args=(self.config,), - ) - output_processor.daemon = True + output_processor = OutputProcess(self.config) self.output_processor = output_processor output_processor.start() logger.info(f"Output process started: {output_processor.pid}") diff --git a/frigate/embeddings/__init__.py b/frigate/embeddings/__init__.py index 432ecbe02..fedf5f0cd 100644 --- a/frigate/embeddings/__init__.py +++ b/frigate/embeddings/__init__.py @@ -3,16 +3,12 @@ import base64 import json import logging -import multiprocessing as mp import os -import signal import threading -from types import FrameType -from typing import Any, Optional, Union +from typing import Any, Union import regex from pathvalidate import ValidationError, sanitize_filename -from setproctitle import setproctitle from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsRequestor from frigate.config import FrigateConfig @@ -20,9 +16,9 @@ from frigate.const import CONFIG_DIR, FACE_DIR from frigate.data_processing.types import DataProcessorMetrics from frigate.db.sqlitevecq import SqliteVecQueueDatabase from frigate.models import Event +from frigate.util import Process as FrigateProcess from frigate.util.builtin import serialize from frigate.util.classification import kickoff_model_training -from frigate.util.services import listen from .maintainer import EmbeddingMaintainer from .util import ZScoreNormalization @@ -30,25 +26,20 @@ from .util import ZScoreNormalization logger = logging.getLogger(__name__) -def manage_embeddings(config: FrigateConfig, metrics: DataProcessorMetrics) -> None: - stop_event = mp.Event() +class EmbeddingProcess(FrigateProcess): + def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics) -> None: + super().__init__(name="frigate.embeddings_manager", daemon=True) + self.config = config + self.metrics = metrics - def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: - stop_event.set() - - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - threading.current_thread().name = "process:embeddings_manager" - setproctitle("frigate.embeddings_manager") - listen() - - maintainer = EmbeddingMaintainer( - config, - metrics, - stop_event, - ) - maintainer.start() + def run(self) -> None: + self.pre_run_setup() + maintainer = EmbeddingMaintainer( + self.config, + self.metrics, + self.stop_event, + ) + maintainer.start() class EmbeddingsContext: diff --git a/frigate/output/output.py b/frigate/output/output.py index d323596fe..8c60e51c7 100644 --- a/frigate/output/output.py +++ b/frigate/output/output.py @@ -2,14 +2,11 @@ import datetime import logging -import multiprocessing as mp import os import shutil -import signal import threading from wsgiref.simple_server import make_server -from setproctitle import setproctitle from ws4py.server.wsgirefserver import ( WebSocketWSGIHandler, WebSocketWSGIRequestHandler, @@ -17,6 +14,7 @@ from ws4py.server.wsgirefserver import ( ) from ws4py.server.wsgiutils import WebSocketWSGIApplication +import frigate.util as util from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.ws import WebSocket from frigate.config import FrigateConfig @@ -73,189 +71,193 @@ def check_disabled_camera_update( birdseye.all_cameras_disabled() -def output_frames( - config: FrigateConfig, -): - threading.current_thread().name = "output" - setproctitle("frigate.output") +class OutputProcess(util.Process): + def __init__(self, config: FrigateConfig) -> None: + super().__init__(name="frigate.output", daemon=True) + self.config = config - stop_event = mp.Event() + def run(self) -> None: + self.pre_run_setup() - def receiveSignal(signalNumber, frame): - stop_event.set() + frame_manager = SharedMemoryFrameManager() - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - frame_manager = SharedMemoryFrameManager() - - # start a websocket server on 8082 - WebSocketWSGIHandler.http_version = "1.1" - websocket_server = make_server( - "127.0.0.1", - 8082, - server_class=WSGIServer, - handler_class=WebSocketWSGIRequestHandler, - app=WebSocketWSGIApplication(handler_cls=WebSocket), - ) - websocket_server.initialize_websockets_manager() - websocket_thread = threading.Thread(target=websocket_server.serve_forever) - - detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video) - config_subscriber = CameraConfigUpdateSubscriber( - config, - config.cameras, - [ - CameraConfigUpdateEnum.add, - CameraConfigUpdateEnum.birdseye, - CameraConfigUpdateEnum.enabled, - CameraConfigUpdateEnum.record, - ], - ) - - jsmpeg_cameras: dict[str, JsmpegCamera] = {} - birdseye: Birdseye | None = None - preview_recorders: dict[str, PreviewRecorder] = {} - preview_write_times: dict[str, float] = {} - failed_frame_requests: dict[str, int] = {} - last_disabled_cam_check = datetime.datetime.now().timestamp() - - move_preview_frames("cache") - - for camera, cam_config in config.cameras.items(): - if not cam_config.enabled_in_config: - continue - - jsmpeg_cameras[camera] = JsmpegCamera(cam_config, stop_event, websocket_server) - preview_recorders[camera] = PreviewRecorder(cam_config) - preview_write_times[camera] = 0 - - if config.birdseye.enabled: - birdseye = Birdseye(config, stop_event, websocket_server) - - websocket_thread.start() - - while not stop_event.is_set(): - # check if there is an updated config - updates = config_subscriber.check_for_updates() - - if "add" in updates: - for camera in updates["add"]: - jsmpeg_cameras[camera] = JsmpegCamera( - cam_config, stop_event, websocket_server - ) - preview_recorders[camera] = PreviewRecorder(cam_config) - preview_write_times[camera] = 0 - - (topic, data) = detection_subscriber.check_for_update(timeout=1) - now = datetime.datetime.now().timestamp() - - if now - last_disabled_cam_check > 5: - # check disabled cameras every 5 seconds - last_disabled_cam_check = now - check_disabled_camera_update( - config, birdseye, preview_recorders, preview_write_times - ) - - if not topic: - continue - - ( - camera, - frame_name, - frame_time, - current_tracked_objects, - motion_boxes, - _, - ) = data - - if not config.cameras[camera].enabled: - continue - - frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv) - - if frame is None: - logger.debug(f"Failed to get frame {frame_name} from SHM") - failed_frame_requests[camera] = failed_frame_requests.get(camera, 0) + 1 - - if failed_frame_requests[camera] > config.cameras[camera].detect.fps: - logger.warning( - f"Failed to retrieve many frames for {camera} from SHM, consider increasing SHM size if this continues." - ) - - continue - else: - failed_frame_requests[camera] = 0 - - # send frames for low fps recording - preview_recorders[camera].write_data( - current_tracked_objects, motion_boxes, frame_time, frame + # start a websocket server on 8082 + WebSocketWSGIHandler.http_version = "1.1" + websocket_server = make_server( + "127.0.0.1", + 8082, + server_class=WSGIServer, + handler_class=WebSocketWSGIRequestHandler, + app=WebSocketWSGIApplication(handler_cls=WebSocket), ) - preview_write_times[camera] = frame_time + websocket_server.initialize_websockets_manager() + websocket_thread = threading.Thread(target=websocket_server.serve_forever) - # send camera frame to ffmpeg process if websockets are connected - if any( - ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager - ): - # write to the converter for the camera if clients are listening to the specific camera - jsmpeg_cameras[camera].write_frame(frame.tobytes()) + detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video) + config_subscriber = CameraConfigUpdateSubscriber( + self.config, + self.config.cameras, + [ + CameraConfigUpdateEnum.add, + CameraConfigUpdateEnum.birdseye, + CameraConfigUpdateEnum.enabled, + CameraConfigUpdateEnum.record, + ], + ) - # send output data to birdseye if websocket is connected or restreaming - if config.birdseye.enabled and ( - config.birdseye.restream - or any( - ws.environ["PATH_INFO"].endswith("birdseye") - for ws in websocket_server.manager + jsmpeg_cameras: dict[str, JsmpegCamera] = {} + birdseye: Birdseye | None = None + preview_recorders: dict[str, PreviewRecorder] = {} + preview_write_times: dict[str, float] = {} + failed_frame_requests: dict[str, int] = {} + last_disabled_cam_check = datetime.datetime.now().timestamp() + + move_preview_frames("cache") + + for camera, cam_config in self.config.cameras.items(): + if not cam_config.enabled_in_config: + continue + + jsmpeg_cameras[camera] = JsmpegCamera( + cam_config, self.stop_event, websocket_server ) - ): - birdseye.write_data( + preview_recorders[camera] = PreviewRecorder(cam_config) + preview_write_times[camera] = 0 + + if self.config.birdseye.enabled: + birdseye = Birdseye(self.config, self.stop_event, websocket_server) + + websocket_thread.start() + + while not self.stop_event.is_set(): + # check if there is an updated config + updates = config_subscriber.check_for_updates() + + if "add" in updates: + for camera in updates["add"]: + jsmpeg_cameras[camera] = JsmpegCamera( + cam_config, self.stop_event, websocket_server + ) + preview_recorders[camera] = PreviewRecorder(cam_config) + preview_write_times[camera] = 0 + + (topic, data) = detection_subscriber.check_for_update(timeout=1) + now = datetime.datetime.now().timestamp() + + if now - last_disabled_cam_check > 5: + # check disabled cameras every 5 seconds + last_disabled_cam_check = now + check_disabled_camera_update( + self.config, birdseye, preview_recorders, preview_write_times + ) + + if not topic: + continue + + ( camera, + frame_name, + frame_time, current_tracked_objects, motion_boxes, - frame_time, - frame, + _, + ) = data + + if not self.config.cameras[camera].enabled: + continue + + frame = frame_manager.get( + frame_name, self.config.cameras[camera].frame_shape_yuv ) - frame_manager.close(frame_name) + if frame is None: + logger.debug(f"Failed to get frame {frame_name} from SHM") + failed_frame_requests[camera] = failed_frame_requests.get(camera, 0) + 1 - move_preview_frames("clips") + if ( + failed_frame_requests[camera] + > self.config.cameras[camera].detect.fps + ): + logger.warning( + f"Failed to retrieve many frames for {camera} from SHM, consider increasing SHM size if this continues." + ) - while True: - (topic, data) = detection_subscriber.check_for_update(timeout=0) + continue + else: + failed_frame_requests[camera] = 0 - if not topic: - break + # send frames for low fps recording + preview_recorders[camera].write_data( + current_tracked_objects, motion_boxes, frame_time, frame + ) + preview_write_times[camera] = frame_time - ( - camera, - frame_name, - frame_time, - current_tracked_objects, - motion_boxes, - regions, - ) = data + # send camera frame to ffmpeg process if websockets are connected + if any( + ws.environ["PATH_INFO"].endswith(camera) + for ws in websocket_server.manager + ): + # write to the converter for the camera if clients are listening to the specific camera + jsmpeg_cameras[camera].write_frame(frame.tobytes()) - frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv) - frame_manager.close(frame_name) + # send output data to birdseye if websocket is connected or restreaming + if self.config.birdseye.enabled and ( + self.config.birdseye.restream + or any( + ws.environ["PATH_INFO"].endswith("birdseye") + for ws in websocket_server.manager + ) + ): + birdseye.write_data( + camera, + current_tracked_objects, + motion_boxes, + frame_time, + frame, + ) - detection_subscriber.stop() + frame_manager.close(frame_name) - for jsmpeg in jsmpeg_cameras.values(): - jsmpeg.stop() + move_preview_frames("clips") - for preview in preview_recorders.values(): - preview.stop() + while True: + (topic, data) = detection_subscriber.check_for_update(timeout=0) - if birdseye is not None: - birdseye.stop() + if not topic: + break - config_subscriber.stop() - websocket_server.manager.close_all() - websocket_server.manager.stop() - websocket_server.manager.join() - websocket_server.shutdown() - websocket_thread.join() - logger.info("exiting output process...") + ( + camera, + frame_name, + frame_time, + current_tracked_objects, + motion_boxes, + regions, + ) = data + + frame = frame_manager.get( + frame_name, self.config.cameras[camera].frame_shape_yuv + ) + frame_manager.close(frame_name) + + detection_subscriber.stop() + + for jsmpeg in jsmpeg_cameras.values(): + jsmpeg.stop() + + for preview in preview_recorders.values(): + preview.stop() + + if birdseye is not None: + birdseye.stop() + + config_subscriber.stop() + websocket_server.manager.close_all() + websocket_server.manager.stop() + websocket_server.manager.join() + websocket_server.shutdown() + websocket_thread.join() + logger.info("exiting output process...") def move_preview_frames(loc: str): diff --git a/frigate/record/record.py b/frigate/record/record.py index 252b80545..40a943a43 100644 --- a/frigate/record/record.py +++ b/frigate/record/record.py @@ -1,50 +1,40 @@ """Run recording maintainer and cleanup.""" import logging -import multiprocessing as mp -import signal -import threading -from types import FrameType -from typing import Optional from playhouse.sqliteq import SqliteQueueDatabase -from setproctitle import setproctitle from frigate.config import FrigateConfig from frigate.models import Recordings, ReviewSegment from frigate.record.maintainer import RecordingMaintainer -from frigate.util.services import listen +from frigate.util import Process as FrigateProcess logger = logging.getLogger(__name__) -def manage_recordings(config: FrigateConfig) -> None: - stop_event = mp.Event() +class RecordProcess(FrigateProcess): + def __init__(self, config: FrigateConfig) -> None: + super().__init__(name="frigate.recording_manager", daemon=True) + self.config = config - def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: - stop_event.set() + def run(self) -> None: + self.pre_run_setup() + db = SqliteQueueDatabase( + self.config.database.path, + pragmas={ + "auto_vacuum": "FULL", # Does not defragment database + "cache_size": -512 * 1000, # 512MB of cache + "synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous + }, + timeout=max( + 60, 10 * len([c for c in self.config.cameras.values() if c.enabled]) + ), + ) + models = [ReviewSegment, Recordings] + db.bind(models) - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - threading.current_thread().name = "process:recording_manager" - setproctitle("frigate.recording_manager") - listen() - - db = SqliteQueueDatabase( - config.database.path, - pragmas={ - "auto_vacuum": "FULL", # Does not defragment database - "cache_size": -512 * 1000, # 512MB of cache - "synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous - }, - timeout=max(60, 10 * len([c for c in config.cameras.values() if c.enabled])), - ) - models = [ReviewSegment, Recordings] - db.bind(models) - - maintainer = RecordingMaintainer( - config, - stop_event, - ) - maintainer.start() + maintainer = RecordingMaintainer( + self.config, + self.stop_event, + ) + maintainer.start()