mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-05-03 06:50:58 +00:00
Compare commits
18 Commits
acac743dea
...
6ebbc7c21d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6ebbc7c21d | ||
|
|
11ac699266 | ||
|
|
f08afd57fe | ||
|
|
a25dcc29fb | ||
|
|
5ef4f2d440 | ||
|
|
e8dd382280 | ||
|
|
4b913953a1 | ||
|
|
bd2cc18260 | ||
|
|
9414a11df4 | ||
|
|
270d8f14c7 | ||
|
|
1ac655bc6d | ||
|
|
1c9eafb6fc | ||
|
|
1e74566d15 | ||
|
|
05a3115304 | ||
|
|
5533a6d310 | ||
|
|
036d1949e7 | ||
|
|
85dcfdd670 | ||
|
|
4dd5e9c66e |
@ -1,5 +1,6 @@
|
||||
import argparse
|
||||
import faulthandler
|
||||
import multiprocessing as mp
|
||||
import signal
|
||||
import sys
|
||||
import threading
|
||||
@ -15,10 +16,11 @@ from frigate.util.config import find_config_file
|
||||
|
||||
|
||||
def main() -> None:
|
||||
manager = mp.Manager()
|
||||
faulthandler.enable()
|
||||
|
||||
# Setup the logging thread
|
||||
setup_logging()
|
||||
setup_logging(manager)
|
||||
|
||||
threading.current_thread().name = "frigate"
|
||||
|
||||
@ -108,8 +110,9 @@ def main() -> None:
|
||||
sys.exit(0)
|
||||
|
||||
# Run the main application.
|
||||
FrigateApp(config).start()
|
||||
FrigateApp(config, manager).start()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
mp.set_start_method("forkserver", force=True)
|
||||
main()
|
||||
|
||||
@ -5,6 +5,7 @@ import os
|
||||
import secrets
|
||||
import shutil
|
||||
from multiprocessing import Queue
|
||||
from multiprocessing.managers import DictProxy, SyncManager
|
||||
from multiprocessing.synchronize import Event as MpEvent
|
||||
from typing import Optional
|
||||
|
||||
@ -13,7 +14,6 @@ import uvicorn
|
||||
from peewee_migrate import Router
|
||||
from playhouse.sqlite_ext import SqliteExtDatabase
|
||||
|
||||
import frigate.util as util
|
||||
from frigate.api.auth import hash_password
|
||||
from frigate.api.fastapi_app import create_fastapi_app
|
||||
from frigate.camera import CameraMetrics, PTZMetrics
|
||||
@ -41,10 +41,11 @@ from frigate.const import (
|
||||
)
|
||||
from frigate.data_processing.types import DataProcessorMetrics
|
||||
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
|
||||
from frigate.embeddings import EmbeddingsContext, manage_embeddings
|
||||
from frigate.embeddings import EmbeddingProcess, EmbeddingsContext
|
||||
from frigate.events.audio import AudioProcessor
|
||||
from frigate.events.cleanup import EventCleanup
|
||||
from frigate.events.maintainer import EventProcessor
|
||||
from frigate.log import _stop_logging
|
||||
from frigate.models import (
|
||||
Event,
|
||||
Export,
|
||||
@ -57,13 +58,13 @@ from frigate.models import (
|
||||
User,
|
||||
)
|
||||
from frigate.object_detection.base import ObjectDetectProcess
|
||||
from frigate.output.output import output_frames
|
||||
from frigate.output.output import OutputProcess
|
||||
from frigate.ptz.autotrack import PtzAutoTrackerThread
|
||||
from frigate.ptz.onvif import OnvifController
|
||||
from frigate.record.cleanup import RecordingCleanup
|
||||
from frigate.record.export import migrate_exports
|
||||
from frigate.record.record import manage_recordings
|
||||
from frigate.review.review import manage_review_segments
|
||||
from frigate.record.record import RecordProcess
|
||||
from frigate.review.review import ReviewProcess
|
||||
from frigate.stats.emitter import StatsEmitter
|
||||
from frigate.stats.util import stats_init
|
||||
from frigate.storage import StorageMaintainer
|
||||
@ -78,16 +79,19 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FrigateApp:
|
||||
def __init__(self, config: FrigateConfig) -> None:
|
||||
def __init__(self, config: FrigateConfig, manager: SyncManager) -> None:
|
||||
self.metrics_manager = manager
|
||||
self.audio_process: Optional[mp.Process] = None
|
||||
self.stop_event: MpEvent = mp.Event()
|
||||
self.detection_queue: Queue = mp.Queue()
|
||||
self.detectors: dict[str, ObjectDetectProcess] = {}
|
||||
self.detection_shms: list[mp.shared_memory.SharedMemory] = []
|
||||
self.log_queue: Queue = mp.Queue()
|
||||
self.camera_metrics: dict[str, CameraMetrics] = {}
|
||||
self.camera_metrics: DictProxy = self.metrics_manager.dict()
|
||||
self.embeddings_metrics: DataProcessorMetrics | None = (
|
||||
DataProcessorMetrics(list(config.classification.custom.keys()))
|
||||
DataProcessorMetrics(
|
||||
self.metrics_manager, list(config.classification.custom.keys())
|
||||
)
|
||||
if (
|
||||
config.semantic_search.enabled
|
||||
or config.lpr.enabled
|
||||
@ -125,7 +129,7 @@ class FrigateApp:
|
||||
def init_camera_metrics(self) -> None:
|
||||
# create camera_metrics
|
||||
for camera_name in self.config.cameras.keys():
|
||||
self.camera_metrics[camera_name] = CameraMetrics()
|
||||
self.camera_metrics[camera_name] = CameraMetrics(self.metrics_manager)
|
||||
self.ptz_metrics[camera_name] = PTZMetrics(
|
||||
autotracker_enabled=self.config.cameras[
|
||||
camera_name
|
||||
@ -219,24 +223,14 @@ class FrigateApp:
|
||||
self.processes["go2rtc"] = proc.info["pid"]
|
||||
|
||||
def init_recording_manager(self) -> None:
|
||||
recording_process = util.Process(
|
||||
target=manage_recordings,
|
||||
name="recording_manager",
|
||||
args=(self.config,),
|
||||
)
|
||||
recording_process.daemon = True
|
||||
recording_process = RecordProcess(self.config)
|
||||
self.recording_process = recording_process
|
||||
recording_process.start()
|
||||
self.processes["recording"] = recording_process.pid or 0
|
||||
logger.info(f"Recording process started: {recording_process.pid}")
|
||||
|
||||
def init_review_segment_manager(self) -> None:
|
||||
review_segment_process = util.Process(
|
||||
target=manage_review_segments,
|
||||
name="review_segment_manager",
|
||||
args=(self.config,),
|
||||
)
|
||||
review_segment_process.daemon = True
|
||||
review_segment_process = ReviewProcess(self.config)
|
||||
self.review_segment_process = review_segment_process
|
||||
review_segment_process.start()
|
||||
self.processes["review_segment"] = review_segment_process.pid or 0
|
||||
@ -255,15 +249,10 @@ class FrigateApp:
|
||||
):
|
||||
return
|
||||
|
||||
embedding_process = util.Process(
|
||||
target=manage_embeddings,
|
||||
name="embeddings_manager",
|
||||
args=(
|
||||
self.config,
|
||||
self.embeddings_metrics,
|
||||
),
|
||||
embedding_process = EmbeddingProcess(
|
||||
self.config,
|
||||
self.embeddings_metrics,
|
||||
)
|
||||
embedding_process.daemon = True
|
||||
self.embedding_process = embedding_process
|
||||
embedding_process.start()
|
||||
self.processes["embeddings"] = embedding_process.pid or 0
|
||||
@ -420,12 +409,7 @@ class FrigateApp:
|
||||
self.detected_frames_processor.start()
|
||||
|
||||
def start_video_output_processor(self) -> None:
|
||||
output_processor = util.Process(
|
||||
target=output_frames,
|
||||
name="output_processor",
|
||||
args=(self.config,),
|
||||
)
|
||||
output_processor.daemon = True
|
||||
output_processor = OutputProcess(self.config)
|
||||
self.output_processor = output_processor
|
||||
output_processor.start()
|
||||
logger.info(f"Output process started: {output_processor.pid}")
|
||||
@ -670,4 +654,6 @@ class FrigateApp:
|
||||
shm.close()
|
||||
shm.unlink()
|
||||
|
||||
_stop_logging()
|
||||
self.metrics_manager.shutdown()
|
||||
os._exit(os.EX_OK)
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
import multiprocessing as mp
|
||||
from multiprocessing.managers import SyncManager
|
||||
from multiprocessing.sharedctypes import Synchronized
|
||||
from multiprocessing.synchronize import Event
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class CameraMetrics:
|
||||
@ -16,25 +16,25 @@ class CameraMetrics:
|
||||
|
||||
frame_queue: mp.Queue
|
||||
|
||||
process: Optional[mp.Process]
|
||||
capture_process: Optional[mp.Process]
|
||||
process_pid: Synchronized
|
||||
capture_process_pid: Synchronized
|
||||
ffmpeg_pid: Synchronized
|
||||
|
||||
def __init__(self):
|
||||
self.camera_fps = mp.Value("d", 0)
|
||||
self.detection_fps = mp.Value("d", 0)
|
||||
self.detection_frame = mp.Value("d", 0)
|
||||
self.process_fps = mp.Value("d", 0)
|
||||
self.skipped_fps = mp.Value("d", 0)
|
||||
self.read_start = mp.Value("d", 0)
|
||||
self.audio_rms = mp.Value("d", 0)
|
||||
self.audio_dBFS = mp.Value("d", 0)
|
||||
def __init__(self, manager: SyncManager):
|
||||
self.camera_fps = manager.Value("d", 0)
|
||||
self.detection_fps = manager.Value("d", 0)
|
||||
self.detection_frame = manager.Value("d", 0)
|
||||
self.process_fps = manager.Value("d", 0)
|
||||
self.skipped_fps = manager.Value("d", 0)
|
||||
self.read_start = manager.Value("d", 0)
|
||||
self.audio_rms = manager.Value("d", 0)
|
||||
self.audio_dBFS = manager.Value("d", 0)
|
||||
|
||||
self.frame_queue = mp.Queue(maxsize=2)
|
||||
self.frame_queue = manager.Queue(maxsize=2)
|
||||
|
||||
self.process = None
|
||||
self.capture_process = None
|
||||
self.ffmpeg_pid = mp.Value("i", 0)
|
||||
self.process_pid = manager.Value("i", 0)
|
||||
self.capture_process_pid = manager.Value("i", 0)
|
||||
self.ffmpeg_pid = manager.Value("i", 0)
|
||||
|
||||
|
||||
class PTZMetrics:
|
||||
|
||||
@ -1,10 +1,12 @@
|
||||
"""Create and maintain camera processes / management."""
|
||||
|
||||
import logging
|
||||
import multiprocessing as mp
|
||||
import os
|
||||
import shutil
|
||||
import threading
|
||||
from multiprocessing import Queue
|
||||
from multiprocessing.managers import DictProxy
|
||||
from multiprocessing.synchronize import Event as MpEvent
|
||||
|
||||
from frigate.camera import CameraMetrics, PTZMetrics
|
||||
@ -16,11 +18,10 @@ from frigate.config.camera.updater import (
|
||||
)
|
||||
from frigate.const import SHM_FRAMES_VAR
|
||||
from frigate.models import Regions
|
||||
from frigate.util import Process as FrigateProcess
|
||||
from frigate.util.builtin import empty_and_close_queue
|
||||
from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory
|
||||
from frigate.util.object import get_camera_regions_grid
|
||||
from frigate.video import capture_camera, track_camera
|
||||
from frigate.video import CameraCapture, CameraTracker
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -31,7 +32,7 @@ class CameraMaintainer(threading.Thread):
|
||||
config: FrigateConfig,
|
||||
detection_queue: Queue,
|
||||
detected_frames_queue: Queue,
|
||||
camera_metrics: dict[str, CameraMetrics],
|
||||
camera_metrics: DictProxy,
|
||||
ptz_metrics: dict[str, PTZMetrics],
|
||||
stop_event: MpEvent,
|
||||
):
|
||||
@ -53,6 +54,8 @@ class CameraMaintainer(threading.Thread):
|
||||
],
|
||||
)
|
||||
self.shm_count = self.__calculate_shm_frame_count()
|
||||
self.camera_processes: dict[str, mp.Process] = {}
|
||||
self.capture_processes: dict[str, mp.Process] = {}
|
||||
|
||||
def __init_historical_regions(self) -> None:
|
||||
# delete region grids for removed or renamed cameras
|
||||
@ -151,24 +154,19 @@ class CameraMaintainer(threading.Thread):
|
||||
except FileExistsError:
|
||||
pass
|
||||
|
||||
camera_process = FrigateProcess(
|
||||
target=track_camera,
|
||||
name=f"camera_processor:{name}",
|
||||
args=(
|
||||
config.name,
|
||||
config,
|
||||
self.config.model,
|
||||
self.config.model.merged_labelmap,
|
||||
self.detection_queue,
|
||||
self.detected_frames_queue,
|
||||
self.camera_metrics[name],
|
||||
self.ptz_metrics[name],
|
||||
self.region_grids[name],
|
||||
),
|
||||
daemon=True,
|
||||
camera_process = CameraTracker(
|
||||
config,
|
||||
self.config.model,
|
||||
self.config.model.merged_labelmap,
|
||||
self.detection_queue,
|
||||
self.detected_frames_queue,
|
||||
self.camera_metrics[name],
|
||||
self.ptz_metrics[name],
|
||||
self.region_grids[name],
|
||||
)
|
||||
self.camera_metrics[config.name].process = camera_process
|
||||
self.camera_processes[config.name] = camera_process
|
||||
camera_process.start()
|
||||
self.camera_metrics[config.name].process_pid.value = camera_process.pid
|
||||
logger.info(f"Camera processor started for {config.name}: {camera_process.pid}")
|
||||
|
||||
def __start_camera_capture(
|
||||
@ -184,32 +182,28 @@ class CameraMaintainer(threading.Thread):
|
||||
frame_size = config.frame_shape_yuv[0] * config.frame_shape_yuv[1]
|
||||
self.frame_manager.create(f"{config.name}_frame{i}", frame_size)
|
||||
|
||||
capture_process = FrigateProcess(
|
||||
target=capture_camera,
|
||||
name=f"camera_capture:{name}",
|
||||
args=(config, count, self.camera_metrics[name]),
|
||||
)
|
||||
capture_process = CameraCapture(config, count, self.camera_metrics[name])
|
||||
capture_process.daemon = True
|
||||
self.camera_metrics[name].capture_process = capture_process
|
||||
self.capture_processes[name] = capture_process
|
||||
capture_process.start()
|
||||
self.camera_metrics[name].capture_process_pid.value = capture_process.pid
|
||||
logger.info(f"Capture process started for {name}: {capture_process.pid}")
|
||||
|
||||
def __stop_camera_capture_process(self, camera: str) -> None:
|
||||
capture_process = self.camera_metrics[camera].capture_process
|
||||
capture_process = self.capture_processes[camera]
|
||||
if capture_process is not None:
|
||||
logger.info(f"Waiting for capture process for {camera} to stop")
|
||||
capture_process.terminate()
|
||||
capture_process.join()
|
||||
|
||||
def __stop_camera_process(self, camera: str) -> None:
|
||||
metrics = self.camera_metrics[camera]
|
||||
camera_process = metrics.process
|
||||
camera_process = self.camera_processes[camera]
|
||||
if camera_process is not None:
|
||||
logger.info(f"Waiting for process for {camera} to stop")
|
||||
camera_process.terminate()
|
||||
camera_process.join()
|
||||
logger.info(f"Closing frame queue for {camera}")
|
||||
empty_and_close_queue(metrics.frame_queue)
|
||||
empty_and_close_queue(self.camera_metrics[camera].frame_queue)
|
||||
|
||||
def run(self):
|
||||
self.__init_historical_regions()
|
||||
@ -240,11 +234,11 @@ class CameraMaintainer(threading.Thread):
|
||||
self.__stop_camera_process(camera)
|
||||
|
||||
# ensure the capture processes are done
|
||||
for camera in self.camera_metrics.keys():
|
||||
for camera in self.camera_processes.keys():
|
||||
self.__stop_camera_capture_process(camera)
|
||||
|
||||
# ensure the camera processors are done
|
||||
for camera in self.camera_metrics.keys():
|
||||
for camera in self.capture_processes.keys():
|
||||
self.__stop_camera_process(camera)
|
||||
|
||||
self.update_subscriber.stop()
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
"""Embeddings types."""
|
||||
|
||||
import multiprocessing as mp
|
||||
from enum import Enum
|
||||
from multiprocessing.managers import SyncManager
|
||||
from multiprocessing.sharedctypes import Synchronized
|
||||
|
||||
import sherpa_onnx
|
||||
@ -20,25 +20,27 @@ class DataProcessorMetrics:
|
||||
alpr_pps: Synchronized
|
||||
yolov9_lpr_speed: Synchronized
|
||||
yolov9_lpr_pps: Synchronized
|
||||
classification_speeds: dict[str, Synchronized] = {}
|
||||
classification_cps: dict[str, Synchronized] = {}
|
||||
classification_speeds: dict[str, Synchronized]
|
||||
classification_cps: dict[str, Synchronized]
|
||||
|
||||
def __init__(self, custom_classification_models: list[str]):
|
||||
self.image_embeddings_speed = mp.Value("d", 0.0)
|
||||
self.image_embeddings_eps = mp.Value("d", 0.0)
|
||||
self.text_embeddings_speed = mp.Value("d", 0.0)
|
||||
self.text_embeddings_eps = mp.Value("d", 0.0)
|
||||
self.face_rec_speed = mp.Value("d", 0.0)
|
||||
self.face_rec_fps = mp.Value("d", 0.0)
|
||||
self.alpr_speed = mp.Value("d", 0.0)
|
||||
self.alpr_pps = mp.Value("d", 0.0)
|
||||
self.yolov9_lpr_speed = mp.Value("d", 0.0)
|
||||
self.yolov9_lpr_pps = mp.Value("d", 0.0)
|
||||
def __init__(self, manager: SyncManager, custom_classification_models: list[str]):
|
||||
self.image_embeddings_speed = manager.Value("d", 0.0)
|
||||
self.image_embeddings_eps = manager.Value("d", 0.0)
|
||||
self.text_embeddings_speed = manager.Value("d", 0.0)
|
||||
self.text_embeddings_eps = manager.Value("d", 0.0)
|
||||
self.face_rec_speed = manager.Value("d", 0.0)
|
||||
self.face_rec_fps = manager.Value("d", 0.0)
|
||||
self.alpr_speed = manager.Value("d", 0.0)
|
||||
self.alpr_pps = manager.Value("d", 0.0)
|
||||
self.yolov9_lpr_speed = manager.Value("d", 0.0)
|
||||
self.yolov9_lpr_pps = manager.Value("d", 0.0)
|
||||
self.classification_speeds = manager.dict()
|
||||
self.classification_cps = manager.dict()
|
||||
|
||||
if custom_classification_models:
|
||||
for key in custom_classification_models:
|
||||
self.classification_speeds[key] = mp.Value("d", 0.0)
|
||||
self.classification_cps[key] = mp.Value("d", 0.0)
|
||||
self.classification_speeds[key] = manager.Value("d", 0.0)
|
||||
self.classification_cps[key] = manager.Value("d", 0.0)
|
||||
|
||||
|
||||
class DataProcessorModelRunner:
|
||||
|
||||
@ -3,26 +3,22 @@
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
import multiprocessing as mp
|
||||
import os
|
||||
import signal
|
||||
import threading
|
||||
from types import FrameType
|
||||
from typing import Any, Optional, Union
|
||||
from typing import Any, Union
|
||||
|
||||
import regex
|
||||
from pathvalidate import ValidationError, sanitize_filename
|
||||
from setproctitle import setproctitle
|
||||
|
||||
from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsRequestor
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.const import CONFIG_DIR, FACE_DIR
|
||||
from frigate.data_processing.types import DataProcessorMetrics
|
||||
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
|
||||
from frigate.models import Event, Recordings
|
||||
from frigate.models import Event
|
||||
from frigate.util import Process as FrigateProcess
|
||||
from frigate.util.builtin import serialize
|
||||
from frigate.util.classification import kickoff_model_training
|
||||
from frigate.util.services import listen
|
||||
|
||||
from .maintainer import EmbeddingMaintainer
|
||||
from .util import ZScoreNormalization
|
||||
@ -30,40 +26,22 @@ from .util import ZScoreNormalization
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def manage_embeddings(config: FrigateConfig, metrics: DataProcessorMetrics) -> None:
|
||||
stop_event = mp.Event()
|
||||
class EmbeddingProcess(FrigateProcess):
|
||||
def __init__(
|
||||
self, config: FrigateConfig, metrics: DataProcessorMetrics | None
|
||||
) -> None:
|
||||
super().__init__(name="frigate.embeddings_manager", daemon=True)
|
||||
self.config = config
|
||||
self.metrics = metrics
|
||||
|
||||
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
|
||||
stop_event.set()
|
||||
|
||||
signal.signal(signal.SIGTERM, receiveSignal)
|
||||
signal.signal(signal.SIGINT, receiveSignal)
|
||||
|
||||
threading.current_thread().name = "process:embeddings_manager"
|
||||
setproctitle("frigate.embeddings_manager")
|
||||
listen()
|
||||
|
||||
# Configure Frigate DB
|
||||
db = SqliteVecQueueDatabase(
|
||||
config.database.path,
|
||||
pragmas={
|
||||
"auto_vacuum": "FULL", # Does not defragment database
|
||||
"cache_size": -512 * 1000, # 512MB of cache
|
||||
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
|
||||
},
|
||||
timeout=max(60, 10 * len([c for c in config.cameras.values() if c.enabled])),
|
||||
load_vec_extension=True,
|
||||
)
|
||||
models = [Event, Recordings]
|
||||
db.bind(models)
|
||||
|
||||
maintainer = EmbeddingMaintainer(
|
||||
db,
|
||||
config,
|
||||
metrics,
|
||||
stop_event,
|
||||
)
|
||||
maintainer.start()
|
||||
def run(self) -> None:
|
||||
self.pre_run_setup()
|
||||
maintainer = EmbeddingMaintainer(
|
||||
self.config,
|
||||
self.metrics,
|
||||
self.stop_event,
|
||||
)
|
||||
maintainer.start()
|
||||
|
||||
|
||||
class EmbeddingsContext:
|
||||
|
||||
@ -12,7 +12,6 @@ from typing import Any, Optional
|
||||
import cv2
|
||||
import numpy as np
|
||||
from peewee import DoesNotExist
|
||||
from playhouse.sqliteq import SqliteQueueDatabase
|
||||
|
||||
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
|
||||
from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsResponder
|
||||
@ -58,9 +57,10 @@ from frigate.data_processing.real_time.license_plate import (
|
||||
LicensePlateRealTimeProcessor,
|
||||
)
|
||||
from frigate.data_processing.types import DataProcessorMetrics, PostProcessDataEnum
|
||||
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
|
||||
from frigate.events.types import EventTypeEnum, RegenerateDescriptionEnum
|
||||
from frigate.genai import get_genai_client
|
||||
from frigate.models import Event
|
||||
from frigate.models import Event, Recordings
|
||||
from frigate.types import TrackedObjectUpdateTypesEnum
|
||||
from frigate.util.builtin import serialize
|
||||
from frigate.util.image import (
|
||||
@ -82,9 +82,8 @@ class EmbeddingMaintainer(threading.Thread):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
db: SqliteQueueDatabase,
|
||||
config: FrigateConfig,
|
||||
metrics: DataProcessorMetrics,
|
||||
metrics: DataProcessorMetrics | None,
|
||||
stop_event: MpEvent,
|
||||
) -> None:
|
||||
super().__init__(name="embeddings_maintainer")
|
||||
@ -97,6 +96,22 @@ class EmbeddingMaintainer(threading.Thread):
|
||||
[CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.remove],
|
||||
)
|
||||
|
||||
# Configure Frigate DB
|
||||
db = SqliteVecQueueDatabase(
|
||||
config.database.path,
|
||||
pragmas={
|
||||
"auto_vacuum": "FULL", # Does not defragment database
|
||||
"cache_size": -512 * 1000, # 512MB of cache
|
||||
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
|
||||
},
|
||||
timeout=max(
|
||||
60, 10 * len([c for c in config.cameras.values() if c.enabled])
|
||||
),
|
||||
load_vec_extension=True,
|
||||
)
|
||||
models = [Event, Recordings]
|
||||
db.bind(models)
|
||||
|
||||
if config.semantic_search.enabled:
|
||||
self.embeddings = Embeddings(config, db, metrics)
|
||||
|
||||
|
||||
@ -6,12 +6,12 @@ import random
|
||||
import string
|
||||
import threading
|
||||
import time
|
||||
from multiprocessing.managers import DictProxy
|
||||
from typing import Any, Tuple
|
||||
|
||||
import numpy as np
|
||||
|
||||
import frigate.util as util
|
||||
from frigate.camera import CameraMetrics
|
||||
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
|
||||
from frigate.comms.event_metadata_updater import (
|
||||
EventMetadataPublisher,
|
||||
@ -83,7 +83,7 @@ class AudioProcessor(util.Process):
|
||||
self,
|
||||
config: FrigateConfig,
|
||||
cameras: list[CameraConfig],
|
||||
camera_metrics: dict[str, CameraMetrics],
|
||||
camera_metrics: DictProxy,
|
||||
):
|
||||
super().__init__(name="frigate.audio_manager", daemon=True)
|
||||
|
||||
@ -105,6 +105,7 @@ class AudioProcessor(util.Process):
|
||||
self.transcription_model_runner = None
|
||||
|
||||
def run(self) -> None:
|
||||
self.pre_run_setup()
|
||||
audio_threads: list[AudioEventMaintainer] = []
|
||||
|
||||
threading.current_thread().name = "process:audio_manager"
|
||||
@ -146,7 +147,7 @@ class AudioEventMaintainer(threading.Thread):
|
||||
self,
|
||||
camera: CameraConfig,
|
||||
config: FrigateConfig,
|
||||
camera_metrics: dict[str, CameraMetrics],
|
||||
camera_metrics: DictProxy,
|
||||
audio_transcription_model_runner: AudioTranscriptionModelRunner | None,
|
||||
stop_event: threading.Event,
|
||||
) -> None:
|
||||
|
||||
@ -1,11 +1,13 @@
|
||||
# In log.py
|
||||
import atexit
|
||||
import logging
|
||||
import multiprocessing as mp
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
from collections import deque
|
||||
from logging.handlers import QueueHandler, QueueListener
|
||||
from multiprocessing.managers import SyncManager
|
||||
from queue import Queue
|
||||
from typing import Deque, Optional
|
||||
|
||||
from frigate.util.builtin import clean_camera_user_pass
|
||||
@ -32,12 +34,12 @@ LOG_HANDLER.addFilter(
|
||||
)
|
||||
|
||||
log_listener: Optional[QueueListener] = None
|
||||
log_queue: Optional[Queue] = None
|
||||
|
||||
|
||||
def setup_logging() -> None:
|
||||
global log_listener
|
||||
|
||||
log_queue: mp.Queue = mp.Queue()
|
||||
def setup_logging(manager: SyncManager) -> None:
|
||||
global log_listener, log_queue
|
||||
log_queue = manager.Queue()
|
||||
log_listener = QueueListener(log_queue, LOG_HANDLER, respect_handler_level=True)
|
||||
|
||||
atexit.register(_stop_logging)
|
||||
@ -54,7 +56,6 @@ def setup_logging() -> None:
|
||||
|
||||
def _stop_logging() -> None:
|
||||
global log_listener
|
||||
|
||||
if log_listener is not None:
|
||||
log_listener.stop()
|
||||
log_listener = None
|
||||
|
||||
@ -1,16 +1,11 @@
|
||||
import datetime
|
||||
import logging
|
||||
import multiprocessing as mp
|
||||
import os
|
||||
import queue
|
||||
import signal
|
||||
import threading
|
||||
from abc import ABC, abstractmethod
|
||||
from multiprocessing import Queue, Value
|
||||
from multiprocessing.synchronize import Event as MpEvent
|
||||
|
||||
import numpy as np
|
||||
from setproctitle import setproctitle
|
||||
|
||||
import frigate.util as util
|
||||
from frigate.comms.object_detector_signaler import (
|
||||
@ -25,7 +20,6 @@ from frigate.detectors.detector_config import (
|
||||
)
|
||||
from frigate.util.builtin import EventsPerSecond, load_labels
|
||||
from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory
|
||||
from frigate.util.services import listen
|
||||
|
||||
from .util import tensor_transform
|
||||
|
||||
@ -90,72 +84,75 @@ class LocalObjectDetector(ObjectDetector):
|
||||
return self.detect_api.detect_raw(tensor_input=tensor_input)
|
||||
|
||||
|
||||
def run_detector(
|
||||
name: str,
|
||||
detection_queue: Queue,
|
||||
cameras: list[str],
|
||||
avg_speed: Value,
|
||||
start: Value,
|
||||
detector_config: BaseDetectorConfig,
|
||||
):
|
||||
threading.current_thread().name = f"detector:{name}"
|
||||
logger = logging.getLogger(f"detector.{name}")
|
||||
logger.info(f"Starting detection process: {os.getpid()}")
|
||||
setproctitle(f"frigate.detector.{name}")
|
||||
listen()
|
||||
class DetectorRunner(util.Process):
|
||||
def __init__(
|
||||
self,
|
||||
name,
|
||||
detection_queue: Queue,
|
||||
cameras: list[str],
|
||||
avg_speed: Value,
|
||||
start_time: Value,
|
||||
detector_config: BaseDetectorConfig,
|
||||
) -> None:
|
||||
super().__init__(name=name, daemon=True)
|
||||
self.detection_queue = detection_queue
|
||||
self.cameras = cameras
|
||||
self.avg_speed = avg_speed
|
||||
self.start_time = start_time
|
||||
self.detector_config = detector_config
|
||||
self.outputs: dict = {}
|
||||
|
||||
stop_event: MpEvent = mp.Event()
|
||||
|
||||
def receiveSignal(signalNumber, frame):
|
||||
stop_event.set()
|
||||
|
||||
signal.signal(signal.SIGTERM, receiveSignal)
|
||||
signal.signal(signal.SIGINT, receiveSignal)
|
||||
|
||||
def create_output_shm(name: str):
|
||||
def create_output_shm(self, name: str):
|
||||
out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False)
|
||||
out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf)
|
||||
outputs[name] = {"shm": out_shm, "np": out_np}
|
||||
self.outputs[name] = {"shm": out_shm, "np": out_np}
|
||||
|
||||
frame_manager = SharedMemoryFrameManager()
|
||||
object_detector = LocalObjectDetector(detector_config=detector_config)
|
||||
detector_publisher = ObjectDetectorPublisher()
|
||||
def run(self) -> None:
|
||||
self.pre_run_setup()
|
||||
|
||||
outputs = {}
|
||||
for name in cameras:
|
||||
create_output_shm(name)
|
||||
frame_manager = SharedMemoryFrameManager()
|
||||
object_detector = LocalObjectDetector(detector_config=self.detector_config)
|
||||
detector_publisher = ObjectDetectorPublisher()
|
||||
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
connection_id = detection_queue.get(timeout=1)
|
||||
except queue.Empty:
|
||||
continue
|
||||
input_frame = frame_manager.get(
|
||||
connection_id,
|
||||
(1, detector_config.model.height, detector_config.model.width, 3),
|
||||
)
|
||||
for name in self.cameras:
|
||||
self.create_output_shm(name)
|
||||
|
||||
if input_frame is None:
|
||||
logger.warning(f"Failed to get frame {connection_id} from SHM")
|
||||
continue
|
||||
while not self.stop_event.is_set():
|
||||
try:
|
||||
connection_id = self.detection_queue.get(timeout=1)
|
||||
except queue.Empty:
|
||||
continue
|
||||
input_frame = frame_manager.get(
|
||||
connection_id,
|
||||
(
|
||||
1,
|
||||
self.detector_config.model.height,
|
||||
self.detector_config.model.width,
|
||||
3,
|
||||
),
|
||||
)
|
||||
|
||||
# detect and send the output
|
||||
start.value = datetime.datetime.now().timestamp()
|
||||
detections = object_detector.detect_raw(input_frame)
|
||||
duration = datetime.datetime.now().timestamp() - start.value
|
||||
frame_manager.close(connection_id)
|
||||
if input_frame is None:
|
||||
logger.warning(f"Failed to get frame {connection_id} from SHM")
|
||||
continue
|
||||
|
||||
if connection_id not in outputs:
|
||||
create_output_shm(connection_id)
|
||||
# detect and send the output
|
||||
self.start_time.value = datetime.datetime.now().timestamp()
|
||||
detections = object_detector.detect_raw(input_frame)
|
||||
duration = datetime.datetime.now().timestamp() - self.start_time.value
|
||||
frame_manager.close(connection_id)
|
||||
|
||||
outputs[connection_id]["np"][:] = detections[:]
|
||||
detector_publisher.publish(connection_id)
|
||||
start.value = 0.0
|
||||
if connection_id not in self.outputs:
|
||||
self.create_output_shm(connection_id)
|
||||
|
||||
avg_speed.value = (avg_speed.value * 9 + duration) / 10
|
||||
self.outputs[connection_id]["np"][:] = detections[:]
|
||||
detector_publisher.publish(connection_id)
|
||||
self.start_time.value = 0.0
|
||||
|
||||
detector_publisher.stop()
|
||||
logger.info("Exited detection process...")
|
||||
self.avg_speed.value = (self.avg_speed.value * 9 + duration) / 10
|
||||
|
||||
detector_publisher.stop()
|
||||
logger.info("Exited detection process...")
|
||||
|
||||
|
||||
class ObjectDetectProcess:
|
||||
@ -192,19 +189,14 @@ class ObjectDetectProcess:
|
||||
self.detection_start.value = 0.0
|
||||
if (self.detect_process is not None) and self.detect_process.is_alive():
|
||||
self.stop()
|
||||
self.detect_process = util.Process(
|
||||
target=run_detector,
|
||||
name=f"detector:{self.name}",
|
||||
args=(
|
||||
self.name,
|
||||
self.detection_queue,
|
||||
self.cameras,
|
||||
self.avg_inference_speed,
|
||||
self.detection_start,
|
||||
self.detector_config,
|
||||
),
|
||||
self.detect_process = DetectorRunner(
|
||||
f"detector:{self.name}",
|
||||
self.detection_queue,
|
||||
self.cameras,
|
||||
self.avg_inference_speed,
|
||||
self.detection_start,
|
||||
self.detector_config,
|
||||
)
|
||||
self.detect_process.daemon = True
|
||||
self.detect_process.start()
|
||||
|
||||
|
||||
|
||||
@ -2,14 +2,11 @@
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
import multiprocessing as mp
|
||||
import os
|
||||
import shutil
|
||||
import signal
|
||||
import threading
|
||||
from wsgiref.simple_server import make_server
|
||||
|
||||
from setproctitle import setproctitle
|
||||
from ws4py.server.wsgirefserver import (
|
||||
WebSocketWSGIHandler,
|
||||
WebSocketWSGIRequestHandler,
|
||||
@ -17,6 +14,7 @@ from ws4py.server.wsgirefserver import (
|
||||
)
|
||||
from ws4py.server.wsgiutils import WebSocketWSGIApplication
|
||||
|
||||
import frigate.util as util
|
||||
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
|
||||
from frigate.comms.ws import WebSocket
|
||||
from frigate.config import FrigateConfig
|
||||
@ -73,189 +71,193 @@ def check_disabled_camera_update(
|
||||
birdseye.all_cameras_disabled()
|
||||
|
||||
|
||||
def output_frames(
|
||||
config: FrigateConfig,
|
||||
):
|
||||
threading.current_thread().name = "output"
|
||||
setproctitle("frigate.output")
|
||||
class OutputProcess(util.Process):
|
||||
def __init__(self, config: FrigateConfig) -> None:
|
||||
super().__init__(name="frigate.output", daemon=True)
|
||||
self.config = config
|
||||
|
||||
stop_event = mp.Event()
|
||||
def run(self) -> None:
|
||||
self.pre_run_setup()
|
||||
|
||||
def receiveSignal(signalNumber, frame):
|
||||
stop_event.set()
|
||||
frame_manager = SharedMemoryFrameManager()
|
||||
|
||||
signal.signal(signal.SIGTERM, receiveSignal)
|
||||
signal.signal(signal.SIGINT, receiveSignal)
|
||||
|
||||
frame_manager = SharedMemoryFrameManager()
|
||||
|
||||
# start a websocket server on 8082
|
||||
WebSocketWSGIHandler.http_version = "1.1"
|
||||
websocket_server = make_server(
|
||||
"127.0.0.1",
|
||||
8082,
|
||||
server_class=WSGIServer,
|
||||
handler_class=WebSocketWSGIRequestHandler,
|
||||
app=WebSocketWSGIApplication(handler_cls=WebSocket),
|
||||
)
|
||||
websocket_server.initialize_websockets_manager()
|
||||
websocket_thread = threading.Thread(target=websocket_server.serve_forever)
|
||||
|
||||
detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video)
|
||||
config_subscriber = CameraConfigUpdateSubscriber(
|
||||
config,
|
||||
config.cameras,
|
||||
[
|
||||
CameraConfigUpdateEnum.add,
|
||||
CameraConfigUpdateEnum.birdseye,
|
||||
CameraConfigUpdateEnum.enabled,
|
||||
CameraConfigUpdateEnum.record,
|
||||
],
|
||||
)
|
||||
|
||||
jsmpeg_cameras: dict[str, JsmpegCamera] = {}
|
||||
birdseye: Birdseye | None = None
|
||||
preview_recorders: dict[str, PreviewRecorder] = {}
|
||||
preview_write_times: dict[str, float] = {}
|
||||
failed_frame_requests: dict[str, int] = {}
|
||||
last_disabled_cam_check = datetime.datetime.now().timestamp()
|
||||
|
||||
move_preview_frames("cache")
|
||||
|
||||
for camera, cam_config in config.cameras.items():
|
||||
if not cam_config.enabled_in_config:
|
||||
continue
|
||||
|
||||
jsmpeg_cameras[camera] = JsmpegCamera(cam_config, stop_event, websocket_server)
|
||||
preview_recorders[camera] = PreviewRecorder(cam_config)
|
||||
preview_write_times[camera] = 0
|
||||
|
||||
if config.birdseye.enabled:
|
||||
birdseye = Birdseye(config, stop_event, websocket_server)
|
||||
|
||||
websocket_thread.start()
|
||||
|
||||
while not stop_event.is_set():
|
||||
# check if there is an updated config
|
||||
updates = config_subscriber.check_for_updates()
|
||||
|
||||
if "add" in updates:
|
||||
for camera in updates["add"]:
|
||||
jsmpeg_cameras[camera] = JsmpegCamera(
|
||||
cam_config, stop_event, websocket_server
|
||||
)
|
||||
preview_recorders[camera] = PreviewRecorder(cam_config)
|
||||
preview_write_times[camera] = 0
|
||||
|
||||
(topic, data) = detection_subscriber.check_for_update(timeout=1)
|
||||
now = datetime.datetime.now().timestamp()
|
||||
|
||||
if now - last_disabled_cam_check > 5:
|
||||
# check disabled cameras every 5 seconds
|
||||
last_disabled_cam_check = now
|
||||
check_disabled_camera_update(
|
||||
config, birdseye, preview_recorders, preview_write_times
|
||||
)
|
||||
|
||||
if not topic:
|
||||
continue
|
||||
|
||||
(
|
||||
camera,
|
||||
frame_name,
|
||||
frame_time,
|
||||
current_tracked_objects,
|
||||
motion_boxes,
|
||||
_,
|
||||
) = data
|
||||
|
||||
if not config.cameras[camera].enabled:
|
||||
continue
|
||||
|
||||
frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv)
|
||||
|
||||
if frame is None:
|
||||
logger.debug(f"Failed to get frame {frame_name} from SHM")
|
||||
failed_frame_requests[camera] = failed_frame_requests.get(camera, 0) + 1
|
||||
|
||||
if failed_frame_requests[camera] > config.cameras[camera].detect.fps:
|
||||
logger.warning(
|
||||
f"Failed to retrieve many frames for {camera} from SHM, consider increasing SHM size if this continues."
|
||||
)
|
||||
|
||||
continue
|
||||
else:
|
||||
failed_frame_requests[camera] = 0
|
||||
|
||||
# send frames for low fps recording
|
||||
preview_recorders[camera].write_data(
|
||||
current_tracked_objects, motion_boxes, frame_time, frame
|
||||
# start a websocket server on 8082
|
||||
WebSocketWSGIHandler.http_version = "1.1"
|
||||
websocket_server = make_server(
|
||||
"127.0.0.1",
|
||||
8082,
|
||||
server_class=WSGIServer,
|
||||
handler_class=WebSocketWSGIRequestHandler,
|
||||
app=WebSocketWSGIApplication(handler_cls=WebSocket),
|
||||
)
|
||||
preview_write_times[camera] = frame_time
|
||||
websocket_server.initialize_websockets_manager()
|
||||
websocket_thread = threading.Thread(target=websocket_server.serve_forever)
|
||||
|
||||
# send camera frame to ffmpeg process if websockets are connected
|
||||
if any(
|
||||
ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager
|
||||
):
|
||||
# write to the converter for the camera if clients are listening to the specific camera
|
||||
jsmpeg_cameras[camera].write_frame(frame.tobytes())
|
||||
detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video)
|
||||
config_subscriber = CameraConfigUpdateSubscriber(
|
||||
self.config,
|
||||
self.config.cameras,
|
||||
[
|
||||
CameraConfigUpdateEnum.add,
|
||||
CameraConfigUpdateEnum.birdseye,
|
||||
CameraConfigUpdateEnum.enabled,
|
||||
CameraConfigUpdateEnum.record,
|
||||
],
|
||||
)
|
||||
|
||||
# send output data to birdseye if websocket is connected or restreaming
|
||||
if config.birdseye.enabled and (
|
||||
config.birdseye.restream
|
||||
or any(
|
||||
ws.environ["PATH_INFO"].endswith("birdseye")
|
||||
for ws in websocket_server.manager
|
||||
jsmpeg_cameras: dict[str, JsmpegCamera] = {}
|
||||
birdseye: Birdseye | None = None
|
||||
preview_recorders: dict[str, PreviewRecorder] = {}
|
||||
preview_write_times: dict[str, float] = {}
|
||||
failed_frame_requests: dict[str, int] = {}
|
||||
last_disabled_cam_check = datetime.datetime.now().timestamp()
|
||||
|
||||
move_preview_frames("cache")
|
||||
|
||||
for camera, cam_config in self.config.cameras.items():
|
||||
if not cam_config.enabled_in_config:
|
||||
continue
|
||||
|
||||
jsmpeg_cameras[camera] = JsmpegCamera(
|
||||
cam_config, self.stop_event, websocket_server
|
||||
)
|
||||
):
|
||||
birdseye.write_data(
|
||||
preview_recorders[camera] = PreviewRecorder(cam_config)
|
||||
preview_write_times[camera] = 0
|
||||
|
||||
if self.config.birdseye.enabled:
|
||||
birdseye = Birdseye(self.config, self.stop_event, websocket_server)
|
||||
|
||||
websocket_thread.start()
|
||||
|
||||
while not self.stop_event.is_set():
|
||||
# check if there is an updated config
|
||||
updates = config_subscriber.check_for_updates()
|
||||
|
||||
if "add" in updates:
|
||||
for camera in updates["add"]:
|
||||
jsmpeg_cameras[camera] = JsmpegCamera(
|
||||
cam_config, self.stop_event, websocket_server
|
||||
)
|
||||
preview_recorders[camera] = PreviewRecorder(cam_config)
|
||||
preview_write_times[camera] = 0
|
||||
|
||||
(topic, data) = detection_subscriber.check_for_update(timeout=1)
|
||||
now = datetime.datetime.now().timestamp()
|
||||
|
||||
if now - last_disabled_cam_check > 5:
|
||||
# check disabled cameras every 5 seconds
|
||||
last_disabled_cam_check = now
|
||||
check_disabled_camera_update(
|
||||
self.config, birdseye, preview_recorders, preview_write_times
|
||||
)
|
||||
|
||||
if not topic:
|
||||
continue
|
||||
|
||||
(
|
||||
camera,
|
||||
frame_name,
|
||||
frame_time,
|
||||
current_tracked_objects,
|
||||
motion_boxes,
|
||||
frame_time,
|
||||
frame,
|
||||
_,
|
||||
) = data
|
||||
|
||||
if not self.config.cameras[camera].enabled:
|
||||
continue
|
||||
|
||||
frame = frame_manager.get(
|
||||
frame_name, self.config.cameras[camera].frame_shape_yuv
|
||||
)
|
||||
|
||||
frame_manager.close(frame_name)
|
||||
if frame is None:
|
||||
logger.debug(f"Failed to get frame {frame_name} from SHM")
|
||||
failed_frame_requests[camera] = failed_frame_requests.get(camera, 0) + 1
|
||||
|
||||
move_preview_frames("clips")
|
||||
if (
|
||||
failed_frame_requests[camera]
|
||||
> self.config.cameras[camera].detect.fps
|
||||
):
|
||||
logger.warning(
|
||||
f"Failed to retrieve many frames for {camera} from SHM, consider increasing SHM size if this continues."
|
||||
)
|
||||
|
||||
while True:
|
||||
(topic, data) = detection_subscriber.check_for_update(timeout=0)
|
||||
continue
|
||||
else:
|
||||
failed_frame_requests[camera] = 0
|
||||
|
||||
if not topic:
|
||||
break
|
||||
# send frames for low fps recording
|
||||
preview_recorders[camera].write_data(
|
||||
current_tracked_objects, motion_boxes, frame_time, frame
|
||||
)
|
||||
preview_write_times[camera] = frame_time
|
||||
|
||||
(
|
||||
camera,
|
||||
frame_name,
|
||||
frame_time,
|
||||
current_tracked_objects,
|
||||
motion_boxes,
|
||||
regions,
|
||||
) = data
|
||||
# send camera frame to ffmpeg process if websockets are connected
|
||||
if any(
|
||||
ws.environ["PATH_INFO"].endswith(camera)
|
||||
for ws in websocket_server.manager
|
||||
):
|
||||
# write to the converter for the camera if clients are listening to the specific camera
|
||||
jsmpeg_cameras[camera].write_frame(frame.tobytes())
|
||||
|
||||
frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv)
|
||||
frame_manager.close(frame_name)
|
||||
# send output data to birdseye if websocket is connected or restreaming
|
||||
if self.config.birdseye.enabled and (
|
||||
self.config.birdseye.restream
|
||||
or any(
|
||||
ws.environ["PATH_INFO"].endswith("birdseye")
|
||||
for ws in websocket_server.manager
|
||||
)
|
||||
):
|
||||
birdseye.write_data(
|
||||
camera,
|
||||
current_tracked_objects,
|
||||
motion_boxes,
|
||||
frame_time,
|
||||
frame,
|
||||
)
|
||||
|
||||
detection_subscriber.stop()
|
||||
frame_manager.close(frame_name)
|
||||
|
||||
for jsmpeg in jsmpeg_cameras.values():
|
||||
jsmpeg.stop()
|
||||
move_preview_frames("clips")
|
||||
|
||||
for preview in preview_recorders.values():
|
||||
preview.stop()
|
||||
while True:
|
||||
(topic, data) = detection_subscriber.check_for_update(timeout=0)
|
||||
|
||||
if birdseye is not None:
|
||||
birdseye.stop()
|
||||
if not topic:
|
||||
break
|
||||
|
||||
config_subscriber.stop()
|
||||
websocket_server.manager.close_all()
|
||||
websocket_server.manager.stop()
|
||||
websocket_server.manager.join()
|
||||
websocket_server.shutdown()
|
||||
websocket_thread.join()
|
||||
logger.info("exiting output process...")
|
||||
(
|
||||
camera,
|
||||
frame_name,
|
||||
frame_time,
|
||||
current_tracked_objects,
|
||||
motion_boxes,
|
||||
regions,
|
||||
) = data
|
||||
|
||||
frame = frame_manager.get(
|
||||
frame_name, self.config.cameras[camera].frame_shape_yuv
|
||||
)
|
||||
frame_manager.close(frame_name)
|
||||
|
||||
detection_subscriber.stop()
|
||||
|
||||
for jsmpeg in jsmpeg_cameras.values():
|
||||
jsmpeg.stop()
|
||||
|
||||
for preview in preview_recorders.values():
|
||||
preview.stop()
|
||||
|
||||
if birdseye is not None:
|
||||
birdseye.stop()
|
||||
|
||||
config_subscriber.stop()
|
||||
websocket_server.manager.close_all()
|
||||
websocket_server.manager.stop()
|
||||
websocket_server.manager.join()
|
||||
websocket_server.shutdown()
|
||||
websocket_thread.join()
|
||||
logger.info("exiting output process...")
|
||||
|
||||
|
||||
def move_preview_frames(loc: str):
|
||||
|
||||
@ -1,50 +1,40 @@
|
||||
"""Run recording maintainer and cleanup."""
|
||||
|
||||
import logging
|
||||
import multiprocessing as mp
|
||||
import signal
|
||||
import threading
|
||||
from types import FrameType
|
||||
from typing import Optional
|
||||
|
||||
from playhouse.sqliteq import SqliteQueueDatabase
|
||||
from setproctitle import setproctitle
|
||||
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.models import Recordings, ReviewSegment
|
||||
from frigate.record.maintainer import RecordingMaintainer
|
||||
from frigate.util.services import listen
|
||||
from frigate.util import Process as FrigateProcess
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def manage_recordings(config: FrigateConfig) -> None:
|
||||
stop_event = mp.Event()
|
||||
class RecordProcess(FrigateProcess):
|
||||
def __init__(self, config: FrigateConfig) -> None:
|
||||
super().__init__(name="frigate.recording_manager", daemon=True)
|
||||
self.config = config
|
||||
|
||||
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
|
||||
stop_event.set()
|
||||
def run(self) -> None:
|
||||
self.pre_run_setup()
|
||||
db = SqliteQueueDatabase(
|
||||
self.config.database.path,
|
||||
pragmas={
|
||||
"auto_vacuum": "FULL", # Does not defragment database
|
||||
"cache_size": -512 * 1000, # 512MB of cache
|
||||
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
|
||||
},
|
||||
timeout=max(
|
||||
60, 10 * len([c for c in self.config.cameras.values() if c.enabled])
|
||||
),
|
||||
)
|
||||
models = [ReviewSegment, Recordings]
|
||||
db.bind(models)
|
||||
|
||||
signal.signal(signal.SIGTERM, receiveSignal)
|
||||
signal.signal(signal.SIGINT, receiveSignal)
|
||||
|
||||
threading.current_thread().name = "process:recording_manager"
|
||||
setproctitle("frigate.recording_manager")
|
||||
listen()
|
||||
|
||||
db = SqliteQueueDatabase(
|
||||
config.database.path,
|
||||
pragmas={
|
||||
"auto_vacuum": "FULL", # Does not defragment database
|
||||
"cache_size": -512 * 1000, # 512MB of cache
|
||||
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
|
||||
},
|
||||
timeout=max(60, 10 * len([c for c in config.cameras.values() if c.enabled])),
|
||||
)
|
||||
models = [ReviewSegment, Recordings]
|
||||
db.bind(models)
|
||||
|
||||
maintainer = RecordingMaintainer(
|
||||
config,
|
||||
stop_event,
|
||||
)
|
||||
maintainer.start()
|
||||
maintainer = RecordingMaintainer(
|
||||
self.config,
|
||||
self.stop_event,
|
||||
)
|
||||
maintainer.start()
|
||||
|
||||
@ -1,36 +1,23 @@
|
||||
"""Run recording maintainer and cleanup."""
|
||||
|
||||
import logging
|
||||
import multiprocessing as mp
|
||||
import signal
|
||||
import threading
|
||||
from types import FrameType
|
||||
from typing import Optional
|
||||
|
||||
from setproctitle import setproctitle
|
||||
|
||||
import frigate.util as util
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.review.maintainer import ReviewSegmentMaintainer
|
||||
from frigate.util.services import listen
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def manage_review_segments(config: FrigateConfig) -> None:
|
||||
stop_event = mp.Event()
|
||||
class ReviewProcess(util.Process):
|
||||
def __init__(self, config: FrigateConfig) -> None:
|
||||
super().__init__(name="frigate.review_segment_manager", daemon=True)
|
||||
self.config = config
|
||||
|
||||
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
|
||||
stop_event.set()
|
||||
|
||||
signal.signal(signal.SIGTERM, receiveSignal)
|
||||
signal.signal(signal.SIGINT, receiveSignal)
|
||||
|
||||
threading.current_thread().name = "process:review_segment_manager"
|
||||
setproctitle("frigate.review_segment_manager")
|
||||
listen()
|
||||
|
||||
maintainer = ReviewSegmentMaintainer(
|
||||
config,
|
||||
stop_event,
|
||||
)
|
||||
maintainer.start()
|
||||
def run(self) -> None:
|
||||
self.pre_run_setup()
|
||||
maintainer = ReviewSegmentMaintainer(
|
||||
self.config,
|
||||
self.stop_event,
|
||||
)
|
||||
maintainer.start()
|
||||
|
||||
@ -5,13 +5,13 @@ import os
|
||||
import shutil
|
||||
import time
|
||||
from json import JSONDecodeError
|
||||
from multiprocessing.managers import DictProxy
|
||||
from typing import Any, Optional
|
||||
|
||||
import psutil
|
||||
import requests
|
||||
from requests.exceptions import RequestException
|
||||
|
||||
from frigate.camera import CameraMetrics
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR
|
||||
from frigate.data_processing.types import DataProcessorMetrics
|
||||
@ -53,7 +53,7 @@ def get_latest_version(config: FrigateConfig) -> str:
|
||||
|
||||
def stats_init(
|
||||
config: FrigateConfig,
|
||||
camera_metrics: dict[str, CameraMetrics],
|
||||
camera_metrics: DictProxy,
|
||||
embeddings_metrics: DataProcessorMetrics | None,
|
||||
detectors: dict[str, ObjectDetectProcess],
|
||||
processes: dict[str, int],
|
||||
@ -271,10 +271,12 @@ def stats_snapshot(
|
||||
stats["cameras"] = {}
|
||||
for name, camera_stats in camera_metrics.items():
|
||||
total_detection_fps += camera_stats.detection_fps.value
|
||||
pid = camera_stats.process.pid if camera_stats.process else None
|
||||
pid = camera_stats.process_pid.value if camera_stats.process_pid.value else None
|
||||
ffmpeg_pid = camera_stats.ffmpeg_pid.value if camera_stats.ffmpeg_pid else None
|
||||
capture_pid = (
|
||||
camera_stats.capture_process.pid if camera_stats.capture_process else None
|
||||
camera_stats.capture_process_pid.value
|
||||
if camera_stats.capture_process_pid.value
|
||||
else None
|
||||
)
|
||||
stats["cameras"][name] = {
|
||||
"camera_fps": round(camera_stats.camera_fps.value, 2),
|
||||
|
||||
@ -341,11 +341,14 @@ def clear_and_unlink(file: Path, missing_ok: bool = True) -> None:
|
||||
def empty_and_close_queue(q: mp.Queue):
|
||||
while True:
|
||||
try:
|
||||
q.get(block=True, timeout=0.5)
|
||||
except queue.Empty:
|
||||
q.close()
|
||||
q.join_thread()
|
||||
return
|
||||
try:
|
||||
q.get(block=True, timeout=0.5)
|
||||
except (queue.Empty, EOFError):
|
||||
q.close()
|
||||
q.join_thread()
|
||||
return
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
|
||||
def generate_color_palette(n):
|
||||
|
||||
@ -4,9 +4,8 @@ import multiprocessing as mp
|
||||
import signal
|
||||
import sys
|
||||
import threading
|
||||
from functools import wraps
|
||||
from logging.handlers import QueueHandler
|
||||
from typing import Any, Callable, Optional
|
||||
from typing import Callable, Optional
|
||||
|
||||
import frigate.log
|
||||
|
||||
@ -30,34 +29,12 @@ class BaseProcess(mp.Process):
|
||||
super().start(*args, **kwargs)
|
||||
self.after_start()
|
||||
|
||||
def __getattribute__(self, name: str) -> Any:
|
||||
if name == "run":
|
||||
run = super().__getattribute__("run")
|
||||
|
||||
@wraps(run)
|
||||
def run_wrapper(*args, **kwargs):
|
||||
try:
|
||||
self.before_run()
|
||||
return run(*args, **kwargs)
|
||||
finally:
|
||||
self.after_run()
|
||||
|
||||
return run_wrapper
|
||||
|
||||
return super().__getattribute__(name)
|
||||
|
||||
def before_start(self) -> None:
|
||||
pass
|
||||
|
||||
def after_start(self) -> None:
|
||||
pass
|
||||
|
||||
def before_run(self) -> None:
|
||||
pass
|
||||
|
||||
def after_run(self) -> None:
|
||||
pass
|
||||
|
||||
|
||||
class Process(BaseProcess):
|
||||
logger: logging.Logger
|
||||
@ -73,7 +50,7 @@ class Process(BaseProcess):
|
||||
def before_start(self) -> None:
|
||||
self.__log_queue = frigate.log.log_listener.queue
|
||||
|
||||
def before_run(self) -> None:
|
||||
def pre_run_setup(self) -> None:
|
||||
faulthandler.enable()
|
||||
|
||||
def receiveSignal(signalNumber, frame):
|
||||
@ -88,8 +65,6 @@ class Process(BaseProcess):
|
||||
|
||||
signal.signal(signal.SIGTERM, receiveSignal)
|
||||
signal.signal(signal.SIGINT, receiveSignal)
|
||||
|
||||
self.logger = logging.getLogger(self.name)
|
||||
|
||||
logging.basicConfig(handlers=[], force=True)
|
||||
logging.getLogger().addHandler(QueueHandler(self.__log_queue))
|
||||
|
||||
208
frigate/video.py
208
frigate/video.py
@ -1,9 +1,7 @@
|
||||
import datetime
|
||||
import logging
|
||||
import multiprocessing as mp
|
||||
import os
|
||||
import queue
|
||||
import signal
|
||||
import subprocess as sp
|
||||
import threading
|
||||
import time
|
||||
@ -12,8 +10,8 @@ from multiprocessing.synchronize import Event as MpEvent
|
||||
from typing import Any
|
||||
|
||||
import cv2
|
||||
from setproctitle import setproctitle
|
||||
|
||||
import frigate.util as util
|
||||
from frigate.camera import CameraMetrics, PTZMetrics
|
||||
from frigate.comms.inter_process import InterProcessRequestor
|
||||
from frigate.config import CameraConfig, DetectConfig, ModelConfig
|
||||
@ -53,7 +51,6 @@ from frigate.util.object import (
|
||||
is_object_filtered,
|
||||
reduce_detections,
|
||||
)
|
||||
from frigate.util.services import listen
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -318,7 +315,7 @@ class CameraWatchdog(threading.Thread):
|
||||
ffmpeg_cmd, self.logger, self.logpipe, self.frame_size
|
||||
)
|
||||
self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid
|
||||
self.capture_thread = CameraCapture(
|
||||
self.capture_thread = CameraCaptureRunner(
|
||||
self.config,
|
||||
self.shm_frame_count,
|
||||
self.frame_index,
|
||||
@ -396,7 +393,7 @@ class CameraWatchdog(threading.Thread):
|
||||
return newest_segment_time
|
||||
|
||||
|
||||
class CameraCapture(threading.Thread):
|
||||
class CameraCaptureRunner(threading.Thread):
|
||||
def __init__(
|
||||
self,
|
||||
config: CameraConfig,
|
||||
@ -440,103 +437,103 @@ class CameraCapture(threading.Thread):
|
||||
)
|
||||
|
||||
|
||||
def capture_camera(
|
||||
config: CameraConfig, shm_frame_count: int, camera_metrics: CameraMetrics
|
||||
):
|
||||
stop_event = mp.Event()
|
||||
class CameraCapture(util.Process):
|
||||
def __init__(
|
||||
self, config: CameraConfig, shm_frame_count: int, camera_metrics: CameraMetrics
|
||||
) -> None:
|
||||
super().__init__(name=f"camera_capture:{config.name}", daemon=True)
|
||||
self.config = config
|
||||
self.shm_frame_count = shm_frame_count
|
||||
self.camera_metrics = camera_metrics
|
||||
|
||||
def receiveSignal(signalNumber, frame):
|
||||
stop_event.set()
|
||||
|
||||
signal.signal(signal.SIGTERM, receiveSignal)
|
||||
signal.signal(signal.SIGINT, receiveSignal)
|
||||
|
||||
threading.current_thread().name = f"capture:{config.name}"
|
||||
setproctitle(f"frigate.capture:{config.name}")
|
||||
|
||||
camera_watchdog = CameraWatchdog(
|
||||
config,
|
||||
shm_frame_count,
|
||||
camera_metrics.frame_queue,
|
||||
camera_metrics.camera_fps,
|
||||
camera_metrics.skipped_fps,
|
||||
camera_metrics.ffmpeg_pid,
|
||||
stop_event,
|
||||
)
|
||||
camera_watchdog.start()
|
||||
camera_watchdog.join()
|
||||
def run(self) -> None:
|
||||
self.pre_run_setup()
|
||||
camera_watchdog = CameraWatchdog(
|
||||
self.config,
|
||||
self.shm_frame_count,
|
||||
self.camera_metrics.frame_queue,
|
||||
self.camera_metrics.camera_fps,
|
||||
self.camera_metrics.skipped_fps,
|
||||
self.camera_metrics.ffmpeg_pid,
|
||||
self.stop_event,
|
||||
)
|
||||
camera_watchdog.start()
|
||||
camera_watchdog.join()
|
||||
|
||||
|
||||
def track_camera(
|
||||
name,
|
||||
config: CameraConfig,
|
||||
model_config: ModelConfig,
|
||||
labelmap: dict[int, str],
|
||||
detection_queue: Queue,
|
||||
detected_objects_queue,
|
||||
camera_metrics: CameraMetrics,
|
||||
ptz_metrics: PTZMetrics,
|
||||
region_grid: list[list[dict[str, Any]]],
|
||||
):
|
||||
stop_event = mp.Event()
|
||||
|
||||
def receiveSignal(signalNumber, frame):
|
||||
stop_event.set()
|
||||
|
||||
signal.signal(signal.SIGTERM, receiveSignal)
|
||||
signal.signal(signal.SIGINT, receiveSignal)
|
||||
|
||||
threading.current_thread().name = f"process:{name}"
|
||||
setproctitle(f"frigate.process:{name}")
|
||||
listen()
|
||||
|
||||
frame_queue = camera_metrics.frame_queue
|
||||
|
||||
frame_shape = config.frame_shape
|
||||
|
||||
motion_detector = ImprovedMotionDetector(
|
||||
frame_shape,
|
||||
config.motion,
|
||||
config.detect.fps,
|
||||
name=config.name,
|
||||
ptz_metrics=ptz_metrics,
|
||||
)
|
||||
object_detector = RemoteObjectDetector(
|
||||
name, labelmap, detection_queue, model_config, stop_event
|
||||
)
|
||||
|
||||
object_tracker = NorfairTracker(config, ptz_metrics)
|
||||
|
||||
frame_manager = SharedMemoryFrameManager()
|
||||
|
||||
# create communication for region grid updates
|
||||
requestor = InterProcessRequestor()
|
||||
|
||||
process_frames(
|
||||
name,
|
||||
requestor,
|
||||
frame_queue,
|
||||
frame_shape,
|
||||
model_config,
|
||||
config,
|
||||
frame_manager,
|
||||
motion_detector,
|
||||
object_detector,
|
||||
object_tracker,
|
||||
class CameraTracker(util.Process):
|
||||
def __init__(
|
||||
self,
|
||||
config: CameraConfig,
|
||||
model_config: ModelConfig,
|
||||
labelmap: dict[int, str],
|
||||
detection_queue: Queue,
|
||||
detected_objects_queue,
|
||||
camera_metrics,
|
||||
stop_event,
|
||||
ptz_metrics,
|
||||
region_grid,
|
||||
)
|
||||
camera_metrics: CameraMetrics,
|
||||
ptz_metrics: PTZMetrics,
|
||||
region_grid: list[list[dict[str, Any]]],
|
||||
) -> None:
|
||||
super().__init__(name=f"camera_processor:{config.name}", daemon=True)
|
||||
self.config = config
|
||||
self.model_config = model_config
|
||||
self.labelmap = labelmap
|
||||
self.detection_queue = detection_queue
|
||||
self.detected_objects_queue = detected_objects_queue
|
||||
self.camera_metrics = camera_metrics
|
||||
self.ptz_metrics = ptz_metrics
|
||||
self.region_grid = region_grid
|
||||
|
||||
# empty the frame queue
|
||||
logger.info(f"{name}: emptying frame queue")
|
||||
while not frame_queue.empty():
|
||||
(frame_name, _) = frame_queue.get(False)
|
||||
frame_manager.delete(frame_name)
|
||||
def run(self) -> None:
|
||||
self.pre_run_setup()
|
||||
frame_queue = self.camera_metrics.frame_queue
|
||||
frame_shape = self.config.frame_shape
|
||||
|
||||
logger.info(f"{name}: exiting subprocess")
|
||||
motion_detector = ImprovedMotionDetector(
|
||||
frame_shape,
|
||||
self.config.motion,
|
||||
self.config.detect.fps,
|
||||
name=self.config.name,
|
||||
ptz_metrics=self.ptz_metrics,
|
||||
)
|
||||
object_detector = RemoteObjectDetector(
|
||||
self.config.name,
|
||||
self.labelmap,
|
||||
self.detection_queue,
|
||||
self.model_config,
|
||||
self.stop_event,
|
||||
)
|
||||
|
||||
object_tracker = NorfairTracker(self.config, self.ptz_metrics)
|
||||
|
||||
frame_manager = SharedMemoryFrameManager()
|
||||
|
||||
# create communication for region grid updates
|
||||
requestor = InterProcessRequestor()
|
||||
|
||||
process_frames(
|
||||
requestor,
|
||||
frame_queue,
|
||||
frame_shape,
|
||||
self.model_config,
|
||||
self.config,
|
||||
frame_manager,
|
||||
motion_detector,
|
||||
object_detector,
|
||||
object_tracker,
|
||||
self.detected_objects_queue,
|
||||
self.camera_metrics,
|
||||
self.stop_event,
|
||||
self.ptz_metrics,
|
||||
self.region_grid,
|
||||
)
|
||||
|
||||
# empty the frame queue
|
||||
logger.info(f"{self.config.name}: emptying frame queue")
|
||||
while not frame_queue.empty():
|
||||
(frame_name, _) = frame_queue.get(False)
|
||||
frame_manager.delete(frame_name)
|
||||
|
||||
logger.info(f"{self.config.name}: exiting subprocess")
|
||||
|
||||
|
||||
def detect(
|
||||
@ -577,7 +574,6 @@ def detect(
|
||||
|
||||
|
||||
def process_frames(
|
||||
camera_name: str,
|
||||
requestor: InterProcessRequestor,
|
||||
frame_queue: Queue,
|
||||
frame_shape: tuple[int, int],
|
||||
@ -597,7 +593,7 @@ def process_frames(
|
||||
next_region_update = get_tomorrow_at_time(2)
|
||||
config_subscriber = CameraConfigUpdateSubscriber(
|
||||
None,
|
||||
{camera_name: camera_config},
|
||||
{camera_config.name: camera_config},
|
||||
[
|
||||
CameraConfigUpdateEnum.detect,
|
||||
CameraConfigUpdateEnum.enabled,
|
||||
@ -653,7 +649,9 @@ def process_frames(
|
||||
and prev_enabled != camera_enabled
|
||||
and camera_metrics.frame_queue.empty()
|
||||
):
|
||||
logger.debug(f"Camera {camera_name} disabled, clearing tracked objects")
|
||||
logger.debug(
|
||||
f"Camera {camera_config.name} disabled, clearing tracked objects"
|
||||
)
|
||||
prev_enabled = camera_enabled
|
||||
|
||||
# Clear norfair's dictionaries
|
||||
@ -678,7 +676,7 @@ def process_frames(
|
||||
datetime.datetime.now().astimezone(datetime.timezone.utc)
|
||||
> next_region_update
|
||||
):
|
||||
region_grid = requestor.send_data(REQUEST_REGION_GRID, camera_name)
|
||||
region_grid = requestor.send_data(REQUEST_REGION_GRID, camera_config.name)
|
||||
next_region_update = get_tomorrow_at_time(2)
|
||||
|
||||
try:
|
||||
@ -698,7 +696,9 @@ def process_frames(
|
||||
frame = frame_manager.get(frame_name, (frame_shape[0] * 3 // 2, frame_shape[1]))
|
||||
|
||||
if frame is None:
|
||||
logger.debug(f"{camera_name}: frame {frame_time} is not in memory store.")
|
||||
logger.debug(
|
||||
f"{camera_config.name}: frame {frame_time} is not in memory store."
|
||||
)
|
||||
continue
|
||||
|
||||
# look for motion if enabled
|
||||
@ -937,7 +937,7 @@ def process_frames(
|
||||
)
|
||||
|
||||
cv2.imwrite(
|
||||
f"debug/frames/{camera_name}-{'{:.6f}'.format(frame_time)}.jpg",
|
||||
f"debug/frames/{camera_config.name}-{'{:.6f}'.format(frame_time)}.jpg",
|
||||
bgr_frame,
|
||||
)
|
||||
# add to the queue if not full
|
||||
@ -949,7 +949,7 @@ def process_frames(
|
||||
camera_metrics.process_fps.value = fps_tracker.eps()
|
||||
detected_objects_queue.put(
|
||||
(
|
||||
camera_name,
|
||||
camera_config.name,
|
||||
frame_name,
|
||||
frame_time,
|
||||
detections,
|
||||
|
||||
@ -173,7 +173,7 @@ export default function CameraMetrics({
|
||||
});
|
||||
series[key]["detect"].data.push({
|
||||
x: statsIdx,
|
||||
y: stats.cpu_usages[camStats.pid.toString()].cpu,
|
||||
y: stats.cpu_usages[camStats.pid?.toString()]?.cpu,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user