From bd2cc18260393448873b86d75ecc00574bf954a5 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Thu, 12 Jun 2025 10:42:10 -0600 Subject: [PATCH] Move to using process for logging --- frigate/__main__.py | 5 +- frigate/app.py | 12 ++- frigate/camera/maintainer.py | 2 +- frigate/events/audio.py | 6 +- frigate/log.py | 13 +-- frigate/object_detection/base.py | 136 +++++++++++++++---------------- frigate/stats/util.py | 2 +- frigate/util/process.py | 21 ++++- 8 files changed, 102 insertions(+), 95 deletions(-) diff --git a/frigate/__main__.py b/frigate/__main__.py index 747a7f8bd..6dd5d130e 100644 --- a/frigate/__main__.py +++ b/frigate/__main__.py @@ -16,10 +16,11 @@ from frigate.util.config import find_config_file def main() -> None: + manager = mp.Manager() faulthandler.enable() # Setup the logging thread - setup_logging() + setup_logging(manager) threading.current_thread().name = "frigate" @@ -109,7 +110,7 @@ def main() -> None: sys.exit(0) # Run the main application. - FrigateApp(config).start() + FrigateApp(config, manager).start() if __name__ == "__main__": diff --git a/frigate/app.py b/frigate/app.py index 649684aa5..2070d2886 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -5,7 +5,7 @@ import os import secrets import shutil from multiprocessing import Queue -from multiprocessing.managers import DictProxy +from multiprocessing.managers import DictProxy, SyncManager from multiprocessing.synchronize import Event as MpEvent from typing import Optional @@ -80,15 +80,15 @@ logger = logging.getLogger(__name__) class FrigateApp: - def __init__(self, config: FrigateConfig) -> None: + def __init__(self, config: FrigateConfig, manager: SyncManager) -> None: + self.metrics_manager = manager self.audio_process: Optional[mp.Process] = None self.stop_event: MpEvent = mp.Event() self.detection_queue: Queue = mp.Queue() self.detectors: dict[str, ObjectDetectProcess] = {} self.detection_shms: list[mp.shared_memory.SharedMemory] = [] self.log_queue: Queue = mp.Queue() - self.metrics_manager = mp.Manager() - self.camera_metrics: DictProxy[str, CameraMetrics] = self.metrics_manager.dict() + self.camera_metrics: DictProxy = self.metrics_manager.dict() self.embeddings_metrics: DataProcessorMetrics | None = ( DataProcessorMetrics( self.metrics_manager, list(config.classification.custom.keys()) @@ -658,7 +658,6 @@ class FrigateApp: self.stats_emitter.join() self.frigate_watchdog.join() self.db.stop() - self.metrics_manager.shutdown() # Save embeddings stats to disk if self.embeddings: @@ -676,7 +675,6 @@ class FrigateApp: shm.close() shm.unlink() - # exit the mp Manager process _stop_logging() - + self.metrics_manager.shutdown() os._exit(os.EX_OK) diff --git a/frigate/camera/maintainer.py b/frigate/camera/maintainer.py index e8304a571..fd09e076a 100644 --- a/frigate/camera/maintainer.py +++ b/frigate/camera/maintainer.py @@ -33,7 +33,7 @@ class CameraMaintainer(threading.Thread): config: FrigateConfig, detection_queue: Queue, detected_frames_queue: Queue, - camera_metrics: DictProxy[str, CameraMetrics], + camera_metrics: DictProxy, ptz_metrics: dict[str, PTZMetrics], stop_event: MpEvent, ): diff --git a/frigate/events/audio.py b/frigate/events/audio.py index 7843cd3ae..6999caffb 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -13,6 +13,7 @@ import numpy as np import frigate.util as util from frigate.camera import CameraMetrics +from logging.handlers import QueueHandler from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.event_metadata_updater import ( EventMetadataPublisher, @@ -84,7 +85,7 @@ class AudioProcessor(util.Process): self, config: FrigateConfig, cameras: list[CameraConfig], - camera_metrics: DictProxy[str, CameraMetrics], + camera_metrics: DictProxy, ): super().__init__(name="frigate.audio_manager", daemon=True) @@ -106,6 +107,7 @@ class AudioProcessor(util.Process): self.transcription_model_runner = None def run(self) -> None: + self.pre_run_setup() audio_threads: list[AudioEventMaintainer] = [] threading.current_thread().name = "process:audio_manager" @@ -147,7 +149,7 @@ class AudioEventMaintainer(threading.Thread): self, camera: CameraConfig, config: FrigateConfig, - camera_metrics: DictProxy[str, CameraMetrics], + camera_metrics: DictProxy, audio_transcription_model_runner: AudioTranscriptionModelRunner | None, stop_event: threading.Event, ) -> None: diff --git a/frigate/log.py b/frigate/log.py index 096b52215..f535a278c 100644 --- a/frigate/log.py +++ b/frigate/log.py @@ -1,12 +1,12 @@ # In log.py import atexit import logging -import multiprocessing as mp import os import sys import threading from collections import deque from logging.handlers import QueueHandler, QueueListener +from multiprocessing.managers import SyncManager from queue import Queue from typing import Deque, Optional @@ -35,12 +35,10 @@ LOG_HANDLER.addFilter( log_listener: Optional[QueueListener] = None log_queue: Optional[Queue] = None -manager = None -def setup_logging() -> None: - global log_listener, log_queue, manager - manager = mp.Manager() +def setup_logging(manager: SyncManager) -> None: + global log_listener, log_queue log_queue = manager.Queue() log_listener = QueueListener(log_queue, LOG_HANDLER, respect_handler_level=True) @@ -57,13 +55,10 @@ def setup_logging() -> None: def _stop_logging() -> None: - global log_listener, manager + global log_listener if log_listener is not None: log_listener.stop() log_listener = None - if manager is not None: - manager.shutdown() - manager = None # When a multiprocessing.Process exits, python tries to flush stdout and stderr. However, if the diff --git a/frigate/object_detection/base.py b/frigate/object_detection/base.py index e50e0bfc0..d203e8574 100644 --- a/frigate/object_detection/base.py +++ b/frigate/object_detection/base.py @@ -1,16 +1,11 @@ import datetime import logging -import multiprocessing as mp -import os import queue -import signal -import threading from abc import ABC, abstractmethod from multiprocessing import Queue, Value from multiprocessing.synchronize import Event as MpEvent import numpy as np -from setproctitle import setproctitle import frigate.util as util from frigate.comms.object_detector_signaler import ( @@ -25,7 +20,6 @@ from frigate.detectors.detector_config import ( ) from frigate.util.builtin import EventsPerSecond, load_labels from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory -from frigate.util.services import listen from .util import tensor_transform @@ -90,72 +84,75 @@ class LocalObjectDetector(ObjectDetector): return self.detect_api.detect_raw(tensor_input=tensor_input) -def run_detector( - name: str, - detection_queue: Queue, - cameras: list[str], - avg_speed: Value, - start: Value, - detector_config: BaseDetectorConfig, -): - threading.current_thread().name = f"detector:{name}" - logger = logging.getLogger(f"detector.{name}") - logger.info(f"Starting detection process: {os.getpid()}") - setproctitle(f"frigate.detector.{name}") - listen() +class DetectorRunner(util.Process): + def __init__( + self, + name, + detection_queue: Queue, + cameras: list[str], + avg_speed: Value, + start_time: Value, + detector_config: BaseDetectorConfig, + ) -> None: + super().__init__(name=name, daemon=True) + self.detection_queue = detection_queue + self.cameras = cameras + self.avg_speed = avg_speed + self.start_time = start_time + self.detector_config = detector_config + self.outputs: dict = {} - stop_event: MpEvent = mp.Event() - - def receiveSignal(signalNumber, frame): - stop_event.set() - - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - def create_output_shm(name: str): + def create_output_shm(self, name: str): out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False) out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) - outputs[name] = {"shm": out_shm, "np": out_np} + self.outputs[name] = {"shm": out_shm, "np": out_np} - frame_manager = SharedMemoryFrameManager() - object_detector = LocalObjectDetector(detector_config=detector_config) - detector_publisher = ObjectDetectorPublisher() + def run(self) -> None: + self.pre_run_setup() - outputs = {} - for name in cameras: - create_output_shm(name) + frame_manager = SharedMemoryFrameManager() + object_detector = LocalObjectDetector(detector_config=self.detector_config) + detector_publisher = ObjectDetectorPublisher() - while not stop_event.is_set(): - try: - connection_id = detection_queue.get(timeout=1) - except queue.Empty: - continue - input_frame = frame_manager.get( - connection_id, - (1, detector_config.model.height, detector_config.model.width, 3), - ) + for name in self.cameras: + self.create_output_shm(name) - if input_frame is None: - logger.warning(f"Failed to get frame {connection_id} from SHM") - continue + while not self.stop_event.is_set(): + try: + connection_id = self.detection_queue.get(timeout=1) + except queue.Empty: + continue + input_frame = frame_manager.get( + connection_id, + ( + 1, + self.detector_config.model.height, + self.detector_config.model.width, + 3, + ), + ) - # detect and send the output - start.value = datetime.datetime.now().timestamp() - detections = object_detector.detect_raw(input_frame) - duration = datetime.datetime.now().timestamp() - start.value - frame_manager.close(connection_id) + if input_frame is None: + logger.warning(f"Failed to get frame {connection_id} from SHM") + continue - if connection_id not in outputs: - create_output_shm(connection_id) + # detect and send the output + self.start_time.value = datetime.datetime.now().timestamp() + detections = object_detector.detect_raw(input_frame) + duration = datetime.datetime.now().timestamp() - self.start_time.value + frame_manager.close(connection_id) - outputs[connection_id]["np"][:] = detections[:] - detector_publisher.publish(connection_id) - start.value = 0.0 + if connection_id not in self.outputs: + self.create_output_shm(connection_id) - avg_speed.value = (avg_speed.value * 9 + duration) / 10 + self.outputs[connection_id]["np"][:] = detections[:] + detector_publisher.publish(connection_id) + self.start_time.value = 0.0 - detector_publisher.stop() - logger.info("Exited detection process...") + self.avg_speed.value = (self.avg_speed.value * 9 + duration) / 10 + + detector_publisher.stop() + logger.info("Exited detection process...") class ObjectDetectProcess: @@ -192,19 +189,14 @@ class ObjectDetectProcess: self.detection_start.value = 0.0 if (self.detect_process is not None) and self.detect_process.is_alive(): self.stop() - self.detect_process = util.Process( - target=run_detector, - name=f"detector:{self.name}", - args=( - self.name, - self.detection_queue, - self.cameras, - self.avg_inference_speed, - self.detection_start, - self.detector_config, - ), + self.detect_process = DetectorRunner( + f"detector:{self.name}", + self.detection_queue, + self.cameras, + self.avg_inference_speed, + self.detection_start, + self.detector_config, ) - self.detect_process.daemon = True self.detect_process.start() diff --git a/frigate/stats/util.py b/frigate/stats/util.py index 9a24813a3..01892cbd4 100644 --- a/frigate/stats/util.py +++ b/frigate/stats/util.py @@ -54,7 +54,7 @@ def get_latest_version(config: FrigateConfig) -> str: def stats_init( config: FrigateConfig, - camera_metrics: DictProxy[str, CameraMetrics], + camera_metrics: DictProxy, embeddings_metrics: DataProcessorMetrics | None, detectors: dict[str, ObjectDetectProcess], processes: dict[str, int], diff --git a/frigate/util/process.py b/frigate/util/process.py index 84980eee0..3501e585e 100644 --- a/frigate/util/process.py +++ b/frigate/util/process.py @@ -1,5 +1,8 @@ +import faulthandler import logging import multiprocessing as mp +import signal +import sys import threading from logging.handlers import QueueHandler from typing import Callable, Optional @@ -45,7 +48,23 @@ class Process(BaseProcess): return self.__dict__["stop_event"] def before_start(self) -> None: - self.__log_queue = frigate.log.log_queue + self.__log_queue = frigate.log.log_listener.queue + + def pre_run_setup(self) -> None: + faulthandler.enable() + + def receiveSignal(signalNumber, frame): + # Get the stop_event through the dict to bypass lazy initialization. + stop_event = self.__dict__.get("stop_event") + if stop_event is not None: + # Someone is monitoring stop_event. We should set it. + stop_event.set() + else: + # Nobody is monitoring stop_event. We should raise SystemExit. + sys.exit() + + signal.signal(signal.SIGTERM, receiveSignal) + signal.signal(signal.SIGINT, receiveSignal) self.logger = logging.getLogger(self.name) logging.basicConfig(handlers=[], force=True) logging.getLogger().addHandler(QueueHandler(self.__log_queue))