"""Flask web app that streams camera feeds, lets the user define ROIs,
and publishes digits detected inside those ROIs to MQTT.

Threads at runtime:
  1. Video capture threads (owned by CameraManager)
  2. Inference thread (owned by InferenceWorker)
  3. Feeder loop (process_all_cameras, started in __main__)
  4. Flask request threads
"""

import logging
import sys
import os
import threading
import json
import time
import traceback
import base64

import cv2
import numpy as np
import paho.mqtt.client as mqtt
from flask import Flask, render_template, jsonify, request, Response

# Project-local components.
from config import Config
from manager import CameraManager
from inference import InferenceWorker

# --- Logging Setup ---
logging.basicConfig(
    level=Config.LOG_LEVEL,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# --- Initialize Components ---
camera_manager = CameraManager()
inference_worker = InferenceWorker()
inference_worker.start()  # background inference thread starts immediately

# --- Rate-limiting state for the feeder loop ---
# camera_id -> wall-clock time the camera's frame was last queued for inference.
last_processed_time = {}
# Minimum seconds between inference submissions per camera.
DETECTION_INTERVAL = 10

# --- MQTT Setup ---
mqtt_client = mqtt.Client()
if Config.MQTT_USERNAME and Config.MQTT_PASSWORD:
    mqtt_client.username_pw_set(Config.MQTT_USERNAME, Config.MQTT_PASSWORD)
try:
    mqtt_client.connect(Config.MQTT_BROKER, Config.MQTT_PORT, 60)
    mqtt_client.loop_start()  # paho network loop runs in its own thread
    logger.info(f"Connected to MQTT Broker at {Config.MQTT_BROKER}:{Config.MQTT_PORT}")
except Exception as e:
    # Best-effort: the UI still works without a broker; publishes will just fail.
    logger.error(f"Failed to connect to MQTT Broker: {e}")


# --- Helper Functions (UI Only) ---
def crop_image_for_ui(image, roi_list, scaleX, scaleY):
    """Crop `image` to each ROI rectangle, scaled from UI to image coordinates.

    Args:
        image: full frame (numpy array, H x W x C).
        roi_list: dicts with 'x', 'y', 'width', 'height' in UI coordinates.
        scaleX, scaleY: UI-to-image scale factors.

    Returns:
        List of non-empty cropped numpy arrays; malformed ROIs are skipped.
    """
    cropped_images = []
    for roi in roi_list:
        try:
            x = int(roi['x'] * scaleX)
            y = int(roi['y'] * scaleY)
            width = int(roi['width'] * scaleX)
            height = int(roi['height'] * scaleY)
            cropped = image[y:y + height, x:x + width]
            if cropped.size > 0:
                cropped_images.append(cropped)
        except Exception as e:
            # Best-effort: a bad ROI dict must not break the other crops.
            logger.debug(f"Skipping malformed ROI {roi!r}: {e}")
    return cropped_images


def publish_detected_number(camera_id, detected_number):
    """Publish a detected value to MQTT on '<base_topic>/<camera_id>'."""
    topic = f"{Config.MQTT_TOPIC}/{camera_id}"
    payload = json.dumps({"value": detected_number})
    try:
        mqtt_client.publish(topic, payload)
        logger.info(f"Published to {topic}: {detected_number}")
    except Exception as e:
        logger.error(f"MQTT Publish failed: {e}")


# --- Main Processing Loop ---
def process_all_cameras():
    """Feeder loop: drain inference results, then queue fresh frames.

    Runs forever on a daemon thread. Per camera it rate-limits submissions
    to one every DETECTION_INTERVAL seconds and skips cameras that are
    inactive, still warming up, frameless, or showing a near-blank image.
    """
    while True:
        try:
            # --- Part 1: drain completed results from the worker ---
            while True:
                result = inference_worker.get_result()
                if not result:
                    break
                cam_id = result['camera_id']
                val = result['value']
                camera_manager.results[cam_id] = val
                publish_detected_number(cam_id, val)

            # --- Part 2: feed new frames to the worker ---
            camera_manager.load_roi_config()
            for camera_id, camera_data in camera_manager.cameras.items():
                if not camera_data.get("active", True):
                    continue

                # Rate limit: skip cameras processed too recently.
                current_time = time.time()
                last_time = last_processed_time.get(camera_id, 0)
                if current_time - last_time < DETECTION_INTERVAL:
                    continue

                stream = camera_data.get("stream")
                if not stream:
                    continue

                # Warmup: give the stream 5s to stabilize after starting.
                if (current_time - stream.start_time) < 5:
                    continue

                frame = stream.read()
                if frame is None:
                    continue
                # Reject near-uniform (blank/black) frames.
                if np.std(frame) < 10:
                    continue

                roi_list = camera_manager.rois.get(camera_id, [])
                if not roi_list:
                    continue

                # Hand off to the inference worker (async).
                inference_worker.add_task(camera_id, roi_list, frame)
                last_processed_time[camera_id] = current_time

            # Brief sleep: avoid CPU spinning while staying responsive
            # to freshly completed results.
            time.sleep(0.1)

        except Exception as e:
            logger.error(f"Global process loop error: {e}")
            traceback.print_exc()
            time.sleep(5)


# --- Flask Routes ---
@app.route('/')
def index():
    return render_template('index.html')


@app.route('/cameras', methods=['GET'])
def get_cameras():
    return jsonify(camera_manager.get_camera_list())


# BUGFIX: route previously lacked the <camera_id> URL parameter, so Flask
# could never supply the view function's argument (TypeError per request).
@app.route('/video/<camera_id>')
def video_feed(camera_id):
    """MJPEG stream of the camera's latest frames."""
    def generate():
        while True:
            frame = camera_manager.get_frame(camera_id)
            if frame is not None:
                ret, jpeg = cv2.imencode('.jpg', frame)
                if ret:
                    yield (b'--frame\r\n'
                           b'Content-Type: image/jpeg\r\n\r\n'
                           + jpeg.tobytes() + b'\r\n\r\n')
            else:
                time.sleep(0.1)
    return Response(generate(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')


@app.route('/snapshot/<camera_id>')
def snapshot(camera_id):
    """Single JPEG of the camera's current frame, or 404 if unavailable."""
    frame = camera_manager.get_frame(camera_id)
    if frame is not None:
        ret, jpeg = cv2.imencode('.jpg', frame)
        if ret:
            return Response(jpeg.tobytes(), mimetype='image/jpeg')
    return 'No frame available', 404


@app.route('/rois/<camera_id>', methods=['GET'])
def get_rois(camera_id):
    """Return the camera's ROIs, optionally rescaled to the UI image size.

    Query params img_width/img_height, when present, trigger scaling from
    the camera's native resolution to the UI's displayed size.
    """
    try:
        camera_manager.load_roi_config()
        all_rois = camera_manager.rois

        img_width = request.args.get("img_width", type=float)
        img_height = request.args.get("img_height", type=float)
        # Without UI dimensions, return native-resolution ROIs unchanged.
        if not img_width or not img_height:
            return jsonify(all_rois.get(camera_id, []))

        cam = camera_manager.cameras.get(camera_id)
        if cam and cam.get("stream"):
            # Prefer the live stream's reported size; fall back to config.
            real_w = cam["stream"].width or cam["width"]
            real_h = cam["stream"].height or cam["height"]
        else:
            return jsonify({"error": "Camera not ready"}), 500

        scaleX = img_width / real_w
        scaleY = img_height / real_h

        scaled_rois = []
        for roi in all_rois.get(camera_id, []):
            scaled_rois.append({
                "id": roi["id"],
                "x": int(round(roi["x"] * scaleX)),
                "y": int(round(roi["y"] * scaleY)),
                "width": int(round(roi["width"] * scaleX)),
                "height": int(round(roi["height"] * scaleY)),
                "angle": roi["angle"]
            })
        return jsonify(scaled_rois)
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/save_rois", methods=["POST"])
def save_rois_api():
    """Persist ROIs posted in UI coordinates, rescaled to native resolution."""
    data = request.json
    camera_id = data.get("camera_id")
    new_rois = data.get("rois")
    img_width = data.get("img_width")
    img_height = data.get("img_height")

    if not camera_id or new_rois is None:
        return jsonify({"success": False})

    cam = camera_manager.cameras.get(camera_id)
    if not cam:
        return jsonify({"success": False})

    stream = cam.get("stream")
    real_w = stream.width if stream and stream.width else cam["width"]
    real_h = stream.height if stream and stream.height else cam["height"]

    # Inverse of get_rois scaling: UI coords -> native camera coords.
    scaleX = real_w / img_width if img_width else 1
    scaleY = real_h / img_height if img_height else 1

    scaled_rois = []
    for roi in new_rois:
        scaled_rois.append({
            "id": roi["id"],
            "x": int(round(roi["x"] * scaleX)),
            "y": int(round(roi["y"] * scaleY)),
            "width": int(round(roi["width"] * scaleX)),
            "height": int(round(roi["height"] * scaleY)),
            "angle": roi["angle"]
        })

    camera_manager.rois[camera_id] = scaled_rois
    return jsonify(camera_manager.save_roi_config())


@app.route('/crop', methods=['POST'])
def crop():
    """UI helper: return the camera's ROI crops as base64-encoded JPEGs."""
    data = request.json
    camera_id = data.get('camera_id')
    scaleX = data.get('scaleX', 1)
    scaleY = data.get('scaleY', 1)

    frame = camera_manager.get_frame(camera_id)
    if frame is None:
        return jsonify({'error': 'No frame'}), 500

    roi_list = camera_manager.rois.get(camera_id, [])
    cropped_images = crop_image_for_ui(frame, roi_list, scaleX, scaleY)

    cropped_base64_list = []
    for cropped_img in cropped_images:
        ret, buffer = cv2.imencode('.jpg', cropped_img)
        if ret:
            cropped_base64_list.append(base64.b64encode(buffer).decode('utf-8'))
    return jsonify({'cropped_images': cropped_base64_list})


@app.route('/detect_digits', methods=['POST'])
def detect_digits():
    """Manual trigger: run inference synchronously and return the result."""
    data = request.json
    camera_id = data.get('camera_id')
    if not camera_id:
        return jsonify({'error': 'Invalid camera ID'}), 400

    # 1. Get frame
    frame = camera_manager.get_frame(camera_id)
    if frame is None:
        return jsonify({'error': 'Failed to capture image'}), 500

    # 2. Get ROIs (stored in native coords, so no scaling here)
    roi_list = camera_manager.rois.get(camera_id, [])
    if not roi_list:
        return jsonify({'error': 'No ROIs defined'}), 400

    # 3. Crop
    cropped_images = crop_image_for_ui(frame, roi_list, scaleX=1, scaleY=1)
    if not cropped_images:
        return jsonify({'error': 'Failed to crop ROIs'}), 500

    # 4. Run inference synchronously.
    # NOTE(review): predict_batch shares the interpreter with the background
    # inference thread; concurrent use may misbehave. TODO: guard with a
    # Lock inside InferenceWorker.
    try:
        detected_digits = inference_worker.predict_batch(cropped_images)
        valid_digits = [d for d in detected_digits if d.isdigit()]
        if not valid_digits:
            return jsonify({'error': 'No valid digits detected',
                            'raw': detected_digits}), 500

        final_number = int("".join(valid_digits))

        # Publish and update shared state, same as the automatic path.
        publish_detected_number(camera_id, final_number)
        camera_manager.results[camera_id] = final_number

        logger.info(f"Manual detection for {camera_id}: {final_number}")
        return jsonify({
            'detected_digits': valid_digits,
            'final_number': final_number
        })
    except Exception as e:
        logger.error(f"Error during manual detection: {e}")
        return jsonify({'error': str(e)}), 500


@app.route('/update_camera_config', methods=['POST'])
def update_camera_config():
    data = request.json
    success = camera_manager.update_camera_flip(data.get("camera_id"),
                                                data.get("flip_type"))
    return jsonify({"success": success})


# --- Main ---
if __name__ == '__main__':
    # Start the feeder loop on a daemon thread so it dies with the process.
    t = threading.Thread(target=process_all_cameras, daemon=True)
    t.start()

    logger.info("Starting Flask Server...")
    app.run(host='0.0.0.0', port=5000, threaded=True)