"""RTSP camera digit detection service: Flask UI + inference worker + MQTT publisher."""
import base64
|
|
import json
|
|
import logging
|
|
import sys
|
|
import threading
|
|
import time
|
|
import traceback
|
|
|
|
import cv2
|
|
import numpy as np
|
|
import paho.mqtt.client as mqtt
|
|
from flask import Flask, Response, jsonify, render_template, request
|
|
|
|
from config import Config
|
|
from inference import InferenceWorker
|
|
from manager import CameraManager
|
|
|
|
# ------------------------------------------------------------------------------
# 1. USER CONFIGURATION (Edit these values here)
# ------------------------------------------------------------------------------

# Enable verbose debug logs (True = verbose, False = quiet/crucial only).
# When True this also forces the log level to DEBUG regardless of Config.
DEBUG_LOG = False

# Rate Limiting: How many seconds to wait between detections per camera.
DETECTION_INTERVAL = 10

# Frame Quality Threshold: Skip images with standard deviation lower than this.
# - Low values (1-5) allow darker/low-contrast images (good for night).
# - High values (20-40) filter out gray/blank screens but might skip valid dark images.
# - Set to 0 to disable this check entirely.
FRAME_STD_THRESHOLD = 5.0
|
|
|
|
# ------------------------------------------------------------------------------
|
|
|
|
|
|
def _cfg(*names, default=None):
    """Return the first attribute found on Config among *names*, else *default*.

    Lets the rest of the module tolerate both SNAKE_CASE and condensed
    spellings of the same configuration key.
    """
    _missing = object()
    for candidate in names:
        value = getattr(Config, candidate, _missing)
        if value is not _missing:
            return value
    return default
|
|
|
|
|
|
# --- Logging setup ---
# DEBUG_LOG (user config above) overrides any level supplied via Config.
LOG_LEVEL = logging.DEBUG if DEBUG_LOG else _cfg("LOG_LEVEL", "LOGLEVEL", default=logging.INFO)
logging.basicConfig(
    level=LOG_LEVEL,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)],  # log to stdout (container-friendly)
)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# --- Initialize components ---
camera_manager = CameraManager()
inference_worker = InferenceWorker(debug_log=DEBUG_LOG)
inference_worker.start()

# --- MQTT setup ---
mqtt_client = mqtt.Client()

# Config attributes are probed under both SNAKE_CASE and condensed spellings.
MQTT_USERNAME = _cfg("MQTT_USERNAME", "MQTTUSERNAME", default=None)
MQTT_PASSWORD = _cfg("MQTT_PASSWORD", "MQTTPASSWORD", default=None)
MQTT_BROKER = _cfg("MQTT_BROKER", "MQTTBROKER", default="127.0.0.1")
MQTT_PORT = int(_cfg("MQTT_PORT", "MQTTPORT", default=1883))
MQTT_TOPIC = _cfg("MQTT_TOPIC", "MQTTTOPIC", default="homeassistant/sensor/RTSPCamDigitDetection/state")

# Credentials are optional; only applied when both are configured.
if MQTT_USERNAME and MQTT_PASSWORD:
    mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)

# Best-effort connect: the service still starts when the broker is down;
# later publish attempts will fail and be logged instead.
try:
    mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
    mqtt_client.loop_start()  # start paho's background network loop
    logger.info("Connected to MQTT Broker at %s:%s", MQTT_BROKER, MQTT_PORT)
except Exception as e:
    logger.error("Failed to connect to MQTT Broker: %s", e)
|
|
|
|
|
|
# --- Helpers ---
_last_log = {}  # key -> timestamp of the last message emitted for that key


def log_rl(level, key, msg, every_s=10):
    """Emit *msg* at *level*, at most once per *every_s* seconds per *key*.

    Used to keep noisy, repeating conditions from flooding the log.
    """
    now = time.time()
    if now - _last_log.get(key, 0.0) >= every_s:
        _last_log[key] = now
        logger.log(level, msg)
|
|
|
|
|
|
def log_debug(key, msg, every_s=0):
    """Log *msg* at DEBUG level, only when the global DEBUG_LOG flag is set.

    A positive *every_s* additionally rate-limits the message per *key*
    via log_rl(); otherwise the message is logged unconditionally.
    """
    if DEBUG_LOG:
        if every_s and every_s > 0:
            log_rl(logging.DEBUG, key, msg, every_s=every_s)
        else:
            logger.debug(msg)
