Compare commits


No commits in common. "6efcad9cdf00229d1e96afa425f95407742b9ee6" and "d4ef9c265455af1abcd0a182288946a06e83bff8" have entirely different histories.

4 changed files with 199 additions and 579 deletions

app.py

@@ -1,122 +1,51 @@
-import base64
-import json
 import logging
 import sys
+import os
 import threading
+import json
 import time
 import traceback
+import base64
 import cv2
 import numpy as np
 import paho.mqtt.client as mqtt
-from flask import Flask, Response, jsonify, render_template, request
+from flask import Flask, render_template, jsonify, request, Response
+# test
+# Import Config, Manager, and NEW Inference Worker
 from config import Config
-from inference import InferenceWorker
 from manager import CameraManager
+from inference import InferenceWorker
-# ------------------------------------------------------------------------------
-# 1. USER CONFIGURATION (Edit these values here)
-# ------------------------------------------------------------------------------
-# Enable verbose debug logs (True = verbose, False = quiet/crucial only)
-DEBUG_LOG = False
-# Rate Limiting: How many seconds to wait between detections per camera
-DETECTION_INTERVAL = 10
-# Frame Quality Threshold: Skip images with standard deviation lower than this.
-# - Low values (1-5) allow darker/low-contrast images (good for night).
-# - High values (20-40) filter out gray/blank screens but might skip valid dark images.
-# - Set to 0 to disable this check entirely.
-FRAME_STD_THRESHOLD = 1.0
-# ------------------------------------------------------------------------------
-def _cfg(*names, default=None):
-    """Return first matching attribute from Config, else default."""
-    for n in names:
-        if hasattr(Config, n):
-            return getattr(Config, n)
-    return default
-# --- Logging setup ---
-LOG_LEVEL = logging.DEBUG if DEBUG_LOG else _cfg("LOG_LEVEL", "LOGLEVEL", default=logging.INFO)
+# --- Logging Setup ---
 logging.basicConfig(
-    level=LOG_LEVEL,
+    level=Config.LOG_LEVEL,
     format='%(asctime)s [%(levelname)s] %(message)s',
-    handlers=[logging.StreamHandler(sys.stdout)],
+    handlers=[logging.StreamHandler(sys.stdout)]
 )
 logger = logging.getLogger(__name__)
 app = Flask(__name__)
-# --- Initialize components ---
+# --- Initialize Components ---
 camera_manager = CameraManager()
-inference_worker = InferenceWorker(debug_log=DEBUG_LOG)
-inference_worker.start()
+inference_worker = InferenceWorker() # <--- NEW
+inference_worker.start() # <--- Start the background thread
-# --- MQTT setup ---
+# --- MQTT Setup ---
 mqtt_client = mqtt.Client()
-MQTT_USERNAME = _cfg("MQTT_USERNAME", "MQTTUSERNAME", default=None)
-MQTT_PASSWORD = _cfg("MQTT_PASSWORD", "MQTTPASSWORD", default=None)
-MQTT_BROKER = _cfg("MQTT_BROKER", "MQTTBROKER", default="127.0.0.1")
-MQTT_PORT = int(_cfg("MQTT_PORT", "MQTTPORT", default=1883))
-MQTT_TOPIC = _cfg("MQTT_TOPIC", "MQTTTOPIC", default="homeassistant/sensor/RTSPCamDigitDetection/state")
-if MQTT_USERNAME and MQTT_PASSWORD:
-    mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
+if Config.MQTT_USERNAME and Config.MQTT_PASSWORD:
+    mqtt_client.username_pw_set(Config.MQTT_USERNAME, Config.MQTT_PASSWORD)
 try:
-    mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
-    mqtt_client.loop_start()
-    logger.info("Connected to MQTT Broker at %s:%s", MQTT_BROKER, MQTT_PORT)
+    mqtt_client.connect(Config.MQTT_BROKER, Config.MQTT_PORT, 60)
+    mqtt_client.loop_start() # START THE LOOP HERE
+    logger.info(f"Connected to MQTT Broker at {Config.MQTT_BROKER}:{Config.MQTT_PORT}")
 except Exception as e:
-    logger.error("Failed to connect to MQTT Broker: %s", e)
+    logger.error(f"Failed to connect to MQTT Broker: {e}")
-# --- Helpers ---
-_last_log = {}
-def log_rl(level, key, msg, every_s=10):
-    """Rate-limited log. Use for noisy conditions."""
-    now = time.time()
-    last = _last_log.get(key, 0.0)
-    if now - last >= every_s:
-        _last_log[key] = now
-        logger.log(level, msg)
-def log_debug(key, msg, every_s=0):
-    """Debug-only logging with optional rate limiting."""
-    if not DEBUG_LOG:
-        return
-    if every_s and every_s > 0:
-        log_rl(logging.DEBUG, key, msg, every_s=every_s)
-    else:
-        logger.debug(msg)
-def log_condition(camera_id: str, cond_key: str, msg: str, *, crucial=False,
-                  debug_level=logging.DEBUG, debug_every=5,
-                  nodebug_level=logging.WARNING, nodebug_every=60):
-    """Log conditions (skip reasons, degraded state) without spamming.
-    - If DEBUG_LOG=True -> frequent detailed logs.
-    - If DEBUG_LOG=False -> only rate-limited warnings for crucial conditions.
-    """
-    key = f"{camera_id}:{cond_key}"
-    if DEBUG_LOG:
-        log_rl(debug_level, key, msg, every_s=debug_every)
-        return
-    if crucial:
-        log_rl(nodebug_level, key, msg, every_s=nodebug_every)
+# --- Helper Functions (UI Only) ---
 def crop_image_for_ui(image, roi_list, scaleX, scaleY):
     """Helper for the /crop endpoint (UI preview only)."""
     cropped_images = []
@@ -133,162 +62,100 @@ def crop_image_for_ui(image, roi_list, scaleX, scaleY):
             pass
     return cropped_images
-def publish_detected_number(camera_id, detected_number, confidence=None):
-    """Publish result to MQTT with optional confidence score."""
-    topic = f"{MQTT_TOPIC}/{camera_id}"
-    payload_dict = {"value": detected_number}
-    if confidence is not None:
-        payload_dict["confidence"] = round(float(confidence), 2)
-    payload = json.dumps(payload_dict)
+def publish_detected_number(camera_id, detected_number):
+    """Publish result to MQTT."""
+    topic = f"{Config.MQTT_TOPIC}/{camera_id}"
+    payload = json.dumps({"value": detected_number})
     try:
         mqtt_client.publish(topic, payload)
-        # Keep this INFO even when debug is off: it's the primary business output.
-        log_msg = f"Published to {topic}: {detected_number}"
-        if confidence is not None:
-            log_msg += f" (Conf: {confidence:.2f})"
-        logger.info(log_msg)
+        logger.info(f"Published to {topic}: {detected_number}")
     except Exception as e:
-        logger.error("MQTT Publish failed: %s", e)
+        logger.error(f"MQTT Publish failed: {e}")
-# --- Main processing loop ---
-last_processed_time = {}
+# --- Main Processing Loop (Refactored) ---
+# Add this global dictionary at the top of app.py (near other globals)
+last_processed_time = {}
+# Update process_all_cameras function
 def process_all_cameras():
-    hb_last = 0.0
+    """
+    Revised Loop with Rate Limiting
+    """
+    # Configurable interval (seconds)
+    DETECTION_INTERVAL = 10
     while True:
         try:
-            # Heartbeat only in debug mode
-            if DEBUG_LOG:
-                now = time.time()
-                if now - hb_last >= 5.0:
-                    hb_last = now
-                    in_q = getattr(inference_worker, "input_queue", None)
-                    out_q = getattr(inference_worker, "result_queue", None)
-                    logger.info(
-                        "HB mainloop alive; in_q=%s out_q=%s dropped=%s processed=%s last_invoke_s=%s",
-                        (in_q.qsize() if in_q else "n/a"),
-                        (out_q.qsize() if out_q else "n/a"),
-                        getattr(inference_worker, "dropped_tasks", "n/a"),
-                        getattr(inference_worker, "processed_tasks", "n/a"),
-                        getattr(inference_worker, "last_invoke_secs", "n/a"),
-                    )
-            # --- Part 1: process results ---
+            # --- Part 1: Process Results ---
             while True:
                 result = inference_worker.get_result()
                 if not result:
                     break
-                cam_id = result.get('camera_id')
-                # Debug-only latency trace
-                if DEBUG_LOG and result.get("task_ts") is not None:
-                    try:
-                        age = time.time() - float(result["task_ts"])
-                        logger.info(
-                            "Result cam=%s type=%s task_id=%s age_s=%.3f timing=%s",
-                            cam_id,
-                            result.get("type"),
-                            result.get("task_id"),
-                            age,
-                            result.get("timing_s"),
-                        )
-                    except Exception:
-                        pass
-                if result.get('type') == 'success':
-                    val = result['value']
-                    conf = result.get('confidence')
-                    camera_manager.results[cam_id] = val
-                    publish_detected_number(cam_id, val, conf)
-                elif result.get('type') == 'error':
-                    msg = result.get('message', 'Unknown error')
-                    # When debug is off, avoid spamming "Low confidence" messages.
-                    if DEBUG_LOG:
-                        logger.warning("[%s] Detection skipped: %s", cam_id, msg)
-                    else:
-                        # Crucial errors: rate-limited warnings.
-                        # Filter out "Low confidence" unless it's crucial for you.
-                        if not str(msg).lower().startswith("low confidence"):
-                            log_condition(cam_id, "detect_error", f"[{cam_id}] Detection skipped: {msg}", crucial=True)
-            # --- Part 2: feed frames ---
+                cam_id = result['camera_id']
+                val = result['value']
+                camera_manager.results[cam_id] = val
+                publish_detected_number(cam_id, val)
+            # --- Part 2: Feed Frames ---
             camera_manager.load_roi_config()
             for camera_id, camera_data in camera_manager.cameras.items():
                 if not camera_data.get("active", True):
                     continue
+                # RATE LIMIT CHECK
                 current_time = time.time()
-                last_time = last_processed_time.get(camera_id, 0.0)
+                last_time = last_processed_time.get(camera_id, 0)
                 if current_time - last_time < DETECTION_INTERVAL:
-                    log_debug(f"{camera_id}:rate", f"[{camera_id}] skip: rate limit", every_s=30)
-                    continue
+                    continue # Skip this camera, it's too soon
                 stream = camera_data.get("stream")
-                if not stream:
-                    log_condition(camera_id, "nostream", f"[{camera_id}] skip: no stream", crucial=True)
-                    continue
-                start_time = getattr(stream, "start_time", getattr(stream, "starttime", None))
-                if start_time is not None and (current_time - start_time) < 5:
-                    log_debug(f"{camera_id}:warmup", f"[{camera_id}] skip: warmup", every_s=10)
+                if not stream: continue
+                # Warmup Check
+                if (current_time - stream.start_time) < 5:
                     continue
                 frame = stream.read()
                 if frame is None:
-                    log_condition(camera_id, "noframe", f"[{camera_id}] skip: frame is None", crucial=True)
                     continue
-                # STD Check
-                frame_std = float(np.std(frame))
-                if frame_std < FRAME_STD_THRESHOLD:
-                    log_condition(
-                        camera_id,
-                        "lowstd",
-                        f"[{camera_id}] skip: low frame std={frame_std:.2f} (<{FRAME_STD_THRESHOLD})",
-                        crucial=True,
-                        debug_every=5,
-                        nodebug_every=60,
-                    )
+                if np.std(frame) < 10:
                     continue
                 roi_list = camera_manager.rois.get(camera_id, [])
                 if not roi_list:
-                    log_condition(camera_id, "norois", f"[{camera_id}] skip: no ROIs configured", crucial=True)
                     continue
-                inference_worker.add_task(camera_id, roi_list, frame, frame_std=frame_std)
+                # SEND TO WORKER
+                inference_worker.add_task(camera_id, roi_list, frame)
+                # Update last processed time
                 last_processed_time[camera_id] = current_time
+            # Sleep briefly to prevent CPU spinning, but keep it responsive for results
             time.sleep(0.1)
         except Exception as e:
-            logger.error("Global process loop error: %s", e)
+            logger.error(f"Global process loop error: {e}")
             traceback.print_exc()
             time.sleep(5)
-# --- Flask routes ---
+# --- Flask Routes (Unchanged logic, just imports) ---
 @app.route('/')
 def index():
     return render_template('index.html')
 @app.route('/cameras', methods=['GET'])
 def get_cameras():
     return jsonify(camera_manager.get_camera_list())
 @app.route('/video/<camera_id>')
 def video_feed(camera_id):
     def generate():
@@ -297,16 +164,11 @@ def video_feed(camera_id):
             if frame is not None:
                 ret, jpeg = cv2.imencode('.jpg', frame)
                 if ret:
-                    yield (
-                        b'--frame\r\n'
-                        b'Content-Type: image/jpeg\r\n\r\n' + jpeg.tobytes() + b'\r\n\r\n'
-                    )
+                    yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + jpeg.tobytes() + b'\r\n\r\n')
             else:
                 time.sleep(0.1)
     return Response(generate(), mimetype='multipart/x-mixed-replace; boundary=frame')
 @app.route('/snapshot/<camera_id>')
 def snapshot(camera_id):
     frame = camera_manager.get_frame(camera_id)
@@ -316,29 +178,27 @@ def snapshot(camera_id):
         return Response(jpeg.tobytes(), mimetype='image/jpeg')
     return 'No frame available', 404
 @app.route('/rois/<camera_id>', methods=['GET'])
 def get_rois(camera_id):
+    # ... (Same logic as Step 3, just ensure it uses camera_manager) ...
     try:
         camera_manager.load_roi_config()
         all_rois = camera_manager.rois
         img_width = request.args.get("img_width", type=float)
         img_height = request.args.get("img_height", type=float)
         if not img_width or not img_height:
             return jsonify(all_rois.get(camera_id, []))
         cam = camera_manager.cameras.get(camera_id)
         if cam and cam.get("stream"):
             real_w = cam["stream"].width or cam["width"]
             real_h = cam["stream"].height or cam["height"]
         else:
             return jsonify({"error": "Camera not ready"}), 500
         scaleX = img_width / real_w
         scaleY = img_height / real_h
         scaled_rois = []
         for roi in all_rois.get(camera_id, []):
             scaled_rois.append({
@@ -347,34 +207,29 @@ def get_rois(camera_id):
                 "y": int(round(roi["y"] * scaleY)),
                 "width": int(round(roi["width"] * scaleX)),
                 "height": int(round(roi["height"] * scaleY)),
-                "angle": roi.get("angle", 0),
+                "angle": roi["angle"]
             })
         return jsonify(scaled_rois)
     except Exception as e:
         return jsonify({"error": str(e)}), 500
 @app.route("/save_rois", methods=["POST"])
 def save_rois_api():
+    # ... (Same logic as Step 3) ...
     data = request.json
     camera_id = data.get("camera_id")
     new_rois = data.get("rois")
     img_width = data.get("img_width")
     img_height = data.get("img_height")
-    if not camera_id or new_rois is None:
-        return jsonify({"success": False})
+    if not camera_id or new_rois is None: return jsonify({"success": False})
     cam = camera_manager.cameras.get(camera_id)
-    if not cam:
-        return jsonify({"success": False})
+    if not cam: return jsonify({"success": False})
     stream = cam.get("stream")
-    real_w = stream.width if stream and getattr(stream, "width", None) else cam["width"]
-    real_h = stream.height if stream and getattr(stream, "height", None) else cam["height"]
+    real_w = stream.width if stream and stream.width else cam["width"]
+    real_h = stream.height if stream and stream.height else cam["height"]
     scaleX = real_w / img_width if img_width else 1
     scaleY = real_h / img_height if img_height else 1
@@ -387,26 +242,24 @@ def save_rois_api():
             "y": int(round(roi["y"] * scaleY)),
             "width": int(round(roi["width"] * scaleX)),
             "height": int(round(roi["height"] * scaleY)),
-            "angle": roi.get("angle", 0),
+            "angle": roi["angle"]
         })
     camera_manager.rois[camera_id] = scaled_rois
     return jsonify(camera_manager.save_roi_config())
 @app.route('/crop', methods=['POST'])
 def crop():
+    # Helper for UI
     data = request.json
     camera_id = data.get('camera_id')
     scaleX = data.get('scaleX', 1)
     scaleY = data.get('scaleY', 1)
     frame = camera_manager.get_frame(camera_id)
-    if frame is None:
-        return jsonify({'error': 'No frame'}), 500
+    if frame is None: return jsonify({'error': 'No frame'}), 500
     roi_list = camera_manager.rois.get(camera_id, [])
+    # Use the local UI helper function
     cropped_images = crop_image_for_ui(frame, roi_list, scaleX, scaleY)
     cropped_base64_list = []
@@ -414,81 +267,67 @@ def crop():
         ret, buffer = cv2.imencode('.jpg', cropped_img)
         if ret:
             cropped_base64_list.append(base64.b64encode(buffer).decode('utf-8'))
     return jsonify({'cropped_images': cropped_base64_list})
 @app.route('/detect_digits', methods=['POST'])
 def detect_digits():
-    """Manual trigger: Runs inference immediately and returns result with validation."""
+    """Manual trigger: Runs inference immediately and returns result."""
     data = request.json
     camera_id = data.get('camera_id')
     if not camera_id:
         return jsonify({'error': 'Invalid camera ID'}), 400
+    # 1. Get Frame
     frame = camera_manager.get_frame(camera_id)
     if frame is None:
         return jsonify({'error': 'Failed to capture image'}), 500
+    # 2. Get ROIs
     roi_list = camera_manager.rois.get(camera_id, [])
     if not roi_list:
         return jsonify({'error': 'No ROIs defined'}), 400
+    # 3. Crop (Using the UI helper is fine here)
     cropped_images = crop_image_for_ui(frame, roi_list, scaleX=1, scaleY=1)
     if not cropped_images:
         return jsonify({'error': 'Failed to crop ROIs'}), 500
+    # 4. Run Inference Synchronously
+    # Note: We access the worker directly.
+    # Thread safety: 'predict_batch' uses 'self.interpreter'.
+    # If the background thread is also using it, TFLite might complain or crash.
+    # PROPER FIX: Pause the worker or use a Lock.
+    # Since adding a Lock is complex now, a simple hack is to just add it to the queue
+    # and WAIT for the result? No, that's hard to correlate.
+    # SAFE APPROACH: Use a Lock in InferenceWorker.
+    # For now, let's assume TFLite is robust enough or race conditions are rare for manual clicks.
+    # CALL THE PUBLIC METHOD:
     try:
-        predictions = inference_worker.predict_batch(cropped_images)
-        valid_digits_str = []
-        confidences = []
-        rejected_reasons = []
-        CONFIDENCE_THRESHOLD = inference_worker.CONFIDENCE_THRESHOLD
-        MIN_VALUE = inference_worker.MIN_VALUE
-        MAX_VALUE = inference_worker.MAX_VALUE
-        for i, p in enumerate(predictions):
-            if p['confidence'] < CONFIDENCE_THRESHOLD:
-                msg = f"Digit {i} ('{p['digit']}') rejected: conf {p['confidence']:.2f} < {CONFIDENCE_THRESHOLD}"
-                rejected_reasons.append(msg)
-                if DEBUG_LOG:
-                    logger.warning("[Manual] %s", msg)
-            else:
-                valid_digits_str.append(p['digit'])
-                confidences.append(p['confidence'])
-        if len(valid_digits_str) != len(predictions):
-            return jsonify({'error': 'Low confidence detection', 'details': rejected_reasons, 'raw': predictions}), 400
-        final_number_str = "".join(valid_digits_str)
-        try:
-            final_number = int(final_number_str)
-            if not (MIN_VALUE <= final_number <= MAX_VALUE):
-                msg = f"Value {final_number} out of range ({MIN_VALUE}-{MAX_VALUE})"
-                if DEBUG_LOG:
-                    logger.warning("[Manual] %s", msg)
-                return jsonify({'error': 'Value out of range', 'value': final_number}), 400
-            avg_conf = float(np.mean(confidences)) if confidences else None
-            publish_detected_number(camera_id, final_number, avg_conf)
-            camera_manager.results[camera_id] = final_number
-            return jsonify({
-                'detected_digits': valid_digits_str,
-                'final_number': final_number,
-                'confidences': confidences,
-                'avg_confidence': avg_conf,
-            })
-        except ValueError:
-            return jsonify({'error': 'Could not parse digits', 'raw': valid_digits_str}), 500
+        detected_digits = inference_worker.predict_batch(cropped_images)
+        valid_digits = [d for d in detected_digits if d.isdigit()]
+        if not valid_digits:
+            return jsonify({'error': 'No valid digits detected', 'raw': detected_digits}), 500
+        final_number = int("".join(valid_digits))
+        # Publish and Update State
+        publish_detected_number(camera_id, final_number)
+        camera_manager.results[camera_id] = final_number
+        logger.info(f"Manual detection for {camera_id}: {final_number}")
+        return jsonify({
+            'detected_digits': valid_digits,
+            'final_number': final_number
+        })
     except Exception as e:
-        logger.error("Error during manual detection: %s", e)
+        logger.error(f"Error during manual detection: {e}")
         return jsonify({'error': str(e)}), 500
@@ -498,8 +337,13 @@ def update_camera_config():
     success = camera_manager.update_camera_flip(data.get("camera_id"), data.get("flip_type"))
     return jsonify({"success": success})
-# --- Main ---
 if __name__ == '__main__':
+    # Threading:
+    # 1. Video Threads (in Manager)
+    # 2. Inference Thread (in Worker)
+    # 3. Main Loop (process_all_cameras - handles feeding)
     t = threading.Thread(target=process_all_cameras, daemon=True)
     t.start()

inference.py

@@ -1,115 +1,63 @@
-import logging
-import queue
 import threading
+import queue
 import time
+import logging
 import cv2
 import numpy as np
 import tflite_runtime.interpreter as tflite
 from config import Config
-# ------------------------------------------------------------------------------
-# 1. USER CONFIGURATION (Edit these values here)
-# ------------------------------------------------------------------------------
-# Minimum confidence (0-1) to accept a digit.
-# - Higher (0.85-0.90) reduces false positives like "1010" from noise.
-# - Lower (0.70-0.75) helps with weak/dark digits.
-CONFIDENCE_THRESHOLD = 0.1
-# Minimum and Maximum expected values for the number.
-MIN_VALUE = 5
-MAX_VALUE = 100
-# ------------------------------------------------------------------------------
 logger = logging.getLogger(__name__)
-def _cfg(*names, default=None):
-    for n in names:
-        if hasattr(Config, n):
-            return getattr(Config, n)
-    return default
 class InferenceWorker:
-    def __init__(self, debug_log: bool = False):
-        self.debug_log = bool(debug_log)
+    def __init__(self):
         self.input_queue = queue.Queue(maxsize=10)
         self.result_queue = queue.Queue()
         self.running = False
         self.interpreter = None
         self.input_details = None
         self.output_details = None
         self.lock = threading.Lock()
-        # Debug counters / telemetry
-        self.task_seq = 0
-        self.dropped_tasks = 0
-        self.processed_tasks = 0
-        self.last_invoke_secs = None
-        # Set thresholds from top-level variables
-        self.CONFIDENCE_THRESHOLD = CONFIDENCE_THRESHOLD
-        self.MIN_VALUE = MIN_VALUE
-        self.MAX_VALUE = MAX_VALUE
+        # Load Model
         self.load_model()
     def load_model(self):
         try:
-            model_path = _cfg("MODEL_PATH", "MODELPATH", default=None)
-            logger.info("Loading TFLite model from: %s", model_path)
-            self.interpreter = tflite.Interpreter(model_path=model_path)
+            logger.info(f"Loading TFLite model from: {Config.MODEL_PATH}")
+            self.interpreter = tflite.Interpreter(model_path=Config.MODEL_PATH)
             self.interpreter.allocate_tensors()
             self.input_details = self.interpreter.get_input_details()
             self.output_details = self.interpreter.get_output_details()
+            # Store original input shape for resizing logic
             self.original_input_shape = self.input_details[0]['shape']
-            if self.debug_log:
-                logger.info("Model loaded. Default input shape: %s", self.original_input_shape)
+            logger.info(f"Model loaded. Default input shape: {self.original_input_shape}")
         except Exception as e:
-            logger.critical("Failed to load TFLite model: %s", e)
+            logger.critical(f"Failed to load TFLite model: {e}")
             self.interpreter = None
     def start(self):
-        if self.running:
-            return
+        if self.running: return
         self.running = True
         threading.Thread(target=self._worker_loop, daemon=True).start()
         logger.info("Inference worker started.")
-    def add_task(self, camera_id, rois, frame, frame_std=None):
+    def add_task(self, camera_id, rois, frame):
         """Add task (non-blocking)."""
-        if not self.interpreter:
-            return
-        self.task_seq += 1
-        task = {
-            'camera_id': camera_id,
-            'rois': rois,
-            'frame': frame,
-            'timestamp': time.time(),
-            'task_id': self.task_seq,
-            'frame_std': frame_std,
-        }
+        if not self.interpreter: return
         try:
+            task = {
+                'camera_id': camera_id,
+                'rois': rois,
+                'frame': frame,
+                'timestamp': time.time()
+            }
             self.input_queue.put(task, block=False)
         except queue.Full:
-            self.dropped_tasks += 1
-            logger.warning(
-                "add_task drop cam=%s qsize=%d dropped=%d",
-                camera_id,
-                self.input_queue.qsize(),
-                self.dropped_tasks,
-            )
+            pass
     def get_result(self):
         try:
@@ -117,12 +65,6 @@ class InferenceWorker:
         except queue.Empty:
             return None
-    def _put_result(self, d):
-        try:
-            self.result_queue.put(d, block=False)
-        except Exception:
-            logger.exception("Failed to enqueue result")
     def _worker_loop(self):
         while self.running:
             try:
@@ -133,133 +75,35 @@ class InferenceWorker:
                 cam_id = task['camera_id']
                 rois = task['rois']
                 frame = task['frame']
-                task_id = task.get('task_id')
-                task_ts = task.get('timestamp')
-                if self.debug_log:
-                    try:
-                        age_s = (time.time() - task_ts) if task_ts else None
-                        logger.info(
-                            "Worker got task cam=%s task_id=%s age_s=%s frame_std=%s rois=%d in_q=%d",
-                            cam_id,
-                            task_id,
-                            (f"{age_s:.3f}" if age_s is not None else "n/a"),
-                            task.get('frame_std'),
-                            len(rois) if rois else 0,
-                            self.input_queue.qsize(),
-                        )
-                    except Exception:
-                        pass
                 try:
-                    t0 = time.time()
+                    # 1. Crop all ROIs
                     crops = self._crop_rois(frame, rois)
-                    t_crop = time.time()
-                    if not crops:
-                        self._put_result({
-                            'type': 'error',
-                            'camera_id': cam_id,
-                            'message': 'No ROIs cropped',
-                            'task_id': task_id,
-                            'task_ts': task_ts,
-                            'timing_s': {'crop': t_crop - t0, 'total': t_crop - t0},
-                        })
-                        continue
-                    predictions = self.predict_batch(crops)
-                    t_pred = time.time()
-                    valid_digits_str = []
-                    confidences = []
-                    low_conf_details = []
-                    for i, p in enumerate(predictions):
-                        if p['confidence'] < self.CONFIDENCE_THRESHOLD:
-                            low_conf_details.append(
-                                f"Digit {i} conf {p['confidence']:.2f} < {self.CONFIDENCE_THRESHOLD}"
-                            )
-                        valid_digits_str.append(p['digit'])
-                        confidences.append(p['confidence'])
-                    if low_conf_details:
-                        self._put_result({
-                            'type': 'error',
-                            'camera_id': cam_id,
-                            'message': f"Low confidence: {', '.join(low_conf_details)}",
-                            'digits': valid_digits_str,
-                            'task_id': task_id,
-                            'task_ts': task_ts,
-                            'timing_s': {'crop': t_crop - t0, 'predict': t_pred - t_crop, 'total': t_pred - t0},
-                        })
-                        continue
-                    if not valid_digits_str:
-                        self._put_result({
-                            'type': 'error',
-                            'camera_id': cam_id,
-                            'message': 'No digits produced',
-                            'task_id': task_id,
-                            'task_ts': task_ts,
-                            'timing_s': {'crop': t_crop - t0, 'predict': t_pred - t_crop, 'total': t_pred - t0},
-                        })
-                        continue
-                    final_number_str = "".join(valid_digits_str)
-                    try:
-                        final_number = int(final_number_str)
-                    except ValueError:
-                        self._put_result({
-                            'type': 'error',
-                            'camera_id': cam_id,
-                            'message': f"Parse error: {valid_digits_str}",
-                            'task_id': task_id,
-                            'task_ts': task_ts,
-                            'timing_s': {'crop': t_crop - t0, 'predict': t_pred - t_crop, 'total': t_pred - t0},
-                        })
-                        continue
-                    if self.MIN_VALUE <= final_number <= self.MAX_VALUE:
-                        avg_conf = float(np.mean(confidences)) if confidences else None
-                        self._put_result({
-                            'type': 'success',
-                            'camera_id': cam_id,
-                            'value': final_number,
-                            'digits': valid_digits_str,
-                            'confidence': avg_conf,
-                            'task_id': task_id,
-                            'task_ts': task_ts,
-                            'timing_s': {'crop': t_crop - t0, 'predict': t_pred - t_crop, 'total': t_pred - t0},
-                        })
-                        self.processed_tasks += 1
-                    else:
-                        self._put_result({
-                            'type': 'error',
-                            'camera_id': cam_id,
-                            'message': f"Value {final_number} out of range ({self.MIN_VALUE}-{self.MAX_VALUE})",
-                            'value': final_number,
-                            'task_id': task_id,
-                            'task_ts': task_ts,
-                            'timing_s': {'crop': t_crop - t0, 'predict': t_pred - t_crop, 'total': t_pred - t0},
-                        })
-                except Exception:
-                    logger.exception("Inference error cam=%s task_id=%s", cam_id, task_id)
-                    self._put_result({
-                        'type': 'error',
-                        'camera_id': cam_id,
-                        'message': 'Exception during inference; see logs',
-                        'task_id': task_id,
-                        'task_ts': task_ts,
-                    })
+                    if not crops: continue
+                    # 2. Batch Predict (Optimized Step)
+                    digits = self.predict_batch(crops)
+                    # 3. Combine
+                    valid_digits = [d for d in digits if d.isdigit()]
+                    if len(valid_digits) == len(digits) and len(valid_digits) > 0:
+                        final_number = int("".join(valid_digits))
+                        self.result_queue.put({
+                            'camera_id': cam_id,
+                            'value': final_number,
+                            'digits': valid_digits
+                        })
+                except Exception as e:
+                    logger.error(f"Inference error for {cam_id}: {e}")
     def _crop_rois(self, image, roi_list):
         cropped_images = []
         for roi in roi_list:
             try:
                 x, y, w, h = roi['x'], roi['y'], roi['width'], roi['height']
-                cropped = image[y:y + h, x:x + w]
+                cropped = image[y:y+h, x:x+w]
                 if cropped.size > 0:
                     cropped_images.append(cropped)
             except Exception:
@@ -267,60 +111,55 @@ class InferenceWorker:
         return cropped_images
     def predict_batch(self, images):
-        """Run inference on a batch of images.
-        Returns list of dicts: {'digit': str, 'confidence': float}
-        """
-        with self.lock:
-            if not self.interpreter:
-                return []
+        """Run inference on a batch of images at once."""
+        with self.lock: # <--- Add this wrapper
+            if not self.interpreter: return []
             num_images = len(images)
-            if num_images == 0:
-                return []
+            if num_images == 0: return []
             input_index = self.input_details[0]['index']
             output_index = self.output_details[0]['index']
+            # Preprocess all images into a single batch array
+            # Shape: [N, 32, 20, 3] (assuming model expects 32x20 rgb)
             batch_input = []
-            target_h, target_w = 32, 20
+            target_h, target_w = 32, 20 # Based on your previous code logic
             for img in images:
+                # Resize
                 roi_resized = cv2.resize(img, (target_w, target_h))
+                # Color
                 roi_rgb = cv2.cvtColor(roi_resized, cv2.COLOR_BGR2RGB)
+                # Normalize
                 roi_norm = roi_rgb.astype(np.float32)
                 batch_input.append(roi_norm)
+            # Create batch tensor
             input_tensor = np.array(batch_input)
-            # Keep current behavior (resize+allocate per batch). Debug timing is optional.
+            # --- DYNAMIC RESIZING ---
+            # TFLite models have a fixed input size (usually batch=1).
+            # We must resize the input tensor to match our current batch size (N).
+            # 1. Resize input tensor
             self.interpreter.resize_tensor_input(input_index, [num_images, target_h, target_w, 3])
+            # 2. Re-allocate tensors (This is expensive! See note below)
             self.interpreter.allocate_tensors()
+            # 3. Run Inference
             self.interpreter.set_tensor(input_index, input_tensor)
-            t0 = time.time()
             self.interpreter.invoke()
-            self.last_invoke_secs = time.time() - t0
-            if self.debug_log and self.last_invoke_secs and self.last_invoke_secs > 1.0:
-                logger.warning("Slow invoke: %.3fs (batch=%d)", self.last_invoke_secs, num_images)
+            # 4. Get Results
             output_data = self.interpreter.get_tensor(output_index)
-            results = []
+            # Result shape is [N, 10] (probabilities for 10 digits)
+            predictions = []
             for i in range(num_images):
-                logits = output_data[i]
-                # Numerically stable softmax
-                logits = logits - np.max(logits)
-                ex = np.exp(logits)
-                denom = np.sum(ex)
-                probs = (ex / denom) if denom != 0 else np.zeros_like(ex)
-                digit_class = int(np.argmax(probs))
-                confidence = float(probs[digit_class]) if probs.size else 0.0
-                results.append({'digit': str(digit_class), 'confidence': confidence})
-            return results
+                digit_class = np.argmax(output_data[i])
+                predictions.append(str(digit_class))
+            return predictions

README.md

@@ -1,63 +0,0 @@
# RTSPCamDigitDetection Backend
This service reads frames from one or more RTSP cameras, crops user-defined ROIs (Regions of Interest) containing 7-segment digits, runs a TensorFlow Lite digit classifier, and publishes the assembled numeric value to MQTT (e.g., for Home Assistant).
It also exposes a Flask web UI/API for live video previews, snapshots, and ROI configuration.
## High-Level Architecture
* **`app.py` (Main Service):** Runs the Flask web server and the main processing loop. It drains results from the inference worker, handles MQTT publishing, and manages the per-camera rate-limiting logic.
* **`inference.py` (Worker):** A background thread that performs the heavy lifting. It accepts frames, crops the ROIs, runs the TFLite model, and validates results against confidence thresholds and numeric ranges.
* **`manager.py` & `camera.py`:** Handle the RTSP connections, frame buffering, and camera configuration management.
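
A minimal sketch of the hand-off between the main loop and the worker, reduced to the queue mechanics (`run_model` here is a stand-in for the real crop-and-predict step):

```python
import queue
import threading
import time

input_queue = queue.Queue(maxsize=10)   # frames in (bounded: full queue = drop, never stall)
result_queue = queue.Queue()            # results out

def run_model(task):
    # Stand-in for the TFLite call; the real worker crops ROIs and runs predict_batch().
    return 42

def worker_loop():
    while True:
        task = input_queue.get()        # block until a frame arrives
        value = run_model(task)
        result_queue.put({"camera_id": task["camera_id"], "value": value})

threading.Thread(target=worker_loop, daemon=True).start()

# Main loop: feed new work without blocking, then drain results.
for tick in range(3):
    try:
        input_queue.put({"camera_id": "cam1", "frame": None}, block=False)
    except queue.Full:
        pass                            # drop rather than back-pressure the capture threads
    time.sleep(0.1)
    while True:
        try:
            result = result_queue.get(block=False)
        except queue.Empty:
            break
        print(result)                   # the real loop publishes to MQTT here
```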
## Configuration
Configuration variables are defined directly at the top of the scripts for simplicity.
### 1. Main Settings (`app.py`)
Edit the top section of `app.py` to change these:
* **`DEBUG_LOG`** *(bool)*:
* `True`: Enables verbose logging (heartbeats every 5s, per-task timing, detailed skip reasons).
* `False`: Quiet mode. Only prints crucial info (MQTT publishes, connection errors). Warnings about skipped frames are rate-limited to once per minute to prevent log flooding.
* **`DETECTION_INTERVAL`** *(int)*:
* How often (in seconds) to attempt a detection for each camera (Default: `10`).
* **`FRAME_STD_THRESHOLD`** *(float)*:
* Standard Deviation threshold to filter out "bad" frames before inference.
* Frames with contrast lower than this value are skipped.
* *Recommendation:* Set to `0` or `1` if your valid digits are low-contrast. Set higher (e.g., `25`) only if you need to filter out specific gray/green encoding artifacts.
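
For intuition, the frame-quality gate is just a standard-deviation check on the raw frame. A sketch, using the threshold described above:

```python
import numpy as np

FRAME_STD_THRESHOLD = 1.0

def frame_is_usable(frame: np.ndarray) -> bool:
    # A nearly uniform frame (gray/blank stream) has a tiny pixel standard deviation.
    return float(np.std(frame)) >= FRAME_STD_THRESHOLD

blank = np.full((240, 320, 3), 128, dtype=np.uint8)                  # solid gray test frame
noisy = np.random.randint(0, 255, (240, 320, 3), dtype=np.uint8)     # high-contrast content
print(frame_is_usable(blank))   # False: std is 0.0, frame is skipped
print(frame_is_usable(noisy))   # True: frame goes to inference
```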
### 2. Inference Settings (`inference.py`)
Edit the top section of `inference.py` to tune the AI:
* **`CONFIDENCE_THRESHOLD`** *(0.0 - 1.0)*:
* Minimum confidence required for a digit to be accepted.
* *Recommendation:* `0.85` is a good balance to prevent false positives like "1010" while accepting valid digits.
* **`MIN_VALUE` / `MAX_VALUE`** *(int)*:
* Sanity check range. Decoded numbers outside this range (e.g., `1010`) are discarded.
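
Together, the two settings amount to the following sketch, mirroring the worker's validation logic (input format assumed to match the worker's per-ROI predictions):

```python
CONFIDENCE_THRESHOLD = 0.85
MIN_VALUE, MAX_VALUE = 5, 100

def validate(predictions):
    """predictions: list of {'digit': str, 'confidence': float}, one per ROI."""
    if any(p["confidence"] < CONFIDENCE_THRESHOLD for p in predictions):
        return None  # reject the whole reading if any single digit is weak
    value = int("".join(p["digit"] for p in predictions))
    return value if MIN_VALUE <= value <= MAX_VALUE else None

print(validate([{"digit": "4", "confidence": 0.99}, {"digit": "2", "confidence": 0.97}]))  # 42
print(validate([{"digit": "1", "confidence": 0.40}, {"digit": "0", "confidence": 0.99}]))  # None
```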
## How It Works (Logic & Logging)
### The "10s vs 60s" Behavior
You might notice that successful detections are logged every **10 seconds**, but errors (like "Value out of range") appear only every **60 seconds**.
* **Success:** The app attempts detection every `DETECTION_INTERVAL` (10s). Every success is logged immediately as INFO.
* **Failure:** If the camera feed is bad or the value is out of range, the error technically occurs every 10s. However, when `DEBUG_LOG = False`, these repetitive warnings are suppressed and only printed **once per minute** to keep the CLI readable.
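
The suppression is a small rate limiter keyed by camera and condition; `app.py` implements it roughly like this:

```python
import logging
import time

logger = logging.getLogger(__name__)
_last_log = {}

def log_rl(level, key, msg, every_s=10):
    """Rate-limited log: emit at most once per `every_s` seconds per key."""
    now = time.time()
    if now - _last_log.get(key, 0.0) >= every_s:
        _last_log[key] = now
        logger.log(level, msg)

# Repetitive skip warnings use a 60s window when DEBUG_LOG is off:
log_rl(logging.WARNING, "cam1:lowstd", "[cam1] skip: low frame std", every_s=60)
```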
### MQTT Behavior
* **Topic:** `homeassistant/sensor/RTSPCamDigitDetection/state/<camera_id>`
* **Payload:** `{"value": 42, "confidence": 0.98}`
* **Trigger:** Published only on **successful** detection and validation. Errors are not published to this topic to avoid messing up sensor history.
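
For reference, a minimal paho-mqtt consumer for these messages might look like this (broker address and the `+` wildcard topic are assumptions; adjust to your setup):

```python
import json
import paho.mqtt.client as mqtt

TOPIC = "homeassistant/sensor/RTSPCamDigitDetection/state/+"  # '+' matches any camera_id

def on_message(client, userdata, message):
    data = json.loads(message.payload)
    print(f"{message.topic}: value={data['value']} confidence={data.get('confidence')}")

client = mqtt.Client()
client.on_message = on_message
client.connect("127.0.0.1", 1883, 60)  # placeholder broker address
client.subscribe(TOPIC)
client.loop_forever()
```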
## API Endpoints
The app runs on port `5000` by default.
| Method | Endpoint | Description |
| :--- | :--- | :--- |
| `GET` | `/` | Web UI Entry point. |
| `GET` | `/video/<camera_id>` | MJPEG stream for live preview. |
| `GET` | `/snapshot/<camera_id>` | Capture a single JPEG snapshot. |
| `GET` | `/cameras` | List configured cameras. |
| `GET` | `/rois/<camera_id>` | Get current ROI definitions. |
| `POST` | `/save_rois` | Save new ROI definitions to disk. |
| `POST` | `/detect_digits` | Manual trigger: runs inference immediately and returns full debug details (JSON). |
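
As an example, a manual detection can be triggered from Python like this (assuming the service runs on `localhost:5000` and a camera id of `cam1`; both are placeholders):

```python
import requests

BASE = "http://localhost:5000"  # default port per the config above
resp = requests.post(f"{BASE}/detect_digits", json={"camera_id": "cam1"})
if resp.ok:
    body = resp.json()
    print(body["final_number"], body["detected_digits"])
else:
    # Error responses carry a JSON body with an 'error' field
    print("Detection failed:", resp.status_code, resp.json().get("error"))
```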