This commit is contained in:
154
unifi/cams/frigate.py
Normal file
154
unifi/cams/frigate.py
Normal file
@ -0,0 +1,154 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
import backoff
|
||||
from asyncio_mqtt import Client
|
||||
from asyncio_mqtt.error import MqttError
|
||||
|
||||
from unifi.cams.base import SmartDetectObjectType
|
||||
from unifi.cams.rtsp import RTSPCam
|
||||
|
||||
|
||||
class FrigateCam(RTSPCam):
|
||||
def __init__(self, args: argparse.Namespace, logger: logging.Logger) -> None:
|
||||
super().__init__(args, logger)
|
||||
self.args = args
|
||||
self.event_id: Optional[str] = None
|
||||
self.event_label: Optional[str] = None
|
||||
self.event_snapshot_ready = None
|
||||
|
||||
@classmethod
|
||||
def add_parser(cls, parser: argparse.ArgumentParser) -> None:
|
||||
super().add_parser(parser)
|
||||
parser.add_argument("--mqtt-host", required=True, help="MQTT server")
|
||||
parser.add_argument("--mqtt-port", default=1883, type=int, help="MQTT server")
|
||||
parser.add_argument("--mqtt-username", required=False)
|
||||
parser.add_argument("--mqtt-password", required=False)
|
||||
parser.add_argument(
|
||||
"--mqtt-prefix", default="frigate", type=str, help="Topic prefix"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--frigate-camera",
|
||||
required=True,
|
||||
type=str,
|
||||
help="Name of camera in frigate",
|
||||
)
|
||||
|
||||
async def get_feature_flags(self) -> dict[str, Any]:
|
||||
return {
|
||||
**await super().get_feature_flags(),
|
||||
**{
|
||||
"mic": True,
|
||||
"smartDetect": [
|
||||
"person",
|
||||
"vehicle",
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def label_to_object_type(cls, label: str) -> Optional[SmartDetectObjectType]:
|
||||
if label == "person":
|
||||
return SmartDetectObjectType.PERSON
|
||||
elif label in {"vehicle", "car", "motorcycle", "bus"}:
|
||||
return SmartDetectObjectType.VEHICLE
|
||||
|
||||
async def run(self) -> None:
|
||||
has_connected = False
|
||||
|
||||
@backoff.on_predicate(backoff.expo, max_value=60, logger=self.logger)
|
||||
async def mqtt_connect():
|
||||
nonlocal has_connected
|
||||
try:
|
||||
async with Client(
|
||||
self.args.mqtt_host,
|
||||
port=self.args.mqtt_port,
|
||||
username=self.args.mqtt_username,
|
||||
password=self.args.mqtt_password,
|
||||
) as client:
|
||||
has_connected = True
|
||||
self.logger.info(
|
||||
f"Connected to {self.args.mqtt_host}:{self.args.mqtt_port}"
|
||||
)
|
||||
tasks = [
|
||||
self.handle_detection_events(client),
|
||||
self.handle_snapshot_events(client),
|
||||
]
|
||||
await client.subscribe(f"{self.args.mqtt_prefix}/#")
|
||||
await asyncio.gather(*tasks)
|
||||
except MqttError:
|
||||
if not has_connected:
|
||||
raise
|
||||
|
||||
await mqtt_connect()
|
||||
|
||||
async def handle_detection_events(self, client) -> None:
|
||||
async with client.filtered_messages(
|
||||
f"{self.args.mqtt_prefix}/events"
|
||||
) as messages:
|
||||
async for message in messages:
|
||||
msg = message.payload.decode()
|
||||
try:
|
||||
frigate_msg = json.loads(message.payload.decode())
|
||||
if not frigate_msg["after"]["camera"] == self.args.frigate_camera:
|
||||
continue
|
||||
|
||||
label = frigate_msg["after"]["label"]
|
||||
object_type = self.label_to_object_type(label)
|
||||
if not object_type:
|
||||
self.logger.warning(
|
||||
f"Received unsupported detection label type: {label}"
|
||||
)
|
||||
|
||||
if not self.event_id and frigate_msg["type"] == "new":
|
||||
self.event_id = frigate_msg["after"]["id"]
|
||||
self.event_label = label
|
||||
self.event_snapshot_ready = asyncio.Event()
|
||||
self.logger.info(
|
||||
f"Starting {self.event_label} motion event"
|
||||
f" (id: {self.event_id})"
|
||||
)
|
||||
await self.trigger_motion_start(object_type)
|
||||
elif (
|
||||
self.event_id == frigate_msg["after"]["id"]
|
||||
and frigate_msg["type"] == "end"
|
||||
):
|
||||
# Wait for the best snapshot to be ready before
|
||||
# ending the motion event
|
||||
self.logger.info(f"Awaiting snapshot (id: {self.event_id})")
|
||||
await self.event_snapshot_ready.wait()
|
||||
self.logger.info(
|
||||
f"Ending {self.event_label} motion event"
|
||||
f" (id: {self.event_id})"
|
||||
)
|
||||
await self.trigger_motion_stop()
|
||||
self.event_id = None
|
||||
self.event_label = None
|
||||
except json.JSONDecodeError:
|
||||
self.logger.exception(f"Could not decode payload: {msg}")
|
||||
|
||||
async def handle_snapshot_events(self, client) -> None:
|
||||
topic_fmt = f"{self.args.mqtt_prefix}/{self.args.frigate_camera}/{{}}/snapshot"
|
||||
async with client.filtered_messages(topic_fmt.format("+")) as messages:
|
||||
async for message in messages:
|
||||
if (
|
||||
self.event_id
|
||||
and not message.retain
|
||||
and message.topic == topic_fmt.format(self.event_label)
|
||||
):
|
||||
f = tempfile.NamedTemporaryFile()
|
||||
f.write(message.payload)
|
||||
self.logger.debug(
|
||||
f"Updating snapshot for {self.event_label} with {f.name}"
|
||||
)
|
||||
self.update_motion_snapshot(Path(f.name))
|
||||
self.event_snapshot_ready.set()
|
||||
else:
|
||||
self.logger.debug(
|
||||
f"Discarding snapshot message ({len(message.payload)})"
|
||||
)
|
Reference in New Issue
Block a user