Finalize process

This commit is contained in:
Nicolas Mowen 2025-06-12 11:52:14 -06:00
parent e8dd382280
commit 5ef4f2d440
4 changed files with 214 additions and 245 deletions

View File

@ -14,7 +14,6 @@ import uvicorn
from peewee_migrate import Router
from playhouse.sqlite_ext import SqliteExtDatabase
import frigate.util as util
from frigate.api.auth import hash_password
from frigate.api.fastapi_app import create_fastapi_app
from frigate.camera import CameraMetrics, PTZMetrics
@ -42,7 +41,7 @@ from frigate.const import (
)
from frigate.data_processing.types import DataProcessorMetrics
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.embeddings import EmbeddingsContext, manage_embeddings
from frigate.embeddings import EmbeddingProcess, EmbeddingsContext
from frigate.events.audio import AudioProcessor
from frigate.events.cleanup import EventCleanup
from frigate.events.maintainer import EventProcessor
@ -59,12 +58,12 @@ from frigate.models import (
User,
)
from frigate.object_detection.base import ObjectDetectProcess
from frigate.output.output import output_frames
from frigate.output.output import OutputProcess
from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.ptz.onvif import OnvifController
from frigate.record.cleanup import RecordingCleanup
from frigate.record.export import migrate_exports
from frigate.record.record import manage_recordings
from frigate.record.record import RecordProcess
from frigate.review.review import ReviewProcess
from frigate.stats.emitter import StatsEmitter
from frigate.stats.util import stats_init
@ -224,11 +223,7 @@ class FrigateApp:
self.processes["go2rtc"] = proc.info["pid"]
def init_recording_manager(self) -> None:
recording_process = util.Process(
target=manage_recordings,
name="recording_manager",
args=(self.config,),
)
recording_process = RecordProcess(self.config)
recording_process.daemon = True
self.recording_process = recording_process
recording_process.start()
@ -255,13 +250,9 @@ class FrigateApp:
):
return
embedding_process = util.Process(
target=manage_embeddings,
name="embeddings_manager",
args=(
self.config,
self.embeddings_metrics,
),
embedding_process = EmbeddingProcess(
self.config,
self.embeddings_metrics,
)
embedding_process.daemon = True
self.embedding_process = embedding_process
@ -420,12 +411,7 @@ class FrigateApp:
self.detected_frames_processor.start()
def start_video_output_processor(self) -> None:
output_processor = util.Process(
target=output_frames,
name="output_processor",
args=(self.config,),
)
output_processor.daemon = True
output_processor = OutputProcess(self.config)
self.output_processor = output_processor
output_processor.start()
logger.info(f"Output process started: {output_processor.pid}")

View File

@ -3,16 +3,12 @@
import base64
import json
import logging
import multiprocessing as mp
import os
import signal
import threading
from types import FrameType
from typing import Any, Optional, Union
from typing import Any, Union
import regex
from pathvalidate import ValidationError, sanitize_filename
from setproctitle import setproctitle
from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsRequestor
from frigate.config import FrigateConfig
@ -20,9 +16,9 @@ from frigate.const import CONFIG_DIR, FACE_DIR
from frigate.data_processing.types import DataProcessorMetrics
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.models import Event
from frigate.util import Process as FrigateProcess
from frigate.util.builtin import serialize
from frigate.util.classification import kickoff_model_training
from frigate.util.services import listen
from .maintainer import EmbeddingMaintainer
from .util import ZScoreNormalization
@ -30,25 +26,20 @@ from .util import ZScoreNormalization
logger = logging.getLogger(__name__)
def manage_embeddings(config: FrigateConfig, metrics: DataProcessorMetrics) -> None:
stop_event = mp.Event()
class EmbeddingProcess(FrigateProcess):
def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics) -> None:
super().__init__(name="frigate.embeddings_manager", daemon=True)
self.config = config
self.metrics = metrics
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
threading.current_thread().name = "process:embeddings_manager"
setproctitle("frigate.embeddings_manager")
listen()
maintainer = EmbeddingMaintainer(
config,
metrics,
stop_event,
)
maintainer.start()
def run(self) -> None:
self.pre_run_setup()
maintainer = EmbeddingMaintainer(
self.config,
self.metrics,
self.stop_event,
)
maintainer.start()
class EmbeddingsContext:

View File

@ -2,14 +2,11 @@
import datetime
import logging
import multiprocessing as mp
import os
import shutil
import signal
import threading
from wsgiref.simple_server import make_server
from setproctitle import setproctitle
from ws4py.server.wsgirefserver import (
WebSocketWSGIHandler,
WebSocketWSGIRequestHandler,
@ -17,6 +14,7 @@ from ws4py.server.wsgirefserver import (
)
from ws4py.server.wsgiutils import WebSocketWSGIApplication
import frigate.util as util
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.ws import WebSocket
from frigate.config import FrigateConfig
@ -73,189 +71,193 @@ def check_disabled_camera_update(
birdseye.all_cameras_disabled()
def output_frames(
config: FrigateConfig,
):
threading.current_thread().name = "output"
setproctitle("frigate.output")
class OutputProcess(util.Process):
def __init__(self, config: FrigateConfig) -> None:
super().__init__(name="frigate.output", daemon=True)
self.config = config
stop_event = mp.Event()
def run(self) -> None:
self.pre_run_setup()
def receiveSignal(signalNumber, frame):
stop_event.set()
frame_manager = SharedMemoryFrameManager()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
frame_manager = SharedMemoryFrameManager()
# start a websocket server on 8082
WebSocketWSGIHandler.http_version = "1.1"
websocket_server = make_server(
"127.0.0.1",
8082,
server_class=WSGIServer,
handler_class=WebSocketWSGIRequestHandler,
app=WebSocketWSGIApplication(handler_cls=WebSocket),
)
websocket_server.initialize_websockets_manager()
websocket_thread = threading.Thread(target=websocket_server.serve_forever)
detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video)
config_subscriber = CameraConfigUpdateSubscriber(
config,
config.cameras,
[
CameraConfigUpdateEnum.add,
CameraConfigUpdateEnum.birdseye,
CameraConfigUpdateEnum.enabled,
CameraConfigUpdateEnum.record,
],
)
jsmpeg_cameras: dict[str, JsmpegCamera] = {}
birdseye: Birdseye | None = None
preview_recorders: dict[str, PreviewRecorder] = {}
preview_write_times: dict[str, float] = {}
failed_frame_requests: dict[str, int] = {}
last_disabled_cam_check = datetime.datetime.now().timestamp()
move_preview_frames("cache")
for camera, cam_config in config.cameras.items():
if not cam_config.enabled_in_config:
continue
jsmpeg_cameras[camera] = JsmpegCamera(cam_config, stop_event, websocket_server)
preview_recorders[camera] = PreviewRecorder(cam_config)
preview_write_times[camera] = 0
if config.birdseye.enabled:
birdseye = Birdseye(config, stop_event, websocket_server)
websocket_thread.start()
while not stop_event.is_set():
# check if there is an updated config
updates = config_subscriber.check_for_updates()
if "add" in updates:
for camera in updates["add"]:
jsmpeg_cameras[camera] = JsmpegCamera(
cam_config, stop_event, websocket_server
)
preview_recorders[camera] = PreviewRecorder(cam_config)
preview_write_times[camera] = 0
(topic, data) = detection_subscriber.check_for_update(timeout=1)
now = datetime.datetime.now().timestamp()
if now - last_disabled_cam_check > 5:
# check disabled cameras every 5 seconds
last_disabled_cam_check = now
check_disabled_camera_update(
config, birdseye, preview_recorders, preview_write_times
)
if not topic:
continue
(
camera,
frame_name,
frame_time,
current_tracked_objects,
motion_boxes,
_,
) = data
if not config.cameras[camera].enabled:
continue
frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv)
if frame is None:
logger.debug(f"Failed to get frame {frame_name} from SHM")
failed_frame_requests[camera] = failed_frame_requests.get(camera, 0) + 1
if failed_frame_requests[camera] > config.cameras[camera].detect.fps:
logger.warning(
f"Failed to retrieve many frames for {camera} from SHM, consider increasing SHM size if this continues."
)
continue
else:
failed_frame_requests[camera] = 0
# send frames for low fps recording
preview_recorders[camera].write_data(
current_tracked_objects, motion_boxes, frame_time, frame
# start a websocket server on 8082
WebSocketWSGIHandler.http_version = "1.1"
websocket_server = make_server(
"127.0.0.1",
8082,
server_class=WSGIServer,
handler_class=WebSocketWSGIRequestHandler,
app=WebSocketWSGIApplication(handler_cls=WebSocket),
)
preview_write_times[camera] = frame_time
websocket_server.initialize_websockets_manager()
websocket_thread = threading.Thread(target=websocket_server.serve_forever)
# send camera frame to ffmpeg process if websockets are connected
if any(
ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager
):
# write to the converter for the camera if clients are listening to the specific camera
jsmpeg_cameras[camera].write_frame(frame.tobytes())
detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video)
config_subscriber = CameraConfigUpdateSubscriber(
self.config,
self.config.cameras,
[
CameraConfigUpdateEnum.add,
CameraConfigUpdateEnum.birdseye,
CameraConfigUpdateEnum.enabled,
CameraConfigUpdateEnum.record,
],
)
# send output data to birdseye if websocket is connected or restreaming
if config.birdseye.enabled and (
config.birdseye.restream
or any(
ws.environ["PATH_INFO"].endswith("birdseye")
for ws in websocket_server.manager
jsmpeg_cameras: dict[str, JsmpegCamera] = {}
birdseye: Birdseye | None = None
preview_recorders: dict[str, PreviewRecorder] = {}
preview_write_times: dict[str, float] = {}
failed_frame_requests: dict[str, int] = {}
last_disabled_cam_check = datetime.datetime.now().timestamp()
move_preview_frames("cache")
for camera, cam_config in self.config.cameras.items():
if not cam_config.enabled_in_config:
continue
jsmpeg_cameras[camera] = JsmpegCamera(
cam_config, self.stop_event, websocket_server
)
):
birdseye.write_data(
preview_recorders[camera] = PreviewRecorder(cam_config)
preview_write_times[camera] = 0
if self.config.birdseye.enabled:
birdseye = Birdseye(self.config, self.stop_event, websocket_server)
websocket_thread.start()
while not self.stop_event.is_set():
# check if there is an updated config
updates = config_subscriber.check_for_updates()
if "add" in updates:
for camera in updates["add"]:
jsmpeg_cameras[camera] = JsmpegCamera(
cam_config, self.stop_event, websocket_server
)
preview_recorders[camera] = PreviewRecorder(cam_config)
preview_write_times[camera] = 0
(topic, data) = detection_subscriber.check_for_update(timeout=1)
now = datetime.datetime.now().timestamp()
if now - last_disabled_cam_check > 5:
# check disabled cameras every 5 seconds
last_disabled_cam_check = now
check_disabled_camera_update(
self.config, birdseye, preview_recorders, preview_write_times
)
if not topic:
continue
(
camera,
frame_name,
frame_time,
current_tracked_objects,
motion_boxes,
frame_time,
frame,
_,
) = data
if not self.config.cameras[camera].enabled:
continue
frame = frame_manager.get(
frame_name, self.config.cameras[camera].frame_shape_yuv
)
frame_manager.close(frame_name)
if frame is None:
logger.debug(f"Failed to get frame {frame_name} from SHM")
failed_frame_requests[camera] = failed_frame_requests.get(camera, 0) + 1
move_preview_frames("clips")
if (
failed_frame_requests[camera]
> self.config.cameras[camera].detect.fps
):
logger.warning(
f"Failed to retrieve many frames for {camera} from SHM, consider increasing SHM size if this continues."
)
while True:
(topic, data) = detection_subscriber.check_for_update(timeout=0)
continue
else:
failed_frame_requests[camera] = 0
if not topic:
break
# send frames for low fps recording
preview_recorders[camera].write_data(
current_tracked_objects, motion_boxes, frame_time, frame
)
preview_write_times[camera] = frame_time
(
camera,
frame_name,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = data
# send camera frame to ffmpeg process if websockets are connected
if any(
ws.environ["PATH_INFO"].endswith(camera)
for ws in websocket_server.manager
):
# write to the converter for the camera if clients are listening to the specific camera
jsmpeg_cameras[camera].write_frame(frame.tobytes())
frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv)
frame_manager.close(frame_name)
# send output data to birdseye if websocket is connected or restreaming
if self.config.birdseye.enabled and (
self.config.birdseye.restream
or any(
ws.environ["PATH_INFO"].endswith("birdseye")
for ws in websocket_server.manager
)
):
birdseye.write_data(
camera,
current_tracked_objects,
motion_boxes,
frame_time,
frame,
)
detection_subscriber.stop()
frame_manager.close(frame_name)
for jsmpeg in jsmpeg_cameras.values():
jsmpeg.stop()
move_preview_frames("clips")
for preview in preview_recorders.values():
preview.stop()
while True:
(topic, data) = detection_subscriber.check_for_update(timeout=0)
if birdseye is not None:
birdseye.stop()
if not topic:
break
config_subscriber.stop()
websocket_server.manager.close_all()
websocket_server.manager.stop()
websocket_server.manager.join()
websocket_server.shutdown()
websocket_thread.join()
logger.info("exiting output process...")
(
camera,
frame_name,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = data
frame = frame_manager.get(
frame_name, self.config.cameras[camera].frame_shape_yuv
)
frame_manager.close(frame_name)
detection_subscriber.stop()
for jsmpeg in jsmpeg_cameras.values():
jsmpeg.stop()
for preview in preview_recorders.values():
preview.stop()
if birdseye is not None:
birdseye.stop()
config_subscriber.stop()
websocket_server.manager.close_all()
websocket_server.manager.stop()
websocket_server.manager.join()
websocket_server.shutdown()
websocket_thread.join()
logger.info("exiting output process...")
def move_preview_frames(loc: str):

View File

@ -1,50 +1,40 @@
"""Run recording maintainer and cleanup."""
import logging
import multiprocessing as mp
import signal
import threading
from types import FrameType
from typing import Optional
from playhouse.sqliteq import SqliteQueueDatabase
from setproctitle import setproctitle
from frigate.config import FrigateConfig
from frigate.models import Recordings, ReviewSegment
from frigate.record.maintainer import RecordingMaintainer
from frigate.util.services import listen
from frigate.util import Process as FrigateProcess
logger = logging.getLogger(__name__)
def manage_recordings(config: FrigateConfig) -> None:
stop_event = mp.Event()
class RecordProcess(FrigateProcess):
def __init__(self, config: FrigateConfig) -> None:
super().__init__(name="frigate.recording_manager", daemon=True)
self.config = config
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
stop_event.set()
def run(self) -> None:
self.pre_run_setup()
db = SqliteQueueDatabase(
self.config.database.path,
pragmas={
"auto_vacuum": "FULL", # Does not defragment database
"cache_size": -512 * 1000, # 512MB of cache
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
},
timeout=max(
60, 10 * len([c for c in self.config.cameras.values() if c.enabled])
),
)
models = [ReviewSegment, Recordings]
db.bind(models)
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
threading.current_thread().name = "process:recording_manager"
setproctitle("frigate.recording_manager")
listen()
db = SqliteQueueDatabase(
config.database.path,
pragmas={
"auto_vacuum": "FULL", # Does not defragment database
"cache_size": -512 * 1000, # 512MB of cache
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
},
timeout=max(60, 10 * len([c for c in config.cameras.values() if c.enabled])),
)
models = [ReviewSegment, Recordings]
db.bind(models)
maintainer = RecordingMaintainer(
config,
stop_event,
)
maintainer.start()
maintainer = RecordingMaintainer(
self.config,
self.stop_event,
)
maintainer.start()