Compare commits

..

No commits in common. "6ebbc7c21df9ea418cae13956e95e6a0aed7c107" and "acac743dea9328addb75d041f39926732a0ca4ac" have entirely different histories.

18 changed files with 582 additions and 513 deletions

View File

@ -1,6 +1,5 @@
import argparse
import faulthandler
import multiprocessing as mp
import signal
import sys
import threading
@ -16,11 +15,10 @@ from frigate.util.config import find_config_file
def main() -> None:
manager = mp.Manager()
faulthandler.enable()
# Setup the logging thread
setup_logging(manager)
setup_logging()
threading.current_thread().name = "frigate"
@ -110,9 +108,8 @@ def main() -> None:
sys.exit(0)
# Run the main application.
FrigateApp(config, manager).start()
FrigateApp(config).start()
if __name__ == "__main__":
mp.set_start_method("forkserver", force=True)
main()

View File

@ -5,7 +5,6 @@ import os
import secrets
import shutil
from multiprocessing import Queue
from multiprocessing.managers import DictProxy, SyncManager
from multiprocessing.synchronize import Event as MpEvent
from typing import Optional
@ -14,6 +13,7 @@ import uvicorn
from peewee_migrate import Router
from playhouse.sqlite_ext import SqliteExtDatabase
import frigate.util as util
from frigate.api.auth import hash_password
from frigate.api.fastapi_app import create_fastapi_app
from frigate.camera import CameraMetrics, PTZMetrics
@ -41,11 +41,10 @@ from frigate.const import (
)
from frigate.data_processing.types import DataProcessorMetrics
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.embeddings import EmbeddingProcess, EmbeddingsContext
from frigate.embeddings import EmbeddingsContext, manage_embeddings
from frigate.events.audio import AudioProcessor
from frigate.events.cleanup import EventCleanup
from frigate.events.maintainer import EventProcessor
from frigate.log import _stop_logging
from frigate.models import (
Event,
Export,
@ -58,13 +57,13 @@ from frigate.models import (
User,
)
from frigate.object_detection.base import ObjectDetectProcess
from frigate.output.output import OutputProcess
from frigate.output.output import output_frames
from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.ptz.onvif import OnvifController
from frigate.record.cleanup import RecordingCleanup
from frigate.record.export import migrate_exports
from frigate.record.record import RecordProcess
from frigate.review.review import ReviewProcess
from frigate.record.record import manage_recordings
from frigate.review.review import manage_review_segments
from frigate.stats.emitter import StatsEmitter
from frigate.stats.util import stats_init
from frigate.storage import StorageMaintainer
@ -79,19 +78,16 @@ logger = logging.getLogger(__name__)
class FrigateApp:
def __init__(self, config: FrigateConfig, manager: SyncManager) -> None:
self.metrics_manager = manager
def __init__(self, config: FrigateConfig) -> None:
self.audio_process: Optional[mp.Process] = None
self.stop_event: MpEvent = mp.Event()
self.detection_queue: Queue = mp.Queue()
self.detectors: dict[str, ObjectDetectProcess] = {}
self.detection_shms: list[mp.shared_memory.SharedMemory] = []
self.log_queue: Queue = mp.Queue()
self.camera_metrics: DictProxy = self.metrics_manager.dict()
self.camera_metrics: dict[str, CameraMetrics] = {}
self.embeddings_metrics: DataProcessorMetrics | None = (
DataProcessorMetrics(
self.metrics_manager, list(config.classification.custom.keys())
)
DataProcessorMetrics(list(config.classification.custom.keys()))
if (
config.semantic_search.enabled
or config.lpr.enabled
@ -129,7 +125,7 @@ class FrigateApp:
def init_camera_metrics(self) -> None:
# create camera_metrics
for camera_name in self.config.cameras.keys():
self.camera_metrics[camera_name] = CameraMetrics(self.metrics_manager)
self.camera_metrics[camera_name] = CameraMetrics()
self.ptz_metrics[camera_name] = PTZMetrics(
autotracker_enabled=self.config.cameras[
camera_name
@ -223,14 +219,24 @@ class FrigateApp:
self.processes["go2rtc"] = proc.info["pid"]
def init_recording_manager(self) -> None:
recording_process = RecordProcess(self.config)
recording_process = util.Process(
target=manage_recordings,
name="recording_manager",
args=(self.config,),
)
recording_process.daemon = True
self.recording_process = recording_process
recording_process.start()
self.processes["recording"] = recording_process.pid or 0
logger.info(f"Recording process started: {recording_process.pid}")
def init_review_segment_manager(self) -> None:
review_segment_process = ReviewProcess(self.config)
review_segment_process = util.Process(
target=manage_review_segments,
name="review_segment_manager",
args=(self.config,),
)
review_segment_process.daemon = True
self.review_segment_process = review_segment_process
review_segment_process.start()
self.processes["review_segment"] = review_segment_process.pid or 0
@ -249,10 +255,15 @@ class FrigateApp:
):
return
embedding_process = EmbeddingProcess(
self.config,
self.embeddings_metrics,
embedding_process = util.Process(
target=manage_embeddings,
name="embeddings_manager",
args=(
self.config,
self.embeddings_metrics,
),
)
embedding_process.daemon = True
self.embedding_process = embedding_process
embedding_process.start()
self.processes["embeddings"] = embedding_process.pid or 0
@ -409,7 +420,12 @@ class FrigateApp:
self.detected_frames_processor.start()
def start_video_output_processor(self) -> None:
output_processor = OutputProcess(self.config)
output_processor = util.Process(
target=output_frames,
name="output_processor",
args=(self.config,),
)
output_processor.daemon = True
self.output_processor = output_processor
output_processor.start()
logger.info(f"Output process started: {output_processor.pid}")
@ -654,6 +670,4 @@ class FrigateApp:
shm.close()
shm.unlink()
_stop_logging()
self.metrics_manager.shutdown()
os._exit(os.EX_OK)

View File

@ -1,7 +1,7 @@
import multiprocessing as mp
from multiprocessing.managers import SyncManager
from multiprocessing.sharedctypes import Synchronized
from multiprocessing.synchronize import Event
from typing import Optional
class CameraMetrics:
@ -16,25 +16,25 @@ class CameraMetrics:
frame_queue: mp.Queue
process_pid: Synchronized
capture_process_pid: Synchronized
process: Optional[mp.Process]
capture_process: Optional[mp.Process]
ffmpeg_pid: Synchronized
def __init__(self, manager: SyncManager):
self.camera_fps = manager.Value("d", 0)
self.detection_fps = manager.Value("d", 0)
self.detection_frame = manager.Value("d", 0)
self.process_fps = manager.Value("d", 0)
self.skipped_fps = manager.Value("d", 0)
self.read_start = manager.Value("d", 0)
self.audio_rms = manager.Value("d", 0)
self.audio_dBFS = manager.Value("d", 0)
def __init__(self):
self.camera_fps = mp.Value("d", 0)
self.detection_fps = mp.Value("d", 0)
self.detection_frame = mp.Value("d", 0)
self.process_fps = mp.Value("d", 0)
self.skipped_fps = mp.Value("d", 0)
self.read_start = mp.Value("d", 0)
self.audio_rms = mp.Value("d", 0)
self.audio_dBFS = mp.Value("d", 0)
self.frame_queue = manager.Queue(maxsize=2)
self.frame_queue = mp.Queue(maxsize=2)
self.process_pid = manager.Value("i", 0)
self.capture_process_pid = manager.Value("i", 0)
self.ffmpeg_pid = manager.Value("i", 0)
self.process = None
self.capture_process = None
self.ffmpeg_pid = mp.Value("i", 0)
class PTZMetrics:

View File

@ -1,12 +1,10 @@
"""Create and maintain camera processes / management."""
import logging
import multiprocessing as mp
import os
import shutil
import threading
from multiprocessing import Queue
from multiprocessing.managers import DictProxy
from multiprocessing.synchronize import Event as MpEvent
from frigate.camera import CameraMetrics, PTZMetrics
@ -18,10 +16,11 @@ from frigate.config.camera.updater import (
)
from frigate.const import SHM_FRAMES_VAR
from frigate.models import Regions
from frigate.util import Process as FrigateProcess
from frigate.util.builtin import empty_and_close_queue
from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory
from frigate.util.object import get_camera_regions_grid
from frigate.video import CameraCapture, CameraTracker
from frigate.video import capture_camera, track_camera
logger = logging.getLogger(__name__)
@ -32,7 +31,7 @@ class CameraMaintainer(threading.Thread):
config: FrigateConfig,
detection_queue: Queue,
detected_frames_queue: Queue,
camera_metrics: DictProxy,
camera_metrics: dict[str, CameraMetrics],
ptz_metrics: dict[str, PTZMetrics],
stop_event: MpEvent,
):
@ -54,8 +53,6 @@ class CameraMaintainer(threading.Thread):
],
)
self.shm_count = self.__calculate_shm_frame_count()
self.camera_processes: dict[str, mp.Process] = {}
self.capture_processes: dict[str, mp.Process] = {}
def __init_historical_regions(self) -> None:
# delete region grids for removed or renamed cameras
@ -154,19 +151,24 @@ class CameraMaintainer(threading.Thread):
except FileExistsError:
pass
camera_process = CameraTracker(
config,
self.config.model,
self.config.model.merged_labelmap,
self.detection_queue,
self.detected_frames_queue,
self.camera_metrics[name],
self.ptz_metrics[name],
self.region_grids[name],
camera_process = FrigateProcess(
target=track_camera,
name=f"camera_processor:{name}",
args=(
config.name,
config,
self.config.model,
self.config.model.merged_labelmap,
self.detection_queue,
self.detected_frames_queue,
self.camera_metrics[name],
self.ptz_metrics[name],
self.region_grids[name],
),
daemon=True,
)
self.camera_processes[config.name] = camera_process
self.camera_metrics[config.name].process = camera_process
camera_process.start()
self.camera_metrics[config.name].process_pid.value = camera_process.pid
logger.info(f"Camera processor started for {config.name}: {camera_process.pid}")
def __start_camera_capture(
@ -182,28 +184,32 @@ class CameraMaintainer(threading.Thread):
frame_size = config.frame_shape_yuv[0] * config.frame_shape_yuv[1]
self.frame_manager.create(f"{config.name}_frame{i}", frame_size)
capture_process = CameraCapture(config, count, self.camera_metrics[name])
capture_process = FrigateProcess(
target=capture_camera,
name=f"camera_capture:{name}",
args=(config, count, self.camera_metrics[name]),
)
capture_process.daemon = True
self.capture_processes[name] = capture_process
self.camera_metrics[name].capture_process = capture_process
capture_process.start()
self.camera_metrics[name].capture_process_pid.value = capture_process.pid
logger.info(f"Capture process started for {name}: {capture_process.pid}")
def __stop_camera_capture_process(self, camera: str) -> None:
capture_process = self.capture_processes[camera]
capture_process = self.camera_metrics[camera].capture_process
if capture_process is not None:
logger.info(f"Waiting for capture process for {camera} to stop")
capture_process.terminate()
capture_process.join()
def __stop_camera_process(self, camera: str) -> None:
camera_process = self.camera_processes[camera]
metrics = self.camera_metrics[camera]
camera_process = metrics.process
if camera_process is not None:
logger.info(f"Waiting for process for {camera} to stop")
camera_process.terminate()
camera_process.join()
logger.info(f"Closing frame queue for {camera}")
empty_and_close_queue(self.camera_metrics[camera].frame_queue)
empty_and_close_queue(metrics.frame_queue)
def run(self):
self.__init_historical_regions()
@ -234,11 +240,11 @@ class CameraMaintainer(threading.Thread):
self.__stop_camera_process(camera)
# ensure the capture processes are done
for camera in self.camera_processes.keys():
for camera in self.camera_metrics.keys():
self.__stop_camera_capture_process(camera)
# ensure the camera processors are done
for camera in self.capture_processes.keys():
for camera in self.camera_metrics.keys():
self.__stop_camera_process(camera)
self.update_subscriber.stop()

View File

@ -1,7 +1,7 @@
"""Embeddings types."""
import multiprocessing as mp
from enum import Enum
from multiprocessing.managers import SyncManager
from multiprocessing.sharedctypes import Synchronized
import sherpa_onnx
@ -20,27 +20,25 @@ class DataProcessorMetrics:
alpr_pps: Synchronized
yolov9_lpr_speed: Synchronized
yolov9_lpr_pps: Synchronized
classification_speeds: dict[str, Synchronized]
classification_cps: dict[str, Synchronized]
classification_speeds: dict[str, Synchronized] = {}
classification_cps: dict[str, Synchronized] = {}
def __init__(self, manager: SyncManager, custom_classification_models: list[str]):
self.image_embeddings_speed = manager.Value("d", 0.0)
self.image_embeddings_eps = manager.Value("d", 0.0)
self.text_embeddings_speed = manager.Value("d", 0.0)
self.text_embeddings_eps = manager.Value("d", 0.0)
self.face_rec_speed = manager.Value("d", 0.0)
self.face_rec_fps = manager.Value("d", 0.0)
self.alpr_speed = manager.Value("d", 0.0)
self.alpr_pps = manager.Value("d", 0.0)
self.yolov9_lpr_speed = manager.Value("d", 0.0)
self.yolov9_lpr_pps = manager.Value("d", 0.0)
self.classification_speeds = manager.dict()
self.classification_cps = manager.dict()
def __init__(self, custom_classification_models: list[str]):
self.image_embeddings_speed = mp.Value("d", 0.0)
self.image_embeddings_eps = mp.Value("d", 0.0)
self.text_embeddings_speed = mp.Value("d", 0.0)
self.text_embeddings_eps = mp.Value("d", 0.0)
self.face_rec_speed = mp.Value("d", 0.0)
self.face_rec_fps = mp.Value("d", 0.0)
self.alpr_speed = mp.Value("d", 0.0)
self.alpr_pps = mp.Value("d", 0.0)
self.yolov9_lpr_speed = mp.Value("d", 0.0)
self.yolov9_lpr_pps = mp.Value("d", 0.0)
if custom_classification_models:
for key in custom_classification_models:
self.classification_speeds[key] = manager.Value("d", 0.0)
self.classification_cps[key] = manager.Value("d", 0.0)
self.classification_speeds[key] = mp.Value("d", 0.0)
self.classification_cps[key] = mp.Value("d", 0.0)
class DataProcessorModelRunner:

View File

@ -3,22 +3,26 @@
import base64
import json
import logging
import multiprocessing as mp
import os
import signal
import threading
from typing import Any, Union
from types import FrameType
from typing import Any, Optional, Union
import regex
from pathvalidate import ValidationError, sanitize_filename
from setproctitle import setproctitle
from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsRequestor
from frigate.config import FrigateConfig
from frigate.const import CONFIG_DIR, FACE_DIR
from frigate.data_processing.types import DataProcessorMetrics
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.models import Event
from frigate.util import Process as FrigateProcess
from frigate.models import Event, Recordings
from frigate.util.builtin import serialize
from frigate.util.classification import kickoff_model_training
from frigate.util.services import listen
from .maintainer import EmbeddingMaintainer
from .util import ZScoreNormalization
@ -26,22 +30,40 @@ from .util import ZScoreNormalization
logger = logging.getLogger(__name__)
class EmbeddingProcess(FrigateProcess):
def __init__(
self, config: FrigateConfig, metrics: DataProcessorMetrics | None
) -> None:
super().__init__(name="frigate.embeddings_manager", daemon=True)
self.config = config
self.metrics = metrics
def manage_embeddings(config: FrigateConfig, metrics: DataProcessorMetrics) -> None:
stop_event = mp.Event()
def run(self) -> None:
self.pre_run_setup()
maintainer = EmbeddingMaintainer(
self.config,
self.metrics,
self.stop_event,
)
maintainer.start()
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
threading.current_thread().name = "process:embeddings_manager"
setproctitle("frigate.embeddings_manager")
listen()
# Configure Frigate DB
db = SqliteVecQueueDatabase(
config.database.path,
pragmas={
"auto_vacuum": "FULL", # Does not defragment database
"cache_size": -512 * 1000, # 512MB of cache
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
},
timeout=max(60, 10 * len([c for c in config.cameras.values() if c.enabled])),
load_vec_extension=True,
)
models = [Event, Recordings]
db.bind(models)
maintainer = EmbeddingMaintainer(
db,
config,
metrics,
stop_event,
)
maintainer.start()
class EmbeddingsContext:

View File

@ -12,6 +12,7 @@ from typing import Any, Optional
import cv2
import numpy as np
from peewee import DoesNotExist
from playhouse.sqliteq import SqliteQueueDatabase
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsResponder
@ -57,10 +58,9 @@ from frigate.data_processing.real_time.license_plate import (
LicensePlateRealTimeProcessor,
)
from frigate.data_processing.types import DataProcessorMetrics, PostProcessDataEnum
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.events.types import EventTypeEnum, RegenerateDescriptionEnum
from frigate.genai import get_genai_client
from frigate.models import Event, Recordings
from frigate.models import Event
from frigate.types import TrackedObjectUpdateTypesEnum
from frigate.util.builtin import serialize
from frigate.util.image import (
@ -82,8 +82,9 @@ class EmbeddingMaintainer(threading.Thread):
def __init__(
self,
db: SqliteQueueDatabase,
config: FrigateConfig,
metrics: DataProcessorMetrics | None,
metrics: DataProcessorMetrics,
stop_event: MpEvent,
) -> None:
super().__init__(name="embeddings_maintainer")
@ -96,22 +97,6 @@ class EmbeddingMaintainer(threading.Thread):
[CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.remove],
)
# Configure Frigate DB
db = SqliteVecQueueDatabase(
config.database.path,
pragmas={
"auto_vacuum": "FULL", # Does not defragment database
"cache_size": -512 * 1000, # 512MB of cache
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
},
timeout=max(
60, 10 * len([c for c in config.cameras.values() if c.enabled])
),
load_vec_extension=True,
)
models = [Event, Recordings]
db.bind(models)
if config.semantic_search.enabled:
self.embeddings = Embeddings(config, db, metrics)

View File

@ -6,12 +6,12 @@ import random
import string
import threading
import time
from multiprocessing.managers import DictProxy
from typing import Any, Tuple
import numpy as np
import frigate.util as util
from frigate.camera import CameraMetrics
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
from frigate.comms.event_metadata_updater import (
EventMetadataPublisher,
@ -83,7 +83,7 @@ class AudioProcessor(util.Process):
self,
config: FrigateConfig,
cameras: list[CameraConfig],
camera_metrics: DictProxy,
camera_metrics: dict[str, CameraMetrics],
):
super().__init__(name="frigate.audio_manager", daemon=True)
@ -105,7 +105,6 @@ class AudioProcessor(util.Process):
self.transcription_model_runner = None
def run(self) -> None:
self.pre_run_setup()
audio_threads: list[AudioEventMaintainer] = []
threading.current_thread().name = "process:audio_manager"
@ -147,7 +146,7 @@ class AudioEventMaintainer(threading.Thread):
self,
camera: CameraConfig,
config: FrigateConfig,
camera_metrics: DictProxy,
camera_metrics: dict[str, CameraMetrics],
audio_transcription_model_runner: AudioTranscriptionModelRunner | None,
stop_event: threading.Event,
) -> None:

View File

@ -1,13 +1,11 @@
# In log.py
import atexit
import logging
import multiprocessing as mp
import os
import sys
import threading
from collections import deque
from logging.handlers import QueueHandler, QueueListener
from multiprocessing.managers import SyncManager
from queue import Queue
from typing import Deque, Optional
from frigate.util.builtin import clean_camera_user_pass
@ -34,12 +32,12 @@ LOG_HANDLER.addFilter(
)
log_listener: Optional[QueueListener] = None
log_queue: Optional[Queue] = None
def setup_logging(manager: SyncManager) -> None:
global log_listener, log_queue
log_queue = manager.Queue()
def setup_logging() -> None:
global log_listener
log_queue: mp.Queue = mp.Queue()
log_listener = QueueListener(log_queue, LOG_HANDLER, respect_handler_level=True)
atexit.register(_stop_logging)
@ -56,6 +54,7 @@ def setup_logging(manager: SyncManager) -> None:
def _stop_logging() -> None:
global log_listener
if log_listener is not None:
log_listener.stop()
log_listener = None

View File

@ -1,11 +1,16 @@
import datetime
import logging
import multiprocessing as mp
import os
import queue
import signal
import threading
from abc import ABC, abstractmethod
from multiprocessing import Queue, Value
from multiprocessing.synchronize import Event as MpEvent
import numpy as np
from setproctitle import setproctitle
import frigate.util as util
from frigate.comms.object_detector_signaler import (
@ -20,6 +25,7 @@ from frigate.detectors.detector_config import (
)
from frigate.util.builtin import EventsPerSecond, load_labels
from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory
from frigate.util.services import listen
from .util import tensor_transform
@ -84,75 +90,72 @@ class LocalObjectDetector(ObjectDetector):
return self.detect_api.detect_raw(tensor_input=tensor_input)
class DetectorRunner(util.Process):
def __init__(
self,
name,
detection_queue: Queue,
cameras: list[str],
avg_speed: Value,
start_time: Value,
detector_config: BaseDetectorConfig,
) -> None:
super().__init__(name=name, daemon=True)
self.detection_queue = detection_queue
self.cameras = cameras
self.avg_speed = avg_speed
self.start_time = start_time
self.detector_config = detector_config
self.outputs: dict = {}
def run_detector(
name: str,
detection_queue: Queue,
cameras: list[str],
avg_speed: Value,
start: Value,
detector_config: BaseDetectorConfig,
):
threading.current_thread().name = f"detector:{name}"
logger = logging.getLogger(f"detector.{name}")
logger.info(f"Starting detection process: {os.getpid()}")
setproctitle(f"frigate.detector.{name}")
listen()
def create_output_shm(self, name: str):
stop_event: MpEvent = mp.Event()
def receiveSignal(signalNumber, frame):
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
def create_output_shm(name: str):
out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False)
out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf)
self.outputs[name] = {"shm": out_shm, "np": out_np}
outputs[name] = {"shm": out_shm, "np": out_np}
def run(self) -> None:
self.pre_run_setup()
frame_manager = SharedMemoryFrameManager()
object_detector = LocalObjectDetector(detector_config=detector_config)
detector_publisher = ObjectDetectorPublisher()
frame_manager = SharedMemoryFrameManager()
object_detector = LocalObjectDetector(detector_config=self.detector_config)
detector_publisher = ObjectDetectorPublisher()
outputs = {}
for name in cameras:
create_output_shm(name)
for name in self.cameras:
self.create_output_shm(name)
while not stop_event.is_set():
try:
connection_id = detection_queue.get(timeout=1)
except queue.Empty:
continue
input_frame = frame_manager.get(
connection_id,
(1, detector_config.model.height, detector_config.model.width, 3),
)
while not self.stop_event.is_set():
try:
connection_id = self.detection_queue.get(timeout=1)
except queue.Empty:
continue
input_frame = frame_manager.get(
connection_id,
(
1,
self.detector_config.model.height,
self.detector_config.model.width,
3,
),
)
if input_frame is None:
logger.warning(f"Failed to get frame {connection_id} from SHM")
continue
if input_frame is None:
logger.warning(f"Failed to get frame {connection_id} from SHM")
continue
# detect and send the output
start.value = datetime.datetime.now().timestamp()
detections = object_detector.detect_raw(input_frame)
duration = datetime.datetime.now().timestamp() - start.value
frame_manager.close(connection_id)
# detect and send the output
self.start_time.value = datetime.datetime.now().timestamp()
detections = object_detector.detect_raw(input_frame)
duration = datetime.datetime.now().timestamp() - self.start_time.value
frame_manager.close(connection_id)
if connection_id not in outputs:
create_output_shm(connection_id)
if connection_id not in self.outputs:
self.create_output_shm(connection_id)
outputs[connection_id]["np"][:] = detections[:]
detector_publisher.publish(connection_id)
start.value = 0.0
self.outputs[connection_id]["np"][:] = detections[:]
detector_publisher.publish(connection_id)
self.start_time.value = 0.0
avg_speed.value = (avg_speed.value * 9 + duration) / 10
self.avg_speed.value = (self.avg_speed.value * 9 + duration) / 10
detector_publisher.stop()
logger.info("Exited detection process...")
detector_publisher.stop()
logger.info("Exited detection process...")
class ObjectDetectProcess:
@ -189,14 +192,19 @@ class ObjectDetectProcess:
self.detection_start.value = 0.0
if (self.detect_process is not None) and self.detect_process.is_alive():
self.stop()
self.detect_process = DetectorRunner(
f"detector:{self.name}",
self.detection_queue,
self.cameras,
self.avg_inference_speed,
self.detection_start,
self.detector_config,
self.detect_process = util.Process(
target=run_detector,
name=f"detector:{self.name}",
args=(
self.name,
self.detection_queue,
self.cameras,
self.avg_inference_speed,
self.detection_start,
self.detector_config,
),
)
self.detect_process.daemon = True
self.detect_process.start()

View File

@ -2,11 +2,14 @@
import datetime
import logging
import multiprocessing as mp
import os
import shutil
import signal
import threading
from wsgiref.simple_server import make_server
from setproctitle import setproctitle
from ws4py.server.wsgirefserver import (
WebSocketWSGIHandler,
WebSocketWSGIRequestHandler,
@ -14,7 +17,6 @@ from ws4py.server.wsgirefserver import (
)
from ws4py.server.wsgiutils import WebSocketWSGIApplication
import frigate.util as util
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.ws import WebSocket
from frigate.config import FrigateConfig
@ -71,193 +73,189 @@ def check_disabled_camera_update(
birdseye.all_cameras_disabled()
class OutputProcess(util.Process):
def __init__(self, config: FrigateConfig) -> None:
super().__init__(name="frigate.output", daemon=True)
self.config = config
def output_frames(
config: FrigateConfig,
):
threading.current_thread().name = "output"
setproctitle("frigate.output")
def run(self) -> None:
self.pre_run_setup()
stop_event = mp.Event()
frame_manager = SharedMemoryFrameManager()
def receiveSignal(signalNumber, frame):
stop_event.set()
# start a websocket server on 8082
WebSocketWSGIHandler.http_version = "1.1"
websocket_server = make_server(
"127.0.0.1",
8082,
server_class=WSGIServer,
handler_class=WebSocketWSGIRequestHandler,
app=WebSocketWSGIApplication(handler_cls=WebSocket),
)
websocket_server.initialize_websockets_manager()
websocket_thread = threading.Thread(target=websocket_server.serve_forever)
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video)
config_subscriber = CameraConfigUpdateSubscriber(
self.config,
self.config.cameras,
[
CameraConfigUpdateEnum.add,
CameraConfigUpdateEnum.birdseye,
CameraConfigUpdateEnum.enabled,
CameraConfigUpdateEnum.record,
],
)
frame_manager = SharedMemoryFrameManager()
jsmpeg_cameras: dict[str, JsmpegCamera] = {}
birdseye: Birdseye | None = None
preview_recorders: dict[str, PreviewRecorder] = {}
preview_write_times: dict[str, float] = {}
failed_frame_requests: dict[str, int] = {}
last_disabled_cam_check = datetime.datetime.now().timestamp()
# start a websocket server on 8082
WebSocketWSGIHandler.http_version = "1.1"
websocket_server = make_server(
"127.0.0.1",
8082,
server_class=WSGIServer,
handler_class=WebSocketWSGIRequestHandler,
app=WebSocketWSGIApplication(handler_cls=WebSocket),
)
websocket_server.initialize_websockets_manager()
websocket_thread = threading.Thread(target=websocket_server.serve_forever)
move_preview_frames("cache")
detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video)
config_subscriber = CameraConfigUpdateSubscriber(
config,
config.cameras,
[
CameraConfigUpdateEnum.add,
CameraConfigUpdateEnum.birdseye,
CameraConfigUpdateEnum.enabled,
CameraConfigUpdateEnum.record,
],
)
for camera, cam_config in self.config.cameras.items():
if not cam_config.enabled_in_config:
continue
jsmpeg_cameras: dict[str, JsmpegCamera] = {}
birdseye: Birdseye | None = None
preview_recorders: dict[str, PreviewRecorder] = {}
preview_write_times: dict[str, float] = {}
failed_frame_requests: dict[str, int] = {}
last_disabled_cam_check = datetime.datetime.now().timestamp()
jsmpeg_cameras[camera] = JsmpegCamera(
cam_config, self.stop_event, websocket_server
move_preview_frames("cache")
for camera, cam_config in config.cameras.items():
if not cam_config.enabled_in_config:
continue
jsmpeg_cameras[camera] = JsmpegCamera(cam_config, stop_event, websocket_server)
preview_recorders[camera] = PreviewRecorder(cam_config)
preview_write_times[camera] = 0
if config.birdseye.enabled:
birdseye = Birdseye(config, stop_event, websocket_server)
websocket_thread.start()
while not stop_event.is_set():
# check if there is an updated config
updates = config_subscriber.check_for_updates()
if "add" in updates:
for camera in updates["add"]:
jsmpeg_cameras[camera] = JsmpegCamera(
cam_config, stop_event, websocket_server
)
preview_recorders[camera] = PreviewRecorder(cam_config)
preview_write_times[camera] = 0
(topic, data) = detection_subscriber.check_for_update(timeout=1)
now = datetime.datetime.now().timestamp()
if now - last_disabled_cam_check > 5:
# check disabled cameras every 5 seconds
last_disabled_cam_check = now
check_disabled_camera_update(
config, birdseye, preview_recorders, preview_write_times
)
preview_recorders[camera] = PreviewRecorder(cam_config)
preview_write_times[camera] = 0
if self.config.birdseye.enabled:
birdseye = Birdseye(self.config, self.stop_event, websocket_server)
if not topic:
continue
websocket_thread.start()
(
camera,
frame_name,
frame_time,
current_tracked_objects,
motion_boxes,
_,
) = data
while not self.stop_event.is_set():
# check if there is an updated config
updates = config_subscriber.check_for_updates()
if not config.cameras[camera].enabled:
continue
if "add" in updates:
for camera in updates["add"]:
jsmpeg_cameras[camera] = JsmpegCamera(
cam_config, self.stop_event, websocket_server
)
preview_recorders[camera] = PreviewRecorder(cam_config)
preview_write_times[camera] = 0
frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv)
(topic, data) = detection_subscriber.check_for_update(timeout=1)
now = datetime.datetime.now().timestamp()
if frame is None:
logger.debug(f"Failed to get frame {frame_name} from SHM")
failed_frame_requests[camera] = failed_frame_requests.get(camera, 0) + 1
if now - last_disabled_cam_check > 5:
# check disabled cameras every 5 seconds
last_disabled_cam_check = now
check_disabled_camera_update(
self.config, birdseye, preview_recorders, preview_write_times
if failed_frame_requests[camera] > config.cameras[camera].detect.fps:
logger.warning(
f"Failed to retrieve many frames for {camera} from SHM, consider increasing SHM size if this continues."
)
if not topic:
continue
continue
else:
failed_frame_requests[camera] = 0
(
camera,
frame_name,
frame_time,
current_tracked_objects,
motion_boxes,
_,
) = data
# send frames for low fps recording
preview_recorders[camera].write_data(
current_tracked_objects, motion_boxes, frame_time, frame
)
preview_write_times[camera] = frame_time
if not self.config.cameras[camera].enabled:
continue
# send camera frame to ffmpeg process if websockets are connected
if any(
ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager
):
# write to the converter for the camera if clients are listening to the specific camera
jsmpeg_cameras[camera].write_frame(frame.tobytes())
frame = frame_manager.get(
frame_name, self.config.cameras[camera].frame_shape_yuv
)
if frame is None:
logger.debug(f"Failed to get frame {frame_name} from SHM")
failed_frame_requests[camera] = failed_frame_requests.get(camera, 0) + 1
if (
failed_frame_requests[camera]
> self.config.cameras[camera].detect.fps
):
logger.warning(
f"Failed to retrieve many frames for {camera} from SHM, consider increasing SHM size if this continues."
)
continue
else:
failed_frame_requests[camera] = 0
# send frames for low fps recording
preview_recorders[camera].write_data(
current_tracked_objects, motion_boxes, frame_time, frame
)
preview_write_times[camera] = frame_time
# send camera frame to ffmpeg process if websockets are connected
if any(
ws.environ["PATH_INFO"].endswith(camera)
# send output data to birdseye if websocket is connected or restreaming
if config.birdseye.enabled and (
config.birdseye.restream
or any(
ws.environ["PATH_INFO"].endswith("birdseye")
for ws in websocket_server.manager
):
# write to the converter for the camera if clients are listening to the specific camera
jsmpeg_cameras[camera].write_frame(frame.tobytes())
# send output data to birdseye if websocket is connected or restreaming
if self.config.birdseye.enabled and (
self.config.birdseye.restream
or any(
ws.environ["PATH_INFO"].endswith("birdseye")
for ws in websocket_server.manager
)
):
birdseye.write_data(
camera,
current_tracked_objects,
motion_boxes,
frame_time,
frame,
)
frame_manager.close(frame_name)
move_preview_frames("clips")
while True:
(topic, data) = detection_subscriber.check_for_update(timeout=0)
if not topic:
break
(
)
):
birdseye.write_data(
camera,
frame_name,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = data
frame = frame_manager.get(
frame_name, self.config.cameras[camera].frame_shape_yuv
frame_time,
frame,
)
frame_manager.close(frame_name)
detection_subscriber.stop()
frame_manager.close(frame_name)
for jsmpeg in jsmpeg_cameras.values():
jsmpeg.stop()
move_preview_frames("clips")
for preview in preview_recorders.values():
preview.stop()
while True:
(topic, data) = detection_subscriber.check_for_update(timeout=0)
if birdseye is not None:
birdseye.stop()
if not topic:
break
config_subscriber.stop()
websocket_server.manager.close_all()
websocket_server.manager.stop()
websocket_server.manager.join()
websocket_server.shutdown()
websocket_thread.join()
logger.info("exiting output process...")
(
camera,
frame_name,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = data
frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv)
frame_manager.close(frame_name)
detection_subscriber.stop()
for jsmpeg in jsmpeg_cameras.values():
jsmpeg.stop()
for preview in preview_recorders.values():
preview.stop()
if birdseye is not None:
birdseye.stop()
config_subscriber.stop()
websocket_server.manager.close_all()
websocket_server.manager.stop()
websocket_server.manager.join()
websocket_server.shutdown()
websocket_thread.join()
logger.info("exiting output process...")
def move_preview_frames(loc: str):

View File

@ -1,40 +1,50 @@
"""Run recording maintainer and cleanup."""
import logging
import multiprocessing as mp
import signal
import threading
from types import FrameType
from typing import Optional
from playhouse.sqliteq import SqliteQueueDatabase
from setproctitle import setproctitle
from frigate.config import FrigateConfig
from frigate.models import Recordings, ReviewSegment
from frigate.record.maintainer import RecordingMaintainer
from frigate.util import Process as FrigateProcess
from frigate.util.services import listen
logger = logging.getLogger(__name__)
class RecordProcess(FrigateProcess):
def __init__(self, config: FrigateConfig) -> None:
super().__init__(name="frigate.recording_manager", daemon=True)
self.config = config
def manage_recordings(config: FrigateConfig) -> None:
stop_event = mp.Event()
def run(self) -> None:
self.pre_run_setup()
db = SqliteQueueDatabase(
self.config.database.path,
pragmas={
"auto_vacuum": "FULL", # Does not defragment database
"cache_size": -512 * 1000, # 512MB of cache
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
},
timeout=max(
60, 10 * len([c for c in self.config.cameras.values() if c.enabled])
),
)
models = [ReviewSegment, Recordings]
db.bind(models)
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
stop_event.set()
maintainer = RecordingMaintainer(
self.config,
self.stop_event,
)
maintainer.start()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
threading.current_thread().name = "process:recording_manager"
setproctitle("frigate.recording_manager")
listen()
db = SqliteQueueDatabase(
config.database.path,
pragmas={
"auto_vacuum": "FULL", # Does not defragment database
"cache_size": -512 * 1000, # 512MB of cache
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
},
timeout=max(60, 10 * len([c for c in config.cameras.values() if c.enabled])),
)
models = [ReviewSegment, Recordings]
db.bind(models)
maintainer = RecordingMaintainer(
config,
stop_event,
)
maintainer.start()

View File

@ -1,23 +1,36 @@
"""Run recording maintainer and cleanup."""
import logging
import multiprocessing as mp
import signal
import threading
from types import FrameType
from typing import Optional
from setproctitle import setproctitle
import frigate.util as util
from frigate.config import FrigateConfig
from frigate.review.maintainer import ReviewSegmentMaintainer
from frigate.util.services import listen
logger = logging.getLogger(__name__)
class ReviewProcess(util.Process):
def __init__(self, config: FrigateConfig) -> None:
super().__init__(name="frigate.review_segment_manager", daemon=True)
self.config = config
def manage_review_segments(config: FrigateConfig) -> None:
stop_event = mp.Event()
def run(self) -> None:
self.pre_run_setup()
maintainer = ReviewSegmentMaintainer(
self.config,
self.stop_event,
)
maintainer.start()
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
threading.current_thread().name = "process:review_segment_manager"
setproctitle("frigate.review_segment_manager")
listen()
maintainer = ReviewSegmentMaintainer(
config,
stop_event,
)
maintainer.start()

View File

@ -5,13 +5,13 @@ import os
import shutil
import time
from json import JSONDecodeError
from multiprocessing.managers import DictProxy
from typing import Any, Optional
import psutil
import requests
from requests.exceptions import RequestException
from frigate.camera import CameraMetrics
from frigate.config import FrigateConfig
from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR
from frigate.data_processing.types import DataProcessorMetrics
@ -53,7 +53,7 @@ def get_latest_version(config: FrigateConfig) -> str:
def stats_init(
config: FrigateConfig,
camera_metrics: DictProxy,
camera_metrics: dict[str, CameraMetrics],
embeddings_metrics: DataProcessorMetrics | None,
detectors: dict[str, ObjectDetectProcess],
processes: dict[str, int],
@ -271,12 +271,10 @@ def stats_snapshot(
stats["cameras"] = {}
for name, camera_stats in camera_metrics.items():
total_detection_fps += camera_stats.detection_fps.value
pid = camera_stats.process_pid.value if camera_stats.process_pid.value else None
pid = camera_stats.process.pid if camera_stats.process else None
ffmpeg_pid = camera_stats.ffmpeg_pid.value if camera_stats.ffmpeg_pid else None
capture_pid = (
camera_stats.capture_process_pid.value
if camera_stats.capture_process_pid.value
else None
camera_stats.capture_process.pid if camera_stats.capture_process else None
)
stats["cameras"][name] = {
"camera_fps": round(camera_stats.camera_fps.value, 2),

View File

@ -341,14 +341,11 @@ def clear_and_unlink(file: Path, missing_ok: bool = True) -> None:
def empty_and_close_queue(q: mp.Queue):
while True:
try:
try:
q.get(block=True, timeout=0.5)
except (queue.Empty, EOFError):
q.close()
q.join_thread()
return
except AttributeError:
pass
q.get(block=True, timeout=0.5)
except queue.Empty:
q.close()
q.join_thread()
return
def generate_color_palette(n):

View File

@ -4,8 +4,9 @@ import multiprocessing as mp
import signal
import sys
import threading
from functools import wraps
from logging.handlers import QueueHandler
from typing import Callable, Optional
from typing import Any, Callable, Optional
import frigate.log
@ -29,12 +30,34 @@ class BaseProcess(mp.Process):
super().start(*args, **kwargs)
self.after_start()
def __getattribute__(self, name: str) -> Any:
if name == "run":
run = super().__getattribute__("run")
@wraps(run)
def run_wrapper(*args, **kwargs):
try:
self.before_run()
return run(*args, **kwargs)
finally:
self.after_run()
return run_wrapper
return super().__getattribute__(name)
def before_start(self) -> None:
pass
def after_start(self) -> None:
pass
def before_run(self) -> None:
pass
def after_run(self) -> None:
pass
class Process(BaseProcess):
logger: logging.Logger
@ -50,7 +73,7 @@ class Process(BaseProcess):
def before_start(self) -> None:
self.__log_queue = frigate.log.log_listener.queue
def pre_run_setup(self) -> None:
def before_run(self) -> None:
faulthandler.enable()
def receiveSignal(signalNumber, frame):
@ -65,6 +88,8 @@ class Process(BaseProcess):
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
self.logger = logging.getLogger(self.name)
logging.basicConfig(handlers=[], force=True)
logging.getLogger().addHandler(QueueHandler(self.__log_queue))

View File

@ -1,7 +1,9 @@
import datetime
import logging
import multiprocessing as mp
import os
import queue
import signal
import subprocess as sp
import threading
import time
@ -10,8 +12,8 @@ from multiprocessing.synchronize import Event as MpEvent
from typing import Any
import cv2
from setproctitle import setproctitle
import frigate.util as util
from frigate.camera import CameraMetrics, PTZMetrics
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import CameraConfig, DetectConfig, ModelConfig
@ -51,6 +53,7 @@ from frigate.util.object import (
is_object_filtered,
reduce_detections,
)
from frigate.util.services import listen
logger = logging.getLogger(__name__)
@ -315,7 +318,7 @@ class CameraWatchdog(threading.Thread):
ffmpeg_cmd, self.logger, self.logpipe, self.frame_size
)
self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid
self.capture_thread = CameraCaptureRunner(
self.capture_thread = CameraCapture(
self.config,
self.shm_frame_count,
self.frame_index,
@ -393,7 +396,7 @@ class CameraWatchdog(threading.Thread):
return newest_segment_time
class CameraCaptureRunner(threading.Thread):
class CameraCapture(threading.Thread):
def __init__(
self,
config: CameraConfig,
@ -437,103 +440,103 @@ class CameraCaptureRunner(threading.Thread):
)
class CameraCapture(util.Process):
def __init__(
self, config: CameraConfig, shm_frame_count: int, camera_metrics: CameraMetrics
) -> None:
super().__init__(name=f"camera_capture:{config.name}", daemon=True)
self.config = config
self.shm_frame_count = shm_frame_count
self.camera_metrics = camera_metrics
def capture_camera(
config: CameraConfig, shm_frame_count: int, camera_metrics: CameraMetrics
):
stop_event = mp.Event()
def run(self) -> None:
self.pre_run_setup()
camera_watchdog = CameraWatchdog(
self.config,
self.shm_frame_count,
self.camera_metrics.frame_queue,
self.camera_metrics.camera_fps,
self.camera_metrics.skipped_fps,
self.camera_metrics.ffmpeg_pid,
self.stop_event,
)
camera_watchdog.start()
camera_watchdog.join()
def receiveSignal(signalNumber, frame):
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
threading.current_thread().name = f"capture:{config.name}"
setproctitle(f"frigate.capture:{config.name}")
camera_watchdog = CameraWatchdog(
config,
shm_frame_count,
camera_metrics.frame_queue,
camera_metrics.camera_fps,
camera_metrics.skipped_fps,
camera_metrics.ffmpeg_pid,
stop_event,
)
camera_watchdog.start()
camera_watchdog.join()
class CameraTracker(util.Process):
def __init__(
self,
config: CameraConfig,
model_config: ModelConfig,
labelmap: dict[int, str],
detection_queue: Queue,
def track_camera(
name,
config: CameraConfig,
model_config: ModelConfig,
labelmap: dict[int, str],
detection_queue: Queue,
detected_objects_queue,
camera_metrics: CameraMetrics,
ptz_metrics: PTZMetrics,
region_grid: list[list[dict[str, Any]]],
):
stop_event = mp.Event()
def receiveSignal(signalNumber, frame):
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
threading.current_thread().name = f"process:{name}"
setproctitle(f"frigate.process:{name}")
listen()
frame_queue = camera_metrics.frame_queue
frame_shape = config.frame_shape
motion_detector = ImprovedMotionDetector(
frame_shape,
config.motion,
config.detect.fps,
name=config.name,
ptz_metrics=ptz_metrics,
)
object_detector = RemoteObjectDetector(
name, labelmap, detection_queue, model_config, stop_event
)
object_tracker = NorfairTracker(config, ptz_metrics)
frame_manager = SharedMemoryFrameManager()
# create communication for region grid updates
requestor = InterProcessRequestor()
process_frames(
name,
requestor,
frame_queue,
frame_shape,
model_config,
config,
frame_manager,
motion_detector,
object_detector,
object_tracker,
detected_objects_queue,
camera_metrics: CameraMetrics,
ptz_metrics: PTZMetrics,
region_grid: list[list[dict[str, Any]]],
) -> None:
super().__init__(name=f"camera_processor:{config.name}", daemon=True)
self.config = config
self.model_config = model_config
self.labelmap = labelmap
self.detection_queue = detection_queue
self.detected_objects_queue = detected_objects_queue
self.camera_metrics = camera_metrics
self.ptz_metrics = ptz_metrics
self.region_grid = region_grid
camera_metrics,
stop_event,
ptz_metrics,
region_grid,
)
def run(self) -> None:
self.pre_run_setup()
frame_queue = self.camera_metrics.frame_queue
frame_shape = self.config.frame_shape
# empty the frame queue
logger.info(f"{name}: emptying frame queue")
while not frame_queue.empty():
(frame_name, _) = frame_queue.get(False)
frame_manager.delete(frame_name)
motion_detector = ImprovedMotionDetector(
frame_shape,
self.config.motion,
self.config.detect.fps,
name=self.config.name,
ptz_metrics=self.ptz_metrics,
)
object_detector = RemoteObjectDetector(
self.config.name,
self.labelmap,
self.detection_queue,
self.model_config,
self.stop_event,
)
object_tracker = NorfairTracker(self.config, self.ptz_metrics)
frame_manager = SharedMemoryFrameManager()
# create communication for region grid updates
requestor = InterProcessRequestor()
process_frames(
requestor,
frame_queue,
frame_shape,
self.model_config,
self.config,
frame_manager,
motion_detector,
object_detector,
object_tracker,
self.detected_objects_queue,
self.camera_metrics,
self.stop_event,
self.ptz_metrics,
self.region_grid,
)
# empty the frame queue
logger.info(f"{self.config.name}: emptying frame queue")
while not frame_queue.empty():
(frame_name, _) = frame_queue.get(False)
frame_manager.delete(frame_name)
logger.info(f"{self.config.name}: exiting subprocess")
logger.info(f"{name}: exiting subprocess")
def detect(
@ -574,6 +577,7 @@ def detect(
def process_frames(
camera_name: str,
requestor: InterProcessRequestor,
frame_queue: Queue,
frame_shape: tuple[int, int],
@ -593,7 +597,7 @@ def process_frames(
next_region_update = get_tomorrow_at_time(2)
config_subscriber = CameraConfigUpdateSubscriber(
None,
{camera_config.name: camera_config},
{camera_name: camera_config},
[
CameraConfigUpdateEnum.detect,
CameraConfigUpdateEnum.enabled,
@ -649,9 +653,7 @@ def process_frames(
and prev_enabled != camera_enabled
and camera_metrics.frame_queue.empty()
):
logger.debug(
f"Camera {camera_config.name} disabled, clearing tracked objects"
)
logger.debug(f"Camera {camera_name} disabled, clearing tracked objects")
prev_enabled = camera_enabled
# Clear norfair's dictionaries
@ -676,7 +678,7 @@ def process_frames(
datetime.datetime.now().astimezone(datetime.timezone.utc)
> next_region_update
):
region_grid = requestor.send_data(REQUEST_REGION_GRID, camera_config.name)
region_grid = requestor.send_data(REQUEST_REGION_GRID, camera_name)
next_region_update = get_tomorrow_at_time(2)
try:
@ -696,9 +698,7 @@ def process_frames(
frame = frame_manager.get(frame_name, (frame_shape[0] * 3 // 2, frame_shape[1]))
if frame is None:
logger.debug(
f"{camera_config.name}: frame {frame_time} is not in memory store."
)
logger.debug(f"{camera_name}: frame {frame_time} is not in memory store.")
continue
# look for motion if enabled
@ -937,7 +937,7 @@ def process_frames(
)
cv2.imwrite(
f"debug/frames/{camera_config.name}-{'{:.6f}'.format(frame_time)}.jpg",
f"debug/frames/{camera_name}-{'{:.6f}'.format(frame_time)}.jpg",
bgr_frame,
)
# add to the queue if not full
@ -949,7 +949,7 @@ def process_frames(
camera_metrics.process_fps.value = fps_tracker.eps()
detected_objects_queue.put(
(
camera_config.name,
camera_name,
frame_name,
frame_time,
detections,

View File

@ -173,7 +173,7 @@ export default function CameraMetrics({
});
series[key]["detect"].data.push({
x: statsIdx,
y: stats.cpu_usages[camStats.pid?.toString()]?.cpu,
y: stats.cpu_usages[camStats.pid.toString()].cpu,
});
});
});