|
|
|
|
|
|
def log_condition(camera_id: str, cond_key: str, msg: str, *, crucial=False,
                  debug_level=logging.DEBUG, debug_every=5,
                  nodebug_level=logging.WARNING, nodebug_every=60):
    """Report a per-camera condition (skip reason, degraded state) without spam.

    Behaviour depends on the global DEBUG_LOG flag:
    - DEBUG_LOG=True  -> detailed logs, one per ``debug_every`` seconds.
    - DEBUG_LOG=False -> only ``crucial`` conditions, one per
      ``nodebug_every`` seconds.
    """
    key = f"{camera_id}:{cond_key}"
    if DEBUG_LOG:
        log_rl(debug_level, key, msg, every_s=debug_every)
    elif crucial:
        log_rl(nodebug_level, key, msg, every_s=nodebug_every)
|
|
|
|
|
|
def crop_image_for_ui(image, roi_list, scaleX, scaleY):
    """Crop each ROI out of *image* for the /crop endpoint (UI preview only).

    Parameters:
        image: full frame as an ndarray (H x W [, C]).
        roi_list: dicts with numeric 'x', 'y', 'width', 'height' keys in the
            UI's coordinate space.
        scaleX, scaleY: factors mapping UI coordinates to pixel coordinates.

    Returns:
        List of non-empty crops; malformed ROIs are skipped silently
        (best-effort, this feeds a preview, not the detection pipeline).
    """
    cropped_images = []
    for roi in roi_list:
        try:
            # Clamp to >= 0 so a slightly negative UI coordinate does not
            # wrap around to the far edge via negative indexing.
            x = max(int(roi['x'] * scaleX), 0)
            y = max(int(roi['y'] * scaleY), 0)
            width = int(roi['width'] * scaleX)
            height = int(roi['height'] * scaleY)
        except (KeyError, TypeError, ValueError):
            # Malformed ROI entry (missing key / non-numeric value): skip it
            # rather than aborting the whole preview. The former bare
            # `except Exception` also hid genuine programming errors.
            continue
        cropped = image[y:y + height, x:x + width]
        if cropped.size > 0:
            cropped_images.append(cropped)
    return cropped_images
|
|
|
|
|
|
def publish_detected_number(camera_id, detected_number, confidence=None):
    """Publish a detection result for *camera_id* to MQTT.

    Payload is JSON: {"value": <number>} plus an optional "confidence"
    field rounded to two decimals. Publish failures are logged, not raised.
    """
    topic = f"{MQTT_TOPIC}/{camera_id}"

    payload_dict = {"value": detected_number}
    if confidence is not None:
        payload_dict["confidence"] = round(float(confidence), 2)

    try:
        mqtt_client.publish(topic, json.dumps(payload_dict))
        # Keep this INFO even when debug is off: it's the primary business output.
        suffix = "" if confidence is None else f" (Conf: {confidence:.2f})"
        logger.info(f"Published to {topic}: {detected_number}{suffix}")
    except Exception as e:
        logger.error("MQTT Publish failed: %s", e)
|
|
|
|
|
|
# --- Main processing loop ---
# Per-camera timestamp of when a frame was last queued for inference
# (used for DETECTION_INTERVAL rate limiting).
last_processed_time = {}

def process_all_cameras():
    """Background loop: drain inference results, then feed new frames.

    Runs forever (started as a daemon thread in __main__). Each iteration:
      1. Pops all pending results from the inference worker; successes are
         stored on camera_manager.results and published to MQTT, errors are
         logged with spam control.
      2. For every active camera, reads a frame and queues an inference
         task, subject to rate limiting and frame-quality checks.
    Any unexpected error is logged with a traceback and the loop backs off
    for 5 seconds before retrying.
    """
    hb_last = 0.0  # timestamp of the last heartbeat log line

    while True:
        try:
            # Heartbeat only in debug mode: queue depths + worker counters
            # every ~5 s, to diagnose stalls in the pipeline.
            if DEBUG_LOG:
                now = time.time()
                if now - hb_last >= 5.0:
                    hb_last = now
                    # getattr with defaults: these worker attributes are
                    # treated as optional diagnostics, not a hard contract.
                    in_q = getattr(inference_worker, "input_queue", None)
                    out_q = getattr(inference_worker, "result_queue", None)
                    logger.info(
                        "HB mainloop alive; in_q=%s out_q=%s dropped=%s processed=%s last_invoke_s=%s",
                        (in_q.qsize() if in_q else "n/a"),
                        (out_q.qsize() if out_q else "n/a"),
                        getattr(inference_worker, "dropped_tasks", "n/a"),
                        getattr(inference_worker, "processed_tasks", "n/a"),
                        getattr(inference_worker, "last_invoke_secs", "n/a"),
                    )

            # --- Part 1: process results ---
            # Drain until get_result() returns a falsy value — presumably a
            # non-blocking poll; TODO confirm against inference.py.
            while True:
                result = inference_worker.get_result()
                if not result:
                    break

                cam_id = result.get('camera_id')

                # Debug-only latency trace: age of the task from enqueue to
                # result, plus the worker's own timing breakdown.
                if DEBUG_LOG and result.get("task_ts") is not None:
                    try:
                        age = time.time() - float(result["task_ts"])
                        logger.info(
                            "Result cam=%s type=%s task_id=%s age_s=%.3f timing=%s",
                            cam_id,
                            result.get("type"),
                            result.get("task_id"),
                            age,
                            result.get("timing_s"),
                        )
                    except Exception:
                        pass

                if result.get('type') == 'success':
                    val = result['value']
                    conf = result.get('confidence')
                    camera_manager.results[cam_id] = val
                    publish_detected_number(cam_id, val, conf)

                elif result.get('type') == 'error':
                    msg = result.get('message', 'Unknown error')

                    # When debug is off, avoid spamming "Low confidence" messages.
                    if DEBUG_LOG:
                        logger.warning("[%s] Detection skipped: %s", cam_id, msg)
                    else:
                        # Crucial errors: rate-limited warnings.
                        # Filter out "Low confidence" unless it's crucial for you.
                        if not str(msg).lower().startswith("low confidence"):
                            log_condition(cam_id, "detect_error", f"[{cam_id}] Detection skipped: {msg}", crucial=True)

            # --- Part 2: feed frames ---
            # Re-read ROI config each pass so UI edits take effect live.
            camera_manager.load_roi_config()

            for camera_id, camera_data in camera_manager.cameras.items():
                # Cameras default to active when the flag is absent.
                if not camera_data.get("active", True):
                    continue

                current_time = time.time()
                last_time = last_processed_time.get(camera_id, 0.0)

                # Rate limit: at most one queued detection per interval.
                if current_time - last_time < DETECTION_INTERVAL:
                    log_debug(f"{camera_id}:rate", f"[{camera_id}] skip: rate limit", every_s=30)
                    continue

                stream = camera_data.get("stream")
                if not stream:
                    log_condition(camera_id, "nostream", f"[{camera_id}] skip: no stream", crucial=True)
                    continue

                # Grace period after stream start (both attribute spellings
                # are probed); avoids feeding half-initialized frames.
                start_time = getattr(stream, "start_time", getattr(stream, "starttime", None))
                if start_time is not None and (current_time - start_time) < 5:
                    log_debug(f"{camera_id}:warmup", f"[{camera_id}] skip: warmup", every_s=10)
                    continue

                frame = stream.read()
                if frame is None:
                    log_condition(camera_id, "noframe", f"[{camera_id}] skip: frame is None", crucial=True)
                    continue

                # STD Check: reject near-uniform (blank/gray) frames; see
                # FRAME_STD_THRESHOLD in the user configuration.
                frame_std = float(np.std(frame))
                if frame_std < FRAME_STD_THRESHOLD:
                    log_condition(
                        camera_id,
                        "lowstd",
                        f"[{camera_id}] skip: low frame std={frame_std:.2f} (<{FRAME_STD_THRESHOLD})",
                        crucial=True,
                        debug_every=5,
                        nodebug_every=60,
                    )
                    continue

                roi_list = camera_manager.rois.get(camera_id, [])
                if not roi_list:
                    log_condition(camera_id, "norois", f"[{camera_id}] skip: no ROIs configured", crucial=True)
                    continue

                inference_worker.add_task(camera_id, roi_list, frame, frame_std=frame_std)
                last_processed_time[camera_id] = current_time

            # Small idle sleep to keep the loop from busy-spinning.
            time.sleep(0.1)

        except Exception as e:
            # Top-level guard: log, print traceback, back off, keep running.
            logger.error("Global process loop error: %s", e)
            traceback.print_exc()
            time.sleep(5)
|
|
|
|
|
|
# --- Flask routes ---
@app.route('/')
def index():
    """Serve the single-page UI (templates/index.html)."""
    return render_template('index.html')
|
|
|
|
|
|
@app.route('/cameras', methods=['GET'])
def get_cameras():
    """Return the camera list from the CameraManager as JSON."""
    return jsonify(camera_manager.get_camera_list())
|
|
|
|
|
|
@app.route('/video/<camera_id>')
def video_feed(camera_id):
    """Stream live JPEG frames for *camera_id* as multipart/x-mixed-replace."""

    def mjpeg_stream():
        while True:
            frame = camera_manager.get_frame(camera_id)
            if frame is None:
                # No frame available yet; back off briefly before retrying.
                time.sleep(0.1)
                continue
            ok, jpeg = cv2.imencode('.jpg', frame)
            if not ok:
                continue
            yield (
                b'--frame\r\n'
                b'Content-Type: image/jpeg\r\n\r\n' + jpeg.tobytes() + b'\r\n\r\n'
            )

    return Response(mjpeg_stream(), mimetype='multipart/x-mixed-replace; boundary=frame')
|
|
|
|
|
|
@app.route('/snapshot/<camera_id>')
def snapshot(camera_id):
    """Return a single JPEG frame for *camera_id*, or 404 when unavailable."""
    frame = camera_manager.get_frame(camera_id)
    if frame is None:
        return 'No frame available', 404
    ok, jpeg = cv2.imencode('.jpg', frame)
    if not ok:
        return 'No frame available', 404
    return Response(jpeg.tobytes(), mimetype='image/jpeg')
|
|
|
|
|
|
@app.route('/rois/<camera_id>', methods=['GET'])
def get_rois(camera_id):
    """Return the ROIs for *camera_id*, optionally scaled to the UI image size.

    Query params ``img_width``/``img_height`` describe the browser's
    displayed image; when both are present (and non-zero) ROI pixel
    coordinates are scaled from the camera's native resolution into that
    space, otherwise native coordinates are returned unchanged.
    """
    try:
        camera_manager.load_roi_config()
        rois = camera_manager.rois.get(camera_id, [])

        img_width = request.args.get("img_width", type=float)
        img_height = request.args.get("img_height", type=float)

        # No (usable) target size -> return native-resolution ROIs.
        if not img_width or not img_height:
            return jsonify(rois)

        cam = camera_manager.cameras.get(camera_id)
        if not (cam and cam.get("stream")):
            return jsonify({"error": "Camera not ready"}), 500

        # Prefer the stream's reported resolution, falling back to the
        # configured one when the stream reports a falsy value.
        real_w = cam["stream"].width or cam["width"]
        real_h = cam["stream"].height or cam["height"]

        scaleX = img_width / real_w
        scaleY = img_height / real_h

        scaled_rois = [
            {
                "id": roi["id"],
                "x": int(round(roi["x"] * scaleX)),
                "y": int(round(roi["y"] * scaleY)),
                "width": int(round(roi["width"] * scaleX)),
                "height": int(round(roi["height"] * scaleY)),
                "angle": roi.get("angle", 0),
            }
            for roi in rois
        ]
        return jsonify(scaled_rois)

    except Exception as e:
        return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/save_rois", methods=["POST"])
|
|
def save_rois_api():
|
|
data = request.json
|
|
|
|
camera_id = data.get("camera_id")
|
|
new_rois = data.get("rois")
|
|
img_width = data.get("img_width")
|
|
img_height = data.get("img_height")
|
|
|
|
if not camera_id or new_rois is None:
|
|
return jsonify({"success": False})
|
|
|
|
cam = camera_manager.cameras.get(camera_id)
|
|
if not cam:
|
|
return jsonify({"success": False})
|
|
|
|
stream = cam.get("stream")
|
|
real_w = stream.width if stream and getattr(stream, "width", None) else cam["width"]
|
|
real_h = stream.height if stream and getattr(stream, "height", None) else cam["height"]
|
|
|
|
scaleX = real_w / img_width if img_width else 1
|
|
scaleY = real_h / img_height if img_height else 1
|
|
|
|
scaled_rois = []
|
|
for roi in new_rois:
|
|
scaled_rois.append({
|
|
"id": roi["id"],
|
|
"x": int(round(roi["x"] * scaleX)),
|
|
"y": int(round(roi["y"] * scaleY)),
|
|
"width": int(round(roi["width"] * scaleX)),
|
|
"height": int(round(roi["height"] * scaleY)),
|
|
"angle": roi.get("angle", 0),
|
|
})
|
|
|
|
camera_manager.rois[camera_id] = scaled_rois
|
|
return jsonify(camera_manager.save_roi_config())
|
|
|
|
|
|
@app.route('/crop', methods=['POST'])
def crop():
    """Return base64-encoded JPEG crops of the camera's ROIs (UI preview)."""
    data = request.json

    camera_id = data.get('camera_id')
    scaleX = data.get('scaleX', 1)
    scaleY = data.get('scaleY', 1)

    frame = camera_manager.get_frame(camera_id)
    if frame is None:
        return jsonify({'error': 'No frame'}), 500

    roi_list = camera_manager.rois.get(camera_id, [])
    crops = crop_image_for_ui(frame, roi_list, scaleX, scaleY)

    encoded = []
    for img in crops:
        ok, buf = cv2.imencode('.jpg', img)
        if ok:
            encoded.append(base64.b64encode(buf).decode('utf-8'))

    return jsonify({'cropped_images': encoded})
|
|
|
|
|
|
@app.route('/detect_digits', methods=['POST'])
def detect_digits():
    """Manual trigger: Runs inference immediately and returns result with validation.

    JSON body: {"camera_id": ...}. On success, publishes the number to MQTT,
    stores it on camera_manager.results and returns the per-digit strings,
    the parsed integer, per-digit confidences and their mean. Returns 400
    when any digit falls below the worker's confidence threshold or the
    parsed value lies outside [MIN_VALUE, MAX_VALUE].
    """
    data = request.json

    camera_id = data.get('camera_id')
    if not camera_id:
        return jsonify({'error': 'Invalid camera ID'}), 400

    frame = camera_manager.get_frame(camera_id)
    if frame is None:
        return jsonify({'error': 'Failed to capture image'}), 500

    roi_list = camera_manager.rois.get(camera_id, [])
    if not roi_list:
        return jsonify({'error': 'No ROIs defined'}), 400

    # ROIs are stored in native pixel coordinates, so no scaling here.
    cropped_images = crop_image_for_ui(frame, roi_list, scaleX=1, scaleY=1)
    if not cropped_images:
        return jsonify({'error': 'Failed to crop ROIs'}), 500

    try:
        predictions = inference_worker.predict_batch(cropped_images)

        valid_digits_str = []
        confidences = []
        rejected_reasons = []

        # Validation limits come from the worker so manual and automatic
        # detections use the same thresholds.
        CONFIDENCE_THRESHOLD = inference_worker.CONFIDENCE_THRESHOLD
        MIN_VALUE = inference_worker.MIN_VALUE
        MAX_VALUE = inference_worker.MAX_VALUE

        for i, p in enumerate(predictions):
            if p['confidence'] < CONFIDENCE_THRESHOLD:
                msg = f"Digit {i} ('{p['digit']}') rejected: conf {p['confidence']:.2f} < {CONFIDENCE_THRESHOLD}"
                rejected_reasons.append(msg)
                if DEBUG_LOG:
                    logger.warning("[Manual] %s", msg)
            else:
                valid_digits_str.append(p['digit'])
                confidences.append(p['confidence'])

        # All-or-nothing: a single rejected digit fails the whole reading.
        if len(valid_digits_str) != len(predictions):
            return jsonify({'error': 'Low confidence detection', 'details': rejected_reasons, 'raw': predictions}), 400

        final_number_str = "".join(valid_digits_str)
        try:
            # int("") (empty predictions) also lands in the ValueError branch.
            final_number = int(final_number_str)

            if not (MIN_VALUE <= final_number <= MAX_VALUE):
                msg = f"Value {final_number} out of range ({MIN_VALUE}-{MAX_VALUE})"
                if DEBUG_LOG:
                    logger.warning("[Manual] %s", msg)
                return jsonify({'error': 'Value out of range', 'value': final_number}), 400

            avg_conf = float(np.mean(confidences)) if confidences else None
            publish_detected_number(camera_id, final_number, avg_conf)
            camera_manager.results[camera_id] = final_number

            return jsonify({
                'detected_digits': valid_digits_str,
                'final_number': final_number,
                'confidences': confidences,
                'avg_confidence': avg_conf,
            })

        except ValueError:
            return jsonify({'error': 'Could not parse digits', 'raw': valid_digits_str}), 500

    except Exception as e:
        logger.error("Error during manual detection: %s", e)
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/update_camera_config', methods=['POST'])
def update_camera_config():
    """Update a camera's flip setting. JSON body: camera_id, flip_type."""
    # get_json(silent=True) tolerates a missing or non-JSON body; the old
    # `request.json` yielded None (or raised) and crashed on .get().
    data = request.get_json(silent=True) or {}
    success = camera_manager.update_camera_flip(data.get("camera_id"), data.get("flip_type"))
    return jsonify({"success": success})
|
|
|
|
|
|
if __name__ == '__main__':
    # Run the camera processing loop in the background; daemon=True lets
    # the process exit without waiting for the thread.
    t = threading.Thread(target=process_all_cameras, daemon=True)
    t.start()

    logger.info("Starting Flask Server...")
    # threaded=True so the MJPEG streams don't block other requests.
    app.run(host='0.0.0.0', port=5000, threaded=True)
|