import logging
import os
import threading
import time

import cv2

logger = logging.getLogger(__name__)


class VideoStream:
    """Background reader that keeps only the newest frame of an RTSP camera.

    A daemon thread (see ``start``) continuously pulls frames from a
    ``cv2.VideoCapture``, applies the configured flip and stores the result so
    that ``read`` can hand out the latest frame without blocking on the
    network. When reads keep failing the capture is re-opened automatically.

    Attributes:
        camera_id: Identifier used only in log messages.
        rtsp_url: URL handed to ``cv2.VideoCapture``.
        flip_type: ``"horizontal"``, ``"vertical"``, ``"both"``; any other
            value (e.g. the default ``"None"``) leaves frames untouched.
        width, height: Frame size taken from the first frame of the most
            recent successful connection, ``None`` until one succeeds.
        start_time: ``time.time()`` of object creation, refreshed on every
            successful (re)connect. Kept for warm-up logic; its consumer is
            not part of this class.
    """

    # flip_type -> flipCode argument of cv2.flip.
    _FLIP_CODES = {"horizontal": 1, "vertical": 0, "both": -1}

    def __init__(self, camera_id, rtsp_url, flip_type="None"):
        """Store the configuration and immediately try to open the stream.

        Args:
            camera_id: Identifier used in log messages.
            rtsp_url: RTSP URL of the camera.
            flip_type: Initial flip mode, see the class docstring.
        """
        self.camera_id = camera_id
        self.rtsp_url = rtsp_url
        self.flip_type = flip_type
        self.width = None
        self.height = None

        # State
        self.active = False
        self.frame = None
        self.lock = threading.Lock()  # guards self.frame
        self.cap = None
        # Defined up front so stop() is safe even if start() was never called.
        self.thread = None

        # Track start time for warm-up logic
        self.start_time = time.time()

        # Initialize connection
        self.connect()

    def _apply_flip(self, frame):
        """Return ``frame`` flipped according to the current ``flip_type``."""
        flip_code = self._FLIP_CODES.get(self.flip_type)
        if flip_code is None:
            return frame
        return cv2.flip(frame, flip_code)

    def connect(self):
        """(Re)open the capture over RTSP/TCP and publish its first frame.

        Any previously held capture is released first. On success ``width``,
        ``height``, ``frame`` and ``start_time`` are updated. If the stream
        opens but the first read fails, the capture is released again so that
        ``update`` sees it as closed and retries. Failures are logged, never
        raised.
        """
        if self.cap:
            self.cap.release()

        logger.info("Connecting to %s via RTSP (TCP)...", self.camera_id)

        # Force TCP transport in the FFmpeg backend to avoid packet errors.
        # NOTE: this is a process-wide environment variable.
        os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp"

        self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
        self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

        if not self.cap.isOpened():
            logger.error("Failed to open RTSP stream for %s", self.camera_id)
            return

        ret, frame = self.cap.read()
        if not ret:
            logger.warning(
                "Connected to %s but initial read failed.", self.camera_id
            )
            self.cap.release()
            return

        # Slice instead of 3-way unpacking so a 2-D (single-channel) frame
        # does not raise ValueError.
        h, w = frame.shape[:2]
        self.width = w
        self.height = h

        # Publish the first frame exactly like update() does: flipped and
        # under the lock, so readers never see a wrongly oriented frame
        # right after a (re)connect.
        frame = self._apply_flip(frame)
        with self.lock:
            self.frame = frame

        self.start_time = time.time()  # Reset warm-up timer
        logger.info("Connected to %s (%sx%s)", self.camera_id, w, h)

    def start(self):
        """Start the background reader thread (no-op if already active).

        Returns:
            ``self``, so the call can be chained after construction.
        """
        if self.active:
            return self

        self.active = True
        self.thread = threading.Thread(target=self.update, daemon=True)
        self.thread.start()
        return self

    def update(self):
        """Thread body: read frames until ``active`` becomes false.

        A closed or missing capture is retried every 2 seconds. More than
        five consecutive failed reads trigger an immediate reconnect.
        """
        failure_count = 0

        while self.active:
            if self.cap is None or not self.cap.isOpened():
                time.sleep(2)
                self.connect()
                continue

            try:
                ret, frame = self.cap.read()
            except Exception as e:
                # Treat any backend error like a failed read so the
                # reconnect logic below handles it.
                logger.error("Read error: %s", e)
                ret = False

            if ret:
                frame = self._apply_flip(frame)
                with self.lock:
                    self.frame = frame
                failure_count = 0
                time.sleep(0.01)
                continue

            failure_count += 1
            if failure_count > 5:
                logger.warning(
                    "Stream %s lost. Reconnecting...", self.camera_id
                )
                self.connect()
                failure_count = 0
            else:
                time.sleep(0.1)

    def clear_buffer(self):
        """Drop one pending frame by calling ``grab()`` on an open capture.

        Does nothing when there is no capture or it is not opened.
        """
        if self.cap and self.cap.isOpened():
            self.cap.grab()

    def read(self):
        """Return the most recent frame, or ``None`` if none was read yet.

        The stored object itself is returned (no copy is made).
        """
        with self.lock:
            return self.frame

    def stop(self):
        """Stop the reader thread, wait for it, and release the capture.

        Safe to call even if ``start`` was never called.
        """
        self.active = False

        if self.thread is not None and self.thread.is_alive():
            self.thread.join()

        if self.cap:
            self.cap.release()

    def set_flip(self, flip_type):
        """Change the flip mode applied to subsequently read frames."""
        self.flip_type = flip_type