EdgeAI_Digit_Recognition/app.py
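"""Flask front-end for the EdgeAI digit-recognition service.

Serves the web UI (ROI setup and MJPEG camera previews), feeds frames from
CameraManager to the InferenceWorker on a configurable interval, and publishes
detected numbers (with optional confidence) to MQTT, e.g. for Home Assistant.
"""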

import base64
import json
import logging
import sys
import threading
import time
import traceback

import cv2
import numpy as np
import paho.mqtt.client as mqtt
from flask import Flask, Response, jsonify, render_template, request

from config import Config
from inference import InferenceWorker
from manager import CameraManager


def _cfg(*names, default=None):
    """Return first matching attribute from Config, else default."""
    for n in names:
        if hasattr(Config, n):
            return getattr(Config, n)
    return default


LOG_LEVEL = _cfg("LOG_LEVEL", "LOGLEVEL", default=logging.INFO)
logging.basicConfig(
    level=LOG_LEVEL,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# --- Initialize Components ---
camera_manager = CameraManager()
inference_worker = InferenceWorker()
inference_worker.start()

# --- MQTT Setup ---
mqtt_client = mqtt.Client()

MQTT_USERNAME = _cfg("MQTT_USERNAME", "MQTTUSERNAME", default=None)
MQTT_PASSWORD = _cfg("MQTT_PASSWORD", "MQTTPASSWORD", default=None)
MQTT_BROKER = _cfg("MQTT_BROKER", "MQTTBROKER", default="127.0.0.1")
MQTT_PORT = int(_cfg("MQTT_PORT", "MQTTPORT", default=1883))
MQTT_TOPIC = _cfg("MQTT_TOPIC", "MQTTTOPIC", default="homeassistant/sensor/RTSPCamDigitDetection/state")

if MQTT_USERNAME and MQTT_PASSWORD:
    mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)

try:
    mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
    mqtt_client.loop_start()
    logger.info("Connected to MQTT Broker at %s:%s", MQTT_BROKER, MQTT_PORT)
except Exception as e:
    logger.error("Failed to connect to MQTT Broker: %s", e)

# --- Helper Functions (UI Only) ---
def crop_image_for_ui(image, roi_list, scaleX, scaleY):
    """Helper for the /crop endpoint (UI preview only)."""
    cropped_images = []
    for roi in roi_list:
        try:
            x = int(roi['x'] * scaleX)
            y = int(roi['y'] * scaleY)
            width = int(roi['width'] * scaleX)
            height = int(roi['height'] * scaleY)
            cropped = image[y:y + height, x:x + width]
            if cropped.size > 0:
                cropped_images.append(cropped)
        except Exception:
            # Malformed ROI entries are skipped; the UI simply shows fewer previews.
            pass
    return cropped_images

def publish_detected_number(camera_id, detected_number, confidence=None):
    """Publish result to MQTT with optional confidence score."""
    topic = f"{MQTT_TOPIC}/{camera_id}"
    payload_dict = {"value": detected_number}
    if confidence is not None:
        payload_dict["confidence"] = round(float(confidence), 2)
    payload = json.dumps(payload_dict)
    try:
        mqtt_client.publish(topic, payload)
        log_msg = f"Published to {topic}: {detected_number}"
        if confidence is not None:
            log_msg += f" (Conf: {confidence:.2f})"
        logger.info(log_msg)
    except Exception as e:
        logger.error("MQTT Publish failed: %s", e)

# --- Debug helpers ---
_last_log = {}

def log_rl(level, key, msg, every_s=10):
    """Rate-limited log: emit `msg` at most once per `every_s` seconds per `key`."""
    now = time.time()
    last = _last_log.get(key, 0.0)
    if now - last >= every_s:
        _last_log[key] = now
        logger.log(level, msg)

# --- Main Processing Loop (Refactored) ---
last_processed_time = {}

def process_all_cameras():
    """Revised loop with rate limiting + debug instrumentation."""
    DETECTION_INTERVAL = int(_cfg("DETECTION_INTERVAL", default=10))
    hb_last = 0.0
    while True:
        try:
            # Heartbeat (proves loop is alive even when no publishes happen)
            now = time.time()
            if now - hb_last >= 5.0:
                hb_last = now
                in_q = getattr(inference_worker, "input_queue", None)
                out_q = getattr(inference_worker, "result_queue", None)
                logger.info(
                    "HB mainloop alive; in_q=%s out_q=%s dropped=%s processed=%s last_invoke_s=%s",
                    (in_q.qsize() if in_q else "n/a"),
                    (out_q.qsize() if out_q else "n/a"),
                    getattr(inference_worker, "dropped_tasks", "n/a"),
                    getattr(inference_worker, "processed_tasks", "n/a"),
                    getattr(inference_worker, "last_invoke_secs", "n/a"),
                )

            # --- Part 1: Process Results ---
            while True:
                result = inference_worker.get_result()
                if not result:
                    break
                cam_id = result.get('camera_id')

                # End-to-end latency tracing
                task_ts = result.get("task_ts")
                if task_ts is not None:
                    try:
                        age = time.time() - float(task_ts)
                        logger.info(
                            "Result cam=%s type=%s task_id=%s age_s=%.3f timing=%s",
                            cam_id,
                            result.get("type"),
                            result.get("task_id"),
                            age,
                            result.get("timing_s"),
                        )
                    except Exception:
                        pass

                if result.get('type') == 'success':
                    val = result['value']
                    conf = result.get('confidence')
                    camera_manager.results[cam_id] = val
                    publish_detected_number(cam_id, val, conf)
                elif result.get('type') == 'error':
                    msg = result.get('message', 'Unknown error')
                    logger.warning("[%s] Detection skipped: %s", cam_id, msg)

            # --- Part 2: Feed Frames ---
            camera_manager.load_roi_config()
            for camera_id, camera_data in camera_manager.cameras.items():
                if not camera_data.get("active", True):
                    continue

                current_time = time.time()
                last_time = last_processed_time.get(camera_id, 0.0)
                if current_time - last_time < DETECTION_INTERVAL:
                    log_rl(
                        logging.DEBUG,
                        f"{camera_id}:rate",
                        f"[{camera_id}] skip: rate limit ({current_time - last_time:.2f}s<{DETECTION_INTERVAL}s)",
                        every_s=30,
                    )
                    continue

                stream = camera_data.get("stream")
                if not stream:
                    log_rl(logging.WARNING, f"{camera_id}:nostream", f"[{camera_id}] skip: no stream", every_s=10)
                    continue

                # Warmup check
                start_time = getattr(stream, "start_time", getattr(stream, "starttime", None))
                if start_time is not None and (current_time - start_time) < 5:
                    log_rl(logging.DEBUG, f"{camera_id}:warmup", f"[{camera_id}] skip: warmup", every_s=10)
                    continue

                frame = stream.read()
                if frame is None:
                    log_rl(logging.WARNING, f"{camera_id}:noframe", f"[{camera_id}] skip: frame is None", every_s=5)
                    continue

                frame_std = float(np.std(frame))
                if frame_std < 5:
                    log_rl(
                        logging.INFO,
                        f"{camera_id}:lowstd",
                        f"[{camera_id}] skip: low frame std={frame_std:.2f} (<5) (disturbed/blank/frozen?)",
                        every_s=5,
                    )
                    mqtt_client.publish(f"{MQTT_TOPIC}/{camera_id}/status", "disturbed")
                    continue

                roi_list = camera_manager.rois.get(camera_id, [])
                if not roi_list:
                    log_rl(logging.WARNING, f"{camera_id}:norois", f"[{camera_id}] skip: no ROIs", every_s=30)
                    continue

                inference_worker.add_task(camera_id, roi_list, frame, frame_std=frame_std)
                last_processed_time[camera_id] = current_time

            time.sleep(0.1)
        except Exception as e:
            logger.error("Global process loop error: %s", e)
            traceback.print_exc()
            time.sleep(5)

# --- Flask Routes ---
@app.route('/')
def index():
    return render_template('index.html')

@app.route('/cameras', methods=['GET'])
def get_cameras():
    return jsonify(camera_manager.get_camera_list())

@app.route('/video/<camera_id>')
def video_feed(camera_id):
    def generate():
        while True:
            frame = camera_manager.get_frame(camera_id)
            if frame is not None:
                ret, jpeg = cv2.imencode('.jpg', frame)
                if ret:
                    yield (
                        b'--frame\r\n'
                        b'Content-Type: image/jpeg\r\n\r\n' + jpeg.tobytes() + b'\r\n\r\n'
                    )
            else:
                time.sleep(0.1)
    return Response(generate(), mimetype='multipart/x-mixed-replace; boundary=frame')

@app.route('/snapshot/<camera_id>')
def snapshot(camera_id):
    frame = camera_manager.get_frame(camera_id)
    if frame is not None:
        ret, jpeg = cv2.imencode('.jpg', frame)
        if ret:
            return Response(jpeg.tobytes(), mimetype='image/jpeg')
    return 'No frame available', 404

@app.route('/rois/<camera_id>', methods=['GET'])
def get_rois(camera_id):
    try:
        camera_manager.load_roi_config()
        all_rois = camera_manager.rois
        img_width = request.args.get("img_width", type=float)
        img_height = request.args.get("img_height", type=float)
        if not img_width or not img_height:
            return jsonify(all_rois.get(camera_id, []))

        cam = camera_manager.cameras.get(camera_id)
        if cam and cam.get("stream"):
            real_w = cam["stream"].width or cam["width"]
            real_h = cam["stream"].height or cam["height"]
        else:
            return jsonify({"error": "Camera not ready"}), 500

        scaleX = img_width / real_w
        scaleY = img_height / real_h
        scaled_rois = []
        for roi in all_rois.get(camera_id, []):
            scaled_rois.append({
                "id": roi["id"],
                "x": int(round(roi["x"] * scaleX)),
                "y": int(round(roi["y"] * scaleY)),
                "width": int(round(roi["width"] * scaleX)),
                "height": int(round(roi["height"] * scaleY)),
                "angle": roi.get("angle", 0),
            })
        return jsonify(scaled_rois)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/save_rois", methods=["POST"])
def save_rois_api():
data = request.json
camera_id = data.get("camera_id")
new_rois = data.get("rois")
img_width = data.get("img_width")
img_height = data.get("img_height")
if not camera_id or new_rois is None:
return jsonify({"success": False})
cam = camera_manager.cameras.get(camera_id)
if not cam:
return jsonify({"success": False})
stream = cam.get("stream")
real_w = stream.width if stream and getattr(stream, "width", None) else cam["width"]
real_h = stream.height if stream and getattr(stream, "height", None) else cam["height"]
scaleX = real_w / img_width if img_width else 1
scaleY = real_h / img_height if img_height else 1
scaled_rois = []
for roi in new_rois:
scaled_rois.append({
"id": roi["id"],
"x": int(round(roi["x"] * scaleX)),
"y": int(round(roi["y"] * scaleY)),
"width": int(round(roi["width"] * scaleX)),
"height": int(round(roi["height"] * scaleY)),
"angle": roi.get("angle", 0),
})
camera_manager.rois[camera_id] = scaled_rois
return jsonify(camera_manager.save_roi_config())

@app.route('/crop', methods=['POST'])
def crop():
    data = request.json
    camera_id = data.get('camera_id')
    scaleX = data.get('scaleX', 1)
    scaleY = data.get('scaleY', 1)
    frame = camera_manager.get_frame(camera_id)
    if frame is None:
        return jsonify({'error': 'No frame'}), 500

    roi_list = camera_manager.rois.get(camera_id, [])
    cropped_images = crop_image_for_ui(frame, roi_list, scaleX, scaleY)
    cropped_base64_list = []
    for cropped_img in cropped_images:
        ret, buffer = cv2.imencode('.jpg', cropped_img)
        if ret:
            cropped_base64_list.append(base64.b64encode(buffer).decode('utf-8'))
    return jsonify({'cropped_images': cropped_base64_list})

@app.route('/detect_digits', methods=['POST'])
def detect_digits():
    """Manual trigger: Runs inference immediately and returns result with validation."""
    data = request.json
    camera_id = data.get('camera_id')
    if not camera_id:
        return jsonify({'error': 'Invalid camera ID'}), 400

    frame = camera_manager.get_frame(camera_id)
    if frame is None:
        return jsonify({'error': 'Failed to capture image'}), 500

    roi_list = camera_manager.rois.get(camera_id, [])
    if not roi_list:
        return jsonify({'error': 'No ROIs defined'}), 400

    cropped_images = crop_image_for_ui(frame, roi_list, scaleX=1, scaleY=1)
    if not cropped_images:
        return jsonify({'error': 'Failed to crop ROIs'}), 500

    try:
        predictions = inference_worker.predict_batch(cropped_images)
        valid_digits_str = []
        confidences = []
        rejected_reasons = []
        CONFIDENCE_THRESHOLD = inference_worker.CONFIDENCE_THRESHOLD
        MIN_VALUE = inference_worker.MIN_VALUE
        MAX_VALUE = inference_worker.MAX_VALUE

        for i, p in enumerate(predictions):
            if p['confidence'] < CONFIDENCE_THRESHOLD:
                msg = f"Digit {i} ('{p['digit']}') rejected: conf {p['confidence']:.2f} < {CONFIDENCE_THRESHOLD}"
                rejected_reasons.append(msg)
                logger.warning("[Manual] %s", msg)
            else:
                valid_digits_str.append(p['digit'])
                confidences.append(p['confidence'])

        if len(valid_digits_str) != len(predictions):
            return jsonify({'error': 'Low confidence detection', 'details': rejected_reasons, 'raw': predictions}), 400

        final_number_str = "".join(valid_digits_str)
        try:
            final_number = int(final_number_str)
            if not (MIN_VALUE <= final_number <= MAX_VALUE):
                msg = f"Value {final_number} out of range ({MIN_VALUE}-{MAX_VALUE})"
                logger.warning("[Manual] %s", msg)
                return jsonify({'error': 'Value out of range', 'value': final_number}), 400

            avg_conf = float(np.mean(confidences)) if confidences else None
            publish_detected_number(camera_id, final_number, avg_conf)
            camera_manager.results[camera_id] = final_number
            return jsonify({
                'detected_digits': valid_digits_str,
                'final_number': final_number,
                'confidences': confidences,
                'avg_confidence': avg_conf,
            })
        except ValueError:
            return jsonify({'error': 'Could not parse digits', 'raw': valid_digits_str}), 500
    except Exception as e:
        logger.error("Error during manual detection: %s", e)
        return jsonify({'error': str(e)}), 500

@app.route('/update_camera_config', methods=['POST'])
def update_camera_config():
    data = request.json
    success = camera_manager.update_camera_flip(data.get("camera_id"), data.get("flip_type"))
    return jsonify({"success": success})

# --- Main ---
if __name__ == '__main__':
    t = threading.Thread(target=process_all_cameras, daemon=True)
    t.start()
    logger.info("Starting Flask Server...")
    app.run(host='0.0.0.0', port=5000, threaded=True)