EdgeAI_Digit_Recognition/manager.py

113 lines
4.1 KiB
Python

import json
import logging
import threading
import time
from config import Config
from camera import VideoStream
logger = logging.getLogger(__name__)
class CameraManager:
def __init__(self):
# State containers
# Replaces global 'cameras', 'rois', 'camera_results'
self.cameras = {}
self.rois = {}
self.results = {}
# Load initial configuration immediately
self.load_camera_config()
self.load_roi_config()
def load_camera_config(self):
"""Load settings from JSON and initialize VideoStream objects."""
try:
with open(Config.CAMERA_CONFIG_PATH, 'r') as f:
config_data = json.load(f)
for cam_conf in config_data:
cam_id = cam_conf['id']
if not cam_conf.get("active", False):
continue
# Initialize stream only if new
if cam_id not in self.cameras:
logger.info(f"Initializing new camera manager entry: {cam_id}")
stream = VideoStream(
camera_id=cam_id,
rtsp_url=cam_conf['rtsp_url'],
flip_type=cam_conf.get("flip_type", "None")
)
stream.start()
self.cameras[cam_id] = {
"id": cam_id,
"name": cam_conf['name'],
"stream": stream,
"width": stream.width or 640,
"height": stream.height or 360,
"active": True,
"rtsp_url": cam_conf['rtsp_url'],
"flip_type": cam_conf.get("flip_type", "None")
}
# Ensure ROI list exists
if cam_id not in self.rois:
self.rois[cam_id] = []
except Exception as e:
logger.error(f"Failed to load camera config: {e}")
def load_roi_config(self):
"""Reload ROI definitions from disk."""
try:
with open(Config.ROI_CONFIG_PATH, 'r') as f:
self.rois = json.load(f)
# logger.info(f"ROIs loaded/reloaded.") # Optional logging
except Exception as e:
logger.error(f"Error loading ROI config: {e}")
self.rois = {}
def save_roi_config(self):
"""Save current ROIs to disk."""
try:
with open(Config.ROI_CONFIG_PATH, 'w') as f:
json.dump(self.rois, f, indent=4)
return {"success": True}
except Exception as e:
logger.error(f"Error saving ROI config: {e}")
return {"success": False, "error": str(e)}
def get_frame(self, camera_id):
"""Helper to get the latest frame from a specific camera."""
cam = self.cameras.get(camera_id)
if not cam or not cam.get('stream'):
return None
return cam['stream'].read()
def get_camera_list(self):
"""Return simple list of cameras for API."""
return [{"id": c['id'], "name": c['name']} for c in self.cameras.values()]
def update_camera_flip(self, camera_id, flip_type):
"""Update flip settings for a camera."""
if camera_id in self.cameras:
self.cameras[camera_id]["flip_type"] = flip_type
stream = self.cameras[camera_id].get("stream")
if stream:
stream.set_flip(flip_type)
# Persist to disk
try:
with open(Config.CAMERA_CONFIG_PATH, 'r') as f:
config = json.load(f)
for c in config:
if c['id'] == camera_id:
c['flip_type'] = flip_type
with open(Config.CAMERA_CONFIG_PATH, 'w') as f:
json.dump(config, f, indent=4)
return True
except Exception as e:
logger.error(f"Failed to save flip config: {e}")
return False