"""Background TFLite inference worker: consumes camera frames + ROIs,
recognizes digits, validates them and publishes numeric readings."""

import logging
import queue
import threading
import time

import cv2
import numpy as np
import tflite_runtime.interpreter as tflite

from config import Config

logger = logging.getLogger(__name__)
class InferenceWorker:
    """Runs TFLite digit recognition on cropped ROIs in a background thread.

    Producers push ``(camera_id, rois, frame)`` tasks via :meth:`add_task`
    (non-blocking, best-effort); consumers poll validated readings via
    :meth:`get_result`.  A reading is published only when every digit clears
    ``CONFIDENCE_THRESHOLD`` and the assembled number lies within
    ``[MIN_VALUE, MAX_VALUE]``.
    """

    def __init__(self):
        # Bounded so producers drop frames instead of growing memory when
        # inference falls behind (add_task is non-blocking).
        self.input_queue = queue.Queue(maxsize=10)
        self.result_queue = queue.Queue()
        self.running = False
        # Handle to the worker thread so stop() can join it.
        self.worker_thread = None
        self.interpreter = None
        self.input_details = None
        self.output_details = None
        # TFLite interpreters are not thread-safe; every interpreter call is
        # serialized through this lock.
        self.lock = threading.Lock()

        # Validation thresholds
        self.CONFIDENCE_THRESHOLD = 0.80  # Minimum confidence (0-1) to accept a digit
        self.MIN_VALUE = 5    # Minimum allowed temperature value
        self.MAX_VALUE = 100  # Maximum allowed temperature value

        # Load the model eagerly so add_task() can reject work immediately;
        # self.interpreter stays None on failure.
        self.load_model()

    def load_model(self):
        """Load the TFLite model from ``Config.MODEL_PATH`` and cache tensor details.

        On failure the error is logged and ``self.interpreter`` is left as
        None, which disables :meth:`add_task` and :meth:`predict_batch`.
        """
        try:
            logger.info(f"Loading TFLite model from: {Config.MODEL_PATH}")
            self.interpreter = tflite.Interpreter(model_path=Config.MODEL_PATH)
            self.interpreter.allocate_tensors()
            self.input_details = self.interpreter.get_input_details()
            self.output_details = self.interpreter.get_output_details()

            # Store original input shape for the dynamic batch-resize logic.
            self.original_input_shape = self.input_details[0]['shape']
            logger.info(f"Model loaded. Default input shape: {self.original_input_shape}")
        except Exception as e:
            logger.critical(f"Failed to load TFLite model: {e}")
            self.interpreter = None

    def start(self):
        """Start the background worker thread (idempotent)."""
        if self.running:
            return
        self.running = True
        self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
        self.worker_thread.start()
        logger.info("Inference worker started.")

    def stop(self, timeout=2.0):
        """Signal the worker loop to exit and wait up to ``timeout`` seconds.

        The loop polls its queue with a 1 s timeout, so shutdown completes
        within roughly that interval.
        """
        self.running = False
        if self.worker_thread is not None:
            self.worker_thread.join(timeout=timeout)
            self.worker_thread = None
        logger.info("Inference worker stopped.")

    def add_task(self, camera_id, rois, frame):
        """Add task (non-blocking).

        Silently dropped when the model failed to load or the input queue is
        full — best-effort by design; a newer frame supersedes this one.
        """
        if not self.interpreter:
            return
        try:
            self.input_queue.put({
                'camera_id': camera_id,
                'rois': rois,
                'frame': frame,
                'timestamp': time.time(),
            }, block=False)
        except queue.Full:
            # Deliberate drop: inference is behind, skip this frame.
            pass

    def get_result(self):
        """Return the next validated reading dict, or None if none is ready."""
        try:
            return self.result_queue.get(block=False)
        except queue.Empty:
            return None

    def _worker_loop(self):
        """Consume queued tasks, run inference, and publish validated readings."""
        while self.running:
            try:
                # Short timeout so a stop() request is noticed promptly.
                task = self.input_queue.get(timeout=1)
            except queue.Empty:
                continue

            cam_id = task['camera_id']
            try:
                # 1. Crop all ROIs out of the frame.
                crops = self._crop_rois(task['frame'], task['rois'])
                if not crops:
                    continue

                # 2. Batch predict: list of {'digit': str, 'confidence': float}.
                predictions = self.predict_batch(crops)

                # 3. Validate and publish.
                self._validate_and_publish(cam_id, predictions)
            except Exception:
                # logger.exception keeps the traceback, which the previous
                # plain error message discarded.
                logger.exception(f"Inference error for {cam_id}")

    def _validate_and_publish(self, cam_id, predictions):
        """Check per-digit confidence and value range; publish a reading if OK."""
        if not predictions:
            return

        digits = []
        confidences = []
        for p in predictions:
            if p['confidence'] < self.CONFIDENCE_THRESHOLD:
                logger.warning(f"[{cam_id}] Rejected digit '{p['digit']}' due to low confidence: {p['confidence']:.2f}")
                # Skip this frame entirely if any digit is uncertain.
                return
            digits.append(p['digit'])
            confidences.append(p['confidence'])

        try:
            final_number = int("".join(digits))
        except ValueError:
            logger.warning(f"[{cam_id}] Could not parse digits into integer: {digits}")
            return

        if not (self.MIN_VALUE <= final_number <= self.MAX_VALUE):
            logger.warning(f"[{cam_id}] Value {final_number} out of range ({self.MIN_VALUE}-{self.MAX_VALUE}). Ignored.")
            return

        avg_conf = float(np.mean(confidences))
        self.result_queue.put({
            'camera_id': cam_id,
            'value': final_number,
            'digits': digits,
            'confidence': avg_conf,
        })
        logger.info(f"[{cam_id}] Valid reading: {final_number} (Avg Conf: {avg_conf:.2f})")

    def _crop_rois(self, image, roi_list):
        """Crop each ROI dict (keys x/y/width/height) out of ``image``.

        Negative origins are clamped to 0 — a negative index would otherwise
        silently wrap to the opposite edge of the frame.  Empty or malformed
        ROI entries are skipped rather than aborting the batch.
        """
        cropped_images = []
        for roi in roi_list:
            try:
                x = max(0, roi['x'])
                y = max(0, roi['y'])
                w, h = roi['width'], roi['height']
                cropped = image[y:y + h, x:x + w]
                if cropped.size > 0:
                    cropped_images.append(cropped)
            except Exception:
                # Best-effort: a bad ROI must not kill the whole frame.
                pass
        return cropped_images

    def predict_batch(self, images):
        """Run inference on a batch of images at once.

        Args:
            images: list of BGR uint8 arrays (any size; resized internally).

        Returns:
            List of dicts ``{'digit': str, 'confidence': float}``, one per
            input image, or ``[]`` when no model is loaded or the batch is
            empty.
        """
        with self.lock:
            if not self.interpreter:
                return []

            num_images = len(images)
            if num_images == 0:
                return []

            input_index = self.input_details[0]['index']
            output_index = self.output_details[0]['index']

            # Model input geometry: assumes a 32x20 RGB digit model —
            # TODO confirm against the exported model file.
            target_h, target_w = 32, 20

            # Preprocess all images into a single [N, 32, 20, 3] batch.
            batch_input = np.empty((num_images, target_h, target_w, 3), dtype=np.float32)
            for i, img in enumerate(images):
                roi_resized = cv2.resize(img, (target_w, target_h))
                # OpenCV delivers BGR; the model presumably expects RGB.
                roi_rgb = cv2.cvtColor(roi_resized, cv2.COLOR_BGR2RGB)
                # NOTE(review): no 0-1 scaling here — looks like the model was
                # trained on raw 0-255 values; verify against the training code.
                batch_input[i] = roi_rgb.astype(np.float32)

            # --- DYNAMIC RESIZING ---
            # TFLite inputs are fixed-size (usually batch=1).  Resizing and
            # re-allocating tensors is comparatively expensive, so only do it
            # when the batch size actually changed since the last call.
            current_batch = self.interpreter.get_input_details()[0]['shape'][0]
            if current_batch != num_images:
                self.interpreter.resize_tensor_input(
                    input_index, [num_images, target_h, target_w, 3])
                self.interpreter.allocate_tensors()

            self.interpreter.set_tensor(input_index, batch_input)
            self.interpreter.invoke()

            # Shape [N, 10]: per-image scores over the ten digit classes.
            output_data = self.interpreter.get_tensor(output_index)

            results = []
            for logits in output_data:
                # Numerically stable softmax (plain exp() can overflow for
                # large logits).  If the model already emits probabilities
                # this is a harmless near-identity transform.
                shifted = np.exp(logits - np.max(logits))
                probs = shifted / np.sum(shifted)

                digit_class = int(np.argmax(probs))
                results.append({
                    'digit': str(digit_class),
                    'confidence': float(probs[digit_class]),
                })
            return results