Compare commits

...

4 Commits

Author SHA1 Message Date
Nicolas Mowen
9fdce80729
Handle case when no classification model exists (#20257) 2025-09-28 16:03:44 -05:00
Josh Hawkins
12f8c3feac
Watchdog enhancements (#20237)
* refactor get_video_properties and use json output from ffprobe

* add zmq topic

* publish valid segment data in recording maintainer

* check for valid video data

- restart separate record ffmpeg process if no video data has been received in 120s
- refactor datetime import

* listen to correct topic in embeddings maintainer

* refactor to move get_latest_segment_datetime logic to recordings maintainer

* debug logging

* cleanup
2025-09-28 10:52:14 -06:00
Josh Hawkins
b6552987b0
Fixes (#20254)
* fix api async/await functions

* fix synaptics detector from throwing error when unused

* clean up
2025-09-28 07:08:52 -06:00
Nicolas Mowen
c207009d8a
Refactor AMD GPU support (#20239)
* Update ROCm to 7.0.1

* Update ONNXRuntime

* Add back in

* Get basic detection working

* Use env vars

* Handle complex migraphx models

* Enable model caching

* Remove unused

* Add tip to docs
2025-09-27 14:43:11 -05:00
14 changed files with 424 additions and 213 deletions

View File

@ -15,14 +15,14 @@ ARG AMDGPU
RUN apt update -qq && \
apt install -y wget gpg && \
wget -O rocm.deb https://repo.radeon.com/amdgpu-install/6.4.1/ubuntu/jammy/amdgpu-install_6.4.60401-1_all.deb && \
wget -O rocm.deb https://repo.radeon.com/amdgpu-install/7.0.1/ubuntu/jammy/amdgpu-install_7.0.1.70001-1_all.deb && \
apt install -y ./rocm.deb && \
apt update && \
apt install -qq -y rocm
RUN mkdir -p /opt/rocm-dist/opt/rocm-$ROCM/lib
RUN cd /opt/rocm-$ROCM/lib && \
cp -dpr libMIOpen*.so* libamd*.so* libhip*.so* libhsa*.so* libmigraphx*.so* librocm*.so* librocblas*.so* libroctracer*.so* librocsolver*.so* librocfft*.so* librocprofiler*.so* libroctx*.so* /opt/rocm-dist/opt/rocm-$ROCM/lib/ && \
cp -dpr libMIOpen*.so* libamd*.so* libhip*.so* libhsa*.so* libmigraphx*.so* librocm*.so* librocblas*.so* libroctracer*.so* librocsolver*.so* librocfft*.so* librocprofiler*.so* libroctx*.so* librocroller.so* /opt/rocm-dist/opt/rocm-$ROCM/lib/ && \
mkdir -p /opt/rocm-dist/opt/rocm-$ROCM/lib/migraphx/lib && \
cp -dpr migraphx/lib/* /opt/rocm-dist/opt/rocm-$ROCM/lib/migraphx/lib
RUN cd /opt/rocm-dist/opt/ && ln -s rocm-$ROCM rocm
@ -64,11 +64,10 @@ COPY --from=rocm /opt/rocm-dist/ /
#######################################################################
FROM deps-prelim AS rocm-prelim-hsa-override0
ENV HSA_ENABLE_SDMA=0
ENV TF_ROCM_USE_IMMEDIATE_MODE=1
# avoid kernel crashes
ENV HIP_FORCE_DEV_KERNARG=1
ENV MIGRAPHX_DISABLE_MIOPEN_FUSION=1
ENV MIGRAPHX_DISABLE_SCHEDULE_PASS=1
ENV MIGRAPHX_DISABLE_REDUCE_FUSION=1
ENV MIGRAPHX_ENABLE_HIPRTC_WORKAROUNDS=1
COPY --from=rocm-dist / /

View File

@ -1 +1 @@
onnxruntime-rocm @ https://github.com/NickM-27/frigate-onnxruntime-rocm/releases/download/v6.4.1/onnxruntime_rocm-1.21.1-cp311-cp311-linux_x86_64.whl
onnxruntime-migraphx @ https://github.com/NickM-27/frigate-onnxruntime-rocm/releases/download/v7.0.1/onnxruntime_migraphx-1.23.0-cp311-cp311-linux_x86_64.whl

View File

@ -2,7 +2,7 @@ variable "AMDGPU" {
default = "gfx900"
}
variable "ROCM" {
default = "6.4.1"
default = "7.0.1"
}
variable "HSA_OVERRIDE_GFX_VERSION" {
default = ""

View File

@ -555,6 +555,17 @@ $ docker exec -it frigate /bin/bash -c '(unset HSA_OVERRIDE_GFX_VERSION && /opt/
### ROCm Supported Models
:::tip
The AMD GPU kernel is known problematic especially when converting models to mxr format. The recommended approach is:
1. Disable object detection in the config.
2. Startup Frigate with the onnx detector configured, the main object detection model will be converted to mxr format and cached in the config directory.
3. Once this is finished as indicated by the logs, enable object detection in the UI and confirm that it is working correctly.
4. Re-enable object detection in the config.
:::
See [ONNX supported models](#supported-models) for supported models, there are some caveats:
- D-FINE models are not supported
@ -781,19 +792,19 @@ To verify that the integration is working correctly, start Frigate and observe t
# Community Supported Detectors
## MemryX MX3
## MemryX MX3
This detector is available for use with the MemryX MX3 accelerator M.2 module. Frigate supports the MX3 on compatible hardware platforms, providing efficient and high-performance object detection.
This detector is available for use with the MemryX MX3 accelerator M.2 module. Frigate supports the MX3 on compatible hardware platforms, providing efficient and high-performance object detection.
See the [installation docs](../frigate/installation.md#memryx-mx3) for information on configuring the MemryX hardware.
To configure a MemryX detector, simply set the `type` attribute to `memryx` and follow the configuration guide below.
### Configuration
### Configuration
To configure the MemryX detector, use the following example configuration:
To configure the MemryX detector, use the following example configuration:
#### Single PCIe MemryX MX3
#### Single PCIe MemryX MX3
```yaml
detectors:
@ -819,7 +830,7 @@ detectors:
device: PCIe:2
```
### Supported Models
### Supported Models
MemryX `.dfp` models are automatically downloaded at runtime, if enabled, to the container at `/memryx_models/model_folder/`.
@ -833,9 +844,9 @@ The input size for **YOLO-NAS** can be set to either **320x320** (default) or **
- The default size of **320x320** is optimized for lower CPU usage and faster inference times.
##### Configuration
##### Configuration
Below is the recommended configuration for using the **YOLO-NAS** (small) model with the MemryX detector:
Below is the recommended configuration for using the **YOLO-NAS** (small) model with the MemryX detector:
```yaml
detectors:
@ -857,13 +868,13 @@ model:
# └── yolonas_post.onnx (optional; only if the model includes a cropped post-processing network)
```
#### YOLOv9
#### YOLOv9
The YOLOv9s model included in this detector is downloaded from [the original GitHub](https://github.com/WongKinYiu/yolov9) like in the [Models Section](#yolov9-1) and compiled to DFP with [mx_nc](https://developer.memryx.com/tools/neural_compiler.html#usage).
##### Configuration
Below is the recommended configuration for using the **YOLOv9** (small) model with the MemryX detector:
Below is the recommended configuration for using the **YOLOv9** (small) model with the MemryX detector:
```yaml
detectors:
@ -872,7 +883,7 @@ detectors:
device: PCIe:0
model:
model_type: yolo-generic
model_type: yolo-generic
width: 320 # (Can be set to 640 for higher resolution)
height: 320 # (Can be set to 640 for higher resolution)
input_tensor: nchw
@ -885,13 +896,13 @@ model:
# └── yolov9_post.onnx (optional; only if the model includes a cropped post-processing network)
```
#### YOLOX
#### YOLOX
The model is sourced from the [OpenCV Model Zoo](https://github.com/opencv/opencv_zoo) and precompiled to DFP.
##### Configuration
##### Configuration
Below is the recommended configuration for using the **YOLOX** (small) model with the MemryX detector:
Below is the recommended configuration for using the **YOLOX** (small) model with the MemryX detector:
```yaml
detectors:
@ -912,13 +923,13 @@ model:
# ├── yolox.dfp (a file ending with .dfp)
```
#### SSDLite MobileNet v2
#### SSDLite MobileNet v2
The model is sourced from the [OpenMMLab Model Zoo](https://mmdeploy-oss.openmmlab.com/model/mmdet-det/ssdlite-e8679f.onnx) and has been converted to DFP.
##### Configuration
##### Configuration
Below is the recommended configuration for using the **SSDLite MobileNet v2** model with the MemryX detector:
Below is the recommended configuration for using the **SSDLite MobileNet v2** model with the MemryX detector:
```yaml
detectors:

View File

@ -822,9 +822,9 @@ async def vod_ts(camera_name: str, start_ts: float, end_ts: float):
dependencies=[Depends(require_camera_access)],
description="Returns an HLS playlist for the specified date-time on the specified camera. Append /master.m3u8 or /index.m3u8 for HLS playback.",
)
def vod_hour_no_timezone(year_month: str, day: int, hour: int, camera_name: str):
async def vod_hour_no_timezone(year_month: str, day: int, hour: int, camera_name: str):
"""VOD for specific hour. Uses the default timezone (UTC)."""
return vod_hour(
return await vod_hour(
year_month, day, hour, camera_name, get_localzone_name().replace("/", ",")
)
@ -834,7 +834,9 @@ def vod_hour_no_timezone(year_month: str, day: int, hour: int, camera_name: str)
dependencies=[Depends(require_camera_access)],
description="Returns an HLS playlist for the specified date-time (with timezone) on the specified camera. Append /master.m3u8 or /index.m3u8 for HLS playback.",
)
def vod_hour(year_month: str, day: int, hour: int, camera_name: str, tz_name: str):
async def vod_hour(
year_month: str, day: int, hour: int, camera_name: str, tz_name: str
):
parts = year_month.split("-")
start_date = (
datetime(int(parts[0]), int(parts[1]), day, hour, tzinfo=timezone.utc)
@ -844,7 +846,7 @@ def vod_hour(year_month: str, day: int, hour: int, camera_name: str, tz_name: st
start_ts = start_date.timestamp()
end_ts = end_date.timestamp()
return vod_ts(camera_name, start_ts, end_ts)
return await vod_ts(camera_name, start_ts, end_ts)
@router.get(
@ -875,7 +877,7 @@ async def vod_event(
if event.end_time is None
else (event.end_time + padding)
)
vod_response = vod_ts(event.camera, event.start_time - padding, end_ts)
vod_response = await vod_ts(event.camera, event.start_time - padding, end_ts)
# If the recordings are not found and the event started more than 5 minutes ago, set has_clip to false
if (
@ -1248,7 +1250,7 @@ def event_snapshot_clean(request: Request, event_id: str, download: bool = False
@router.get("/events/{event_id}/clip.mp4")
def event_clip(
async def event_clip(
request: Request,
event_id: str,
padding: int = Query(0, description="Padding to apply to clip."),
@ -1270,7 +1272,9 @@ def event_clip(
if event.end_time is None
else event.end_time + padding
)
return recording_clip(request, event.camera, event.start_time - padding, end_ts)
return await recording_clip(
request, event.camera, event.start_time - padding, end_ts
)
@router.get("/events/{event_id}/preview.gif")
@ -1698,7 +1702,7 @@ def preview_thumbnail(file_name: str):
"/{camera_name}/{label}/thumbnail.jpg",
dependencies=[Depends(require_camera_access)],
)
def label_thumbnail(request: Request, camera_name: str, label: str):
async def label_thumbnail(request: Request, camera_name: str, label: str):
label = unquote(label)
event_query = Event.select(fn.MAX(Event.id)).where(Event.camera == camera_name)
if label != "any":
@ -1707,7 +1711,7 @@ def label_thumbnail(request: Request, camera_name: str, label: str):
try:
event_id = event_query.scalar()
return event_thumbnail(request, event_id, Extension.jpg, 60)
return await event_thumbnail(request, event_id, Extension.jpg, 60)
except DoesNotExist:
frame = np.zeros((175, 175, 3), np.uint8)
ret, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 70])
@ -1722,7 +1726,7 @@ def label_thumbnail(request: Request, camera_name: str, label: str):
@router.get(
"/{camera_name}/{label}/clip.mp4", dependencies=[Depends(require_camera_access)]
)
def label_clip(request: Request, camera_name: str, label: str):
async def label_clip(request: Request, camera_name: str, label: str):
label = unquote(label)
event_query = Event.select(fn.MAX(Event.id)).where(
Event.camera == camera_name, Event.has_clip == True
@ -1733,7 +1737,7 @@ def label_clip(request: Request, camera_name: str, label: str):
try:
event = event_query.get()
return event_clip(request, event.id)
return await event_clip(request, event.id)
except DoesNotExist:
return JSONResponse(
content={"success": False, "message": "Event not found"}, status_code=404
@ -1743,7 +1747,7 @@ def label_clip(request: Request, camera_name: str, label: str):
@router.get(
"/{camera_name}/{label}/snapshot.jpg", dependencies=[Depends(require_camera_access)]
)
def label_snapshot(request: Request, camera_name: str, label: str):
async def label_snapshot(request: Request, camera_name: str, label: str):
"""Returns the snapshot image from the latest event for the given camera and label combo"""
label = unquote(label)
if label == "any":
@ -1764,7 +1768,7 @@ def label_snapshot(request: Request, camera_name: str, label: str):
try:
event: Event = event_query.get()
return event_snapshot(request, event.id, MediaEventsSnapshotQueryParams())
return await event_snapshot(request, event.id, MediaEventsSnapshotQueryParams())
except DoesNotExist:
frame = np.zeros((720, 1280, 3), np.uint8)
_, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 70])

View File

@ -2,6 +2,7 @@
import logging
from enum import Enum
from typing import Any
from .zmq_proxy import Publisher, Subscriber
@ -10,18 +11,21 @@ logger = logging.getLogger(__name__)
class RecordingsDataTypeEnum(str, Enum):
all = ""
recordings_available_through = "recordings_available_through"
saved = "saved" # segment has been saved to db
latest = "latest" # segment is in cache
valid = "valid" # segment is valid
invalid = "invalid" # segment is invalid
class RecordingsDataPublisher(Publisher[tuple[str, float]]):
class RecordingsDataPublisher(Publisher[Any]):
"""Publishes latest recording data."""
topic_base = "recordings/"
def __init__(self, topic: RecordingsDataTypeEnum) -> None:
super().__init__(topic.value)
def __init__(self) -> None:
super().__init__()
def publish(self, payload: tuple[str, float], sub_topic: str = "") -> None:
def publish(self, payload: Any, sub_topic: str = "") -> None:
super().publish(payload, sub_topic)
@ -32,3 +36,11 @@ class RecordingsDataSubscriber(Subscriber):
def __init__(self, topic: RecordingsDataTypeEnum) -> None:
super().__init__(topic.value)
def _return_object(
self, topic: str, payload: tuple | None
) -> tuple[str, Any] | tuple[None, None]:
if payload is None:
return (None, None)
return (topic, payload)

View File

@ -48,9 +48,9 @@ class CustomStateClassificationProcessor(RealTimeProcessorApi):
self.requestor = requestor
self.model_dir = os.path.join(MODEL_CACHE_DIR, self.model_config.name)
self.train_dir = os.path.join(CLIPS_DIR, self.model_config.name, "train")
self.interpreter: Interpreter = None
self.tensor_input_details: dict[str, Any] = None
self.tensor_output_details: dict[str, Any] = None
self.interpreter: Interpreter | None = None
self.tensor_input_details: dict[str, Any] | None = None
self.tensor_output_details: dict[str, Any] | None = None
self.labelmap: dict[int, str] = {}
self.classifications_per_second = EventsPerSecond()
self.inference_speed = InferenceSpeed(
@ -61,17 +61,24 @@ class CustomStateClassificationProcessor(RealTimeProcessorApi):
@redirect_output_to_logger(logger, logging.DEBUG)
def __build_detector(self) -> None:
model_path = os.path.join(self.model_dir, "model.tflite")
labelmap_path = os.path.join(self.model_dir, "labelmap.txt")
if not os.path.exists(model_path) or not os.path.exists(labelmap_path):
self.interpreter = None
self.tensor_input_details = None
self.tensor_output_details = None
self.labelmap = {}
return
self.interpreter = Interpreter(
model_path=os.path.join(self.model_dir, "model.tflite"),
model_path=model_path,
num_threads=2,
)
self.interpreter.allocate_tensors()
self.tensor_input_details = self.interpreter.get_input_details()
self.tensor_output_details = self.interpreter.get_output_details()
self.labelmap = load_labels(
os.path.join(self.model_dir, "labelmap.txt"),
prefill=0,
)
self.labelmap = load_labels(labelmap_path, prefill=0)
self.classifications_per_second.start()
def __update_metrics(self, duration: float) -> None:
@ -140,6 +147,16 @@ class CustomStateClassificationProcessor(RealTimeProcessorApi):
logger.warning("Failed to resize image for state classification")
return
if self.interpreter is None:
write_classification_attempt(
self.train_dir,
cv2.cvtColor(frame, cv2.COLOR_RGB2BGR),
now,
"unknown",
0.0,
)
return
input = np.expand_dims(frame, axis=0)
self.interpreter.set_tensor(self.tensor_input_details[0]["index"], input)
self.interpreter.invoke()
@ -197,10 +214,10 @@ class CustomObjectClassificationProcessor(RealTimeProcessorApi):
self.model_config = model_config
self.model_dir = os.path.join(MODEL_CACHE_DIR, self.model_config.name)
self.train_dir = os.path.join(CLIPS_DIR, self.model_config.name, "train")
self.interpreter: Interpreter = None
self.interpreter: Interpreter | None = None
self.sub_label_publisher = sub_label_publisher
self.tensor_input_details: dict[str, Any] = None
self.tensor_output_details: dict[str, Any] = None
self.tensor_input_details: dict[str, Any] | None = None
self.tensor_output_details: dict[str, Any] | None = None
self.detected_objects: dict[str, float] = {}
self.labelmap: dict[int, str] = {}
self.classifications_per_second = EventsPerSecond()
@ -211,17 +228,24 @@ class CustomObjectClassificationProcessor(RealTimeProcessorApi):
@redirect_output_to_logger(logger, logging.DEBUG)
def __build_detector(self) -> None:
model_path = os.path.join(self.model_dir, "model.tflite")
labelmap_path = os.path.join(self.model_dir, "labelmap.txt")
if not os.path.exists(model_path) or not os.path.exists(labelmap_path):
self.interpreter = None
self.tensor_input_details = None
self.tensor_output_details = None
self.labelmap = {}
return
self.interpreter = Interpreter(
model_path=os.path.join(self.model_dir, "model.tflite"),
model_path=model_path,
num_threads=2,
)
self.interpreter.allocate_tensors()
self.tensor_input_details = self.interpreter.get_input_details()
self.tensor_output_details = self.interpreter.get_output_details()
self.labelmap = load_labels(
os.path.join(self.model_dir, "labelmap.txt"),
prefill=0,
)
self.labelmap = load_labels(labelmap_path, prefill=0)
def __update_metrics(self, duration: float) -> None:
self.classifications_per_second.update()
@ -265,6 +289,16 @@ class CustomObjectClassificationProcessor(RealTimeProcessorApi):
logger.warning("Failed to resize image for state classification")
return
if self.interpreter is None:
write_classification_attempt(
self.train_dir,
cv2.cvtColor(crop, cv2.COLOR_RGB2BGR),
now,
"unknown",
0.0,
)
return
input = np.expand_dims(crop, axis=0)
self.interpreter.set_tensor(self.tensor_input_details[0]["index"], input)
self.interpreter.invoke()

View File

@ -78,6 +78,21 @@ class BaseModelRunner(ABC):
class ONNXModelRunner(BaseModelRunner):
"""Run ONNX models using ONNX Runtime."""
@staticmethod
def is_migraphx_complex_model(model_type: str) -> bool:
# Import here to avoid circular imports
from frigate.detectors.detector_config import ModelTypeEnum
from frigate.embeddings.types import EnrichmentModelTypeEnum
return model_type in [
EnrichmentModelTypeEnum.paddleocr.value,
EnrichmentModelTypeEnum.jina_v1.value,
EnrichmentModelTypeEnum.jina_v2.value,
EnrichmentModelTypeEnum.facenet.value,
ModelTypeEnum.rfdetr.value,
ModelTypeEnum.dfine.value,
]
def __init__(self, ort: ort.InferenceSession):
self.ort = ort
@ -441,6 +456,15 @@ def get_optimized_runner(
options[0]["device_id"],
)
if (
providers
and providers[0] == "MIGraphXExecutionProvider"
and ONNXModelRunner.is_migraphx_complex_model(model_type)
):
# Don't use MIGraphX for models that are not supported
providers.pop(0)
options.pop(0)
return ONNXModelRunner(
ort.InferenceSession(
model_path,

View File

@ -2,10 +2,6 @@ import logging
import os
import numpy as np
from synap import Network
from synap.postprocessor import Detector
from synap.preprocessor import Preprocessor
from synap.types import Layout, Shape
from typing_extensions import Literal
from frigate.detectors.detection_api import DetectionApi
@ -15,6 +11,16 @@ from frigate.detectors.detector_config import (
ModelTypeEnum,
)
try:
from synap import Network
from synap.postprocessor import Detector
from synap.preprocessor import Preprocessor
from synap.types import Layout, Shape
SYNAP_SUPPORT = True
except ImportError:
SYNAP_SUPPORT = False
logger = logging.getLogger(__name__)
DETECTOR_KEY = "synaptics"
@ -28,15 +34,21 @@ class SynapDetector(DetectionApi):
type_key = DETECTOR_KEY
def __init__(self, detector_config: SynapDetectorConfig):
if not SYNAP_SUPPORT:
logger.error(
"Error importing Synaptics SDK modules. You must use the -synaptics Docker image variant for Synaptics detector support."
)
return
try:
_, ext = os.path.splitext(detector_config.model.path)
if ext and ext != ".synap":
raise ValueError("Model path config for Synap1680 is wrong.")
raise ValueError("Model path config for Synap1680 is incorrect.")
synap_network = Network(detector_config.model.path)
logger.info(f"Synap NPU loaded model: {detector_config.model.path}")
except ValueError as ve:
logger.error(f"Config to Synap1680 was Failed: {ve}")
logger.error(f"Synap1680 setup has failed: {ve}")
raise
except Exception as e:
logger.error(f"Failed to init Synap NPU: {e}")

View File

@ -144,7 +144,7 @@ class EmbeddingMaintainer(threading.Thread):
EventMetadataTypeEnum.regenerate_description
)
self.recordings_subscriber = RecordingsDataSubscriber(
RecordingsDataTypeEnum.recordings_available_through
RecordingsDataTypeEnum.saved
)
self.review_subscriber = ReviewDataSubscriber("")
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video.value)
@ -525,20 +525,28 @@ class EmbeddingMaintainer(threading.Thread):
def _process_recordings_updates(self) -> None:
"""Process recordings updates."""
while True:
recordings_data = self.recordings_subscriber.check_for_update()
update = self.recordings_subscriber.check_for_update()
if recordings_data == None:
if not update:
break
camera, recordings_available_through_timestamp = recordings_data
(raw_topic, payload) = update
self.recordings_available_through[camera] = (
recordings_available_through_timestamp
)
if not raw_topic or not payload:
break
logger.debug(
f"{camera} now has recordings available through {recordings_available_through_timestamp}"
)
topic = str(raw_topic)
if topic.endswith(RecordingsDataTypeEnum.saved.value):
camera, recordings_available_through_timestamp, _ = payload
self.recordings_available_through[camera] = (
recordings_available_through_timestamp
)
logger.debug(
f"{camera} now has recordings available through {recordings_available_through_timestamp}"
)
def _process_review_updates(self) -> None:
"""Process review updates."""

View File

@ -80,9 +80,7 @@ class RecordingMaintainer(threading.Thread):
[CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.record],
)
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all.value)
self.recordings_publisher = RecordingsDataPublisher(
RecordingsDataTypeEnum.recordings_available_through
)
self.recordings_publisher = RecordingsDataPublisher()
self.stop_event = stop_event
self.object_recordings_info: dict[str, list] = defaultdict(list)
@ -98,6 +96,41 @@ class RecordingMaintainer(threading.Thread):
and not d.startswith("preview_")
]
# publish newest cached segment per camera (including in use files)
newest_cache_segments: dict[str, dict[str, Any]] = {}
for cache in cache_files:
cache_path = os.path.join(CACHE_DIR, cache)
basename = os.path.splitext(cache)[0]
camera, date = basename.rsplit("@", maxsplit=1)
start_time = datetime.datetime.strptime(
date, CACHE_SEGMENT_FORMAT
).astimezone(datetime.timezone.utc)
if (
camera not in newest_cache_segments
or start_time > newest_cache_segments[camera]["start_time"]
):
newest_cache_segments[camera] = {
"start_time": start_time,
"cache_path": cache_path,
}
for camera, newest in newest_cache_segments.items():
self.recordings_publisher.publish(
(
camera,
newest["start_time"].timestamp(),
newest["cache_path"],
),
RecordingsDataTypeEnum.latest.value,
)
# publish None for cameras with no cache files (but only if we know the camera exists)
for camera_name in self.config.cameras:
if camera_name not in newest_cache_segments:
self.recordings_publisher.publish(
(camera_name, None, None),
RecordingsDataTypeEnum.latest.value,
)
files_in_use = []
for process in psutil.process_iter():
try:
@ -111,7 +144,7 @@ class RecordingMaintainer(threading.Thread):
except psutil.Error:
continue
# group recordings by camera
# group recordings by camera (skip in-use for validation/moving)
grouped_recordings: defaultdict[str, list[dict[str, Any]]] = defaultdict(list)
for cache in cache_files:
# Skip files currently in use
@ -233,7 +266,9 @@ class RecordingMaintainer(threading.Thread):
recordings[0]["start_time"].timestamp()
if self.config.cameras[camera].record.enabled
else None,
)
None,
),
RecordingsDataTypeEnum.saved.value,
)
recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks)
@ -250,7 +285,7 @@ class RecordingMaintainer(threading.Thread):
async def validate_and_move_segment(
self, camera: str, reviews: list[ReviewSegment], recording: dict[str, Any]
) -> None:
) -> Optional[Recordings]:
cache_path: str = recording["cache_path"]
start_time: datetime.datetime = recording["start_time"]
record_config = self.config.cameras[camera].record
@ -261,7 +296,7 @@ class RecordingMaintainer(threading.Thread):
or not self.config.cameras[camera].record.enabled
):
self.drop_segment(cache_path)
return
return None
if cache_path in self.end_time_cache:
end_time, duration = self.end_time_cache[cache_path]
@ -270,10 +305,18 @@ class RecordingMaintainer(threading.Thread):
self.config.ffmpeg, cache_path, get_duration=True
)
if segment_info["duration"]:
duration = float(segment_info["duration"])
else:
duration = -1
if not segment_info.get("has_valid_video", False):
logger.warning(
f"Invalid or missing video stream in segment {cache_path}. Discarding."
)
self.recordings_publisher.publish(
(camera, start_time.timestamp(), cache_path),
RecordingsDataTypeEnum.invalid.value,
)
self.drop_segment(cache_path)
return None
duration = float(segment_info.get("duration", -1))
# ensure duration is within expected length
if 0 < duration < MAX_SEGMENT_DURATION:
@ -284,8 +327,18 @@ class RecordingMaintainer(threading.Thread):
logger.warning(f"Failed to probe corrupt segment {cache_path}")
logger.warning(f"Discarding a corrupt recording segment: {cache_path}")
Path(cache_path).unlink(missing_ok=True)
return
self.recordings_publisher.publish(
(camera, start_time.timestamp(), cache_path),
RecordingsDataTypeEnum.invalid.value,
)
self.drop_segment(cache_path)
return None
# this segment has a valid duration and has video data, so publish an update
self.recordings_publisher.publish(
(camera, start_time.timestamp(), cache_path),
RecordingsDataTypeEnum.valid.value,
)
record_config = self.config.cameras[camera].record
highest = None

View File

@ -284,7 +284,9 @@ def post_process_yolox(
def get_ort_providers(
force_cpu: bool = False, device: str | None = "AUTO", requires_fp16: bool = False
force_cpu: bool = False,
device: str | None = "AUTO",
requires_fp16: bool = False,
) -> tuple[list[str], list[dict[str, Any]]]:
if force_cpu:
return (
@ -351,12 +353,15 @@ def get_ort_providers(
}
)
elif provider == "MIGraphXExecutionProvider":
# MIGraphX uses more CPU than ROCM, while also being the same speed
if device == "MIGraphX":
providers.append(provider)
options.append({})
else:
continue
migraphx_cache_dir = os.path.join(MODEL_CACHE_DIR, "migraphx")
os.makedirs(migraphx_cache_dir, exist_ok=True)
providers.append(provider)
options.append(
{
"migraphx_model_cache_dir": migraphx_cache_dir,
}
)
elif provider == "CPUExecutionProvider":
providers.append(provider)
options.append(

View File

@ -603,87 +603,87 @@ def auto_detect_hwaccel() -> str:
async def get_video_properties(
ffmpeg, url: str, get_duration: bool = False
) -> dict[str, Any]:
async def calculate_duration(video: Optional[Any]) -> float:
duration = None
if video is not None:
# Get the frames per second (fps) of the video stream
fps = video.get(cv2.CAP_PROP_FPS)
total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
if fps and total_frames:
duration = total_frames / fps
# if cv2 failed need to use ffprobe
if duration is None:
p = await asyncio.create_subprocess_exec(
ffmpeg.ffprobe_path,
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
f"{url}",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
async def probe_with_ffprobe(
url: str,
) -> tuple[bool, int, int, Optional[str], float]:
"""Fallback using ffprobe: returns (valid, width, height, codec, duration)."""
cmd = [
ffmpeg.ffprobe_path,
"-v",
"quiet",
"-print_format",
"json",
"-show_format",
"-show_streams",
url,
]
try:
proc = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
await p.wait()
stdout, _ = await proc.communicate()
if proc.returncode != 0:
return False, 0, 0, None, -1
if p.returncode == 0:
result = (await p.stdout.read()).decode()
else:
result = None
data = json.loads(stdout.decode())
video_streams = [
s for s in data.get("streams", []) if s.get("codec_type") == "video"
]
if not video_streams:
return False, 0, 0, None, -1
if result:
try:
duration = float(result.strip())
except ValueError:
duration = -1
else:
duration = -1
v = video_streams[0]
width = int(v.get("width", 0))
height = int(v.get("height", 0))
codec = v.get("codec_name")
return duration
duration_str = data.get("format", {}).get("duration")
duration = float(duration_str) if duration_str else -1.0
width = height = 0
return True, width, height, codec, duration
except (json.JSONDecodeError, ValueError, KeyError, asyncio.SubprocessError):
return False, 0, 0, None, -1
try:
# Open the video stream using OpenCV
video = cv2.VideoCapture(url)
def probe_with_cv2(url: str) -> tuple[bool, int, int, Optional[str], float]:
"""Primary attempt using cv2: returns (valid, width, height, fourcc, duration)."""
cap = cv2.VideoCapture(url)
if not cap.isOpened():
cap.release()
return False, 0, 0, None, -1
# Check if the video stream was opened successfully
if not video.isOpened():
video = None
except Exception:
video = None
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
valid = width > 0 and height > 0
fourcc = None
duration = -1.0
result = {}
if valid:
fourcc_int = int(cap.get(cv2.CAP_PROP_FOURCC))
fourcc = fourcc_int.to_bytes(4, "little").decode("latin-1").strip()
if get_duration:
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if fps > 0 and total_frames > 0:
duration = total_frames / fps
cap.release()
return valid, width, height, fourcc, duration
# try cv2 first
has_video, width, height, fourcc, duration = probe_with_cv2(url)
# fallback to ffprobe if needed
if not has_video or (get_duration and duration < 0):
has_video, width, height, fourcc, duration = await probe_with_ffprobe(url)
result: dict[str, Any] = {"has_valid_video": has_video}
if has_video:
result.update({"width": width, "height": height})
if fourcc:
result["fourcc"] = fourcc
if get_duration:
result["duration"] = await calculate_duration(video)
if video is not None:
# Get the width of frames in the video stream
width = video.get(cv2.CAP_PROP_FRAME_WIDTH)
# Get the height of frames in the video stream
height = video.get(cv2.CAP_PROP_FRAME_HEIGHT)
# Get the stream encoding
fourcc_int = int(video.get(cv2.CAP_PROP_FOURCC))
fourcc = (
chr((fourcc_int >> 0) & 255)
+ chr((fourcc_int >> 8) & 255)
+ chr((fourcc_int >> 16) & 255)
+ chr((fourcc_int >> 24) & 255)
)
# Release the video stream
video.release()
result["width"] = round(width)
result["height"] = round(height)
result["fourcc"] = fourcc
result["duration"] = duration
return result

View File

@ -1,10 +1,9 @@
import datetime
import logging
import os
import queue
import subprocess as sp
import threading
import time
from datetime import datetime, timedelta, timezone
from multiprocessing import Queue, Value
from multiprocessing.synchronize import Event as MpEvent
from typing import Any
@ -13,6 +12,10 @@ import cv2
from frigate.camera import CameraMetrics, PTZMetrics
from frigate.comms.inter_process import InterProcessRequestor
from frigate.comms.recordings_updater import (
RecordingsDataSubscriber,
RecordingsDataTypeEnum,
)
from frigate.config import CameraConfig, DetectConfig, ModelConfig
from frigate.config.camera.camera import CameraTypeEnum
from frigate.config.camera.updater import (
@ -20,8 +23,6 @@ from frigate.config.camera.updater import (
CameraConfigUpdateSubscriber,
)
from frigate.const import (
CACHE_DIR,
CACHE_SEGMENT_FORMAT,
PROCESS_PRIORITY_HIGH,
REQUEST_REGION_GRID,
)
@ -129,7 +130,7 @@ def capture_frames(
fps.value = frame_rate.eps()
skipped_fps.value = skipped_eps.eps()
current_frame.value = datetime.datetime.now().timestamp()
current_frame.value = datetime.now().timestamp()
frame_name = f"{config.name}_frame{frame_index}"
frame_buffer = frame_manager.write(frame_name)
try:
@ -199,6 +200,11 @@ class CameraWatchdog(threading.Thread):
self.requestor = InterProcessRequestor()
self.was_enabled = self.config.enabled
self.segment_subscriber = RecordingsDataSubscriber(RecordingsDataTypeEnum.all)
self.latest_valid_segment_time: float = 0
self.latest_invalid_segment_time: float = 0
self.latest_cache_segment_time: float = 0
def _update_enabled_state(self) -> bool:
"""Fetch the latest config and update enabled state."""
self.config_subscriber.check_for_updates()
@ -243,6 +249,11 @@ class CameraWatchdog(threading.Thread):
if enabled:
self.logger.debug(f"Enabling camera {self.config.name}")
self.start_all_ffmpeg()
# reset all timestamps
self.latest_valid_segment_time = 0
self.latest_invalid_segment_time = 0
self.latest_cache_segment_time = 0
else:
self.logger.debug(f"Disabling camera {self.config.name}")
self.stop_all_ffmpeg()
@ -260,7 +271,37 @@ class CameraWatchdog(threading.Thread):
if not enabled:
continue
now = datetime.datetime.now().timestamp()
while True:
update = self.segment_subscriber.check_for_update(timeout=0)
if update == (None, None):
break
raw_topic, payload = update
if raw_topic and payload:
topic = str(raw_topic)
camera, segment_time, _ = payload
if camera != self.config.name:
continue
if topic.endswith(RecordingsDataTypeEnum.valid.value):
self.logger.debug(
f"Latest valid recording segment time on {camera}: {segment_time}"
)
self.latest_valid_segment_time = segment_time
elif topic.endswith(RecordingsDataTypeEnum.invalid.value):
self.logger.warning(
f"Invalid recording segment detected for {camera} at {segment_time}"
)
self.latest_invalid_segment_time = segment_time
elif topic.endswith(RecordingsDataTypeEnum.latest.value):
if segment_time is not None:
self.latest_cache_segment_time = segment_time
else:
self.latest_cache_segment_time = 0
now = datetime.now().timestamp()
if not self.capture_thread.is_alive():
self.requestor.send_data(f"{self.config.name}/status/detect", "offline")
@ -298,18 +339,55 @@ class CameraWatchdog(threading.Thread):
poll = p["process"].poll()
if self.config.record.enabled and "record" in p["roles"]:
latest_segment_time = self.get_latest_segment_datetime(
p.get(
"latest_segment_time",
datetime.datetime.now().astimezone(datetime.timezone.utc),
now_utc = datetime.now().astimezone(timezone.utc)
latest_cache_dt = (
datetime.fromtimestamp(
self.latest_cache_segment_time, tz=timezone.utc
)
if self.latest_cache_segment_time > 0
else now_utc - timedelta(seconds=1)
)
if datetime.datetime.now().astimezone(datetime.timezone.utc) > (
latest_segment_time + datetime.timedelta(seconds=120)
):
latest_valid_dt = (
datetime.fromtimestamp(
self.latest_valid_segment_time, tz=timezone.utc
)
if self.latest_valid_segment_time > 0
else now_utc - timedelta(seconds=1)
)
latest_invalid_dt = (
datetime.fromtimestamp(
self.latest_invalid_segment_time, tz=timezone.utc
)
if self.latest_invalid_segment_time > 0
else now_utc - timedelta(seconds=1)
)
# ensure segments are still being created and that they have valid video data
cache_stale = now_utc > (latest_cache_dt + timedelta(seconds=120))
valid_stale = now_utc > (latest_valid_dt + timedelta(seconds=120))
invalid_stale_condition = (
self.latest_invalid_segment_time > 0
and now_utc > (latest_invalid_dt + timedelta(seconds=120))
and self.latest_valid_segment_time
<= self.latest_invalid_segment_time
)
invalid_stale = invalid_stale_condition
if cache_stale or valid_stale or invalid_stale:
if cache_stale:
reason = "No new recording segments were created"
elif valid_stale:
reason = "No new valid recording segments were created"
else: # invalid_stale
reason = (
"No valid segments created since last invalid segment"
)
self.logger.error(
f"No new recording segments were created for {self.config.name} in the last 120s. restarting the ffmpeg record process..."
f"{reason} for {self.config.name} in the last 120s. Restarting the ffmpeg record process..."
)
p["process"] = start_or_restart_ffmpeg(
p["cmd"],
@ -328,7 +406,7 @@ class CameraWatchdog(threading.Thread):
self.requestor.send_data(
f"{self.config.name}/status/record", "online"
)
p["latest_segment_time"] = latest_segment_time
p["latest_segment_time"] = self.latest_cache_segment_time
if poll is None:
continue
@ -346,6 +424,7 @@ class CameraWatchdog(threading.Thread):
self.stop_all_ffmpeg()
self.logpipe.close()
self.config_subscriber.stop()
self.segment_subscriber.stop()
def start_ffmpeg_detect(self):
ffmpeg_cmd = [
@ -405,33 +484,6 @@ class CameraWatchdog(threading.Thread):
p["logpipe"].close()
self.ffmpeg_other_processes.clear()
def get_latest_segment_datetime(
self, latest_segment: datetime.datetime
) -> datetime.datetime:
"""Checks if ffmpeg is still writing recording segments to cache."""
cache_files = sorted(
[
d
for d in os.listdir(CACHE_DIR)
if os.path.isfile(os.path.join(CACHE_DIR, d))
and d.endswith(".mp4")
and not d.startswith("preview_")
]
)
newest_segment_time = latest_segment
for file in cache_files:
if self.config.name in file:
basename = os.path.splitext(file)[0]
_, date = basename.rsplit("@", maxsplit=1)
segment_time = datetime.datetime.strptime(
date, CACHE_SEGMENT_FORMAT
).astimezone(datetime.timezone.utc)
if segment_time > newest_segment_time:
newest_segment_time = segment_time
return newest_segment_time
class CameraCaptureRunner(threading.Thread):
def __init__(
@ -727,10 +779,7 @@ def process_frames(
time.sleep(0.1)
continue
if (
datetime.datetime.now().astimezone(datetime.timezone.utc)
> next_region_update
):
if datetime.now().astimezone(timezone.utc) > next_region_update:
region_grid = requestor.send_data(REQUEST_REGION_GRID, camera_config.name)
next_region_update = get_tomorrow_at_time(2)