import cv2
import threading
import time
import os
import multiprocessing
import json
from datetime import datetime
[docs]
class VideoRecorder:
"""
A class to handle video recording, frame capturing, and live feed display.
Attributes:
recording (bool): True if recording is active.
show_feed (bool): True if live camera feed is being displayed.
video_writer (cv2.VideoWriter): OpenCV video writer object.
capture (cv2.VideoCapture): OpenCV camera capture object.
output_filename (str): Filename for saved video.
frame_timestamps (list): Timestamps for each recorded frame.
"""
def __init__(self):
self.recording = False
self.show_feed = False
self.video_writer = None
self.capture = None
self.output_filename = None
self.frame_timestamps = []
self.lock = threading.Lock()
self.capture_thread = None
self.display_thread = None
[docs]
def setup_camera(self):
"""
Initialize the camera.
Returns:
bool: True if camera initialized successfully, False otherwise.
"""
try:
self.capture = cv2.VideoCapture(0)
if not self.capture.isOpened():
print("Camera Program p1: ERROR: Could not open camera!")
return False
max_fps = self.capture.get(cv2.CAP_PROP_FPS)
print(f"Camera Program p1: Maximum FPS supported by camera: {max_fps}")
self.capture.set(cv2.CAP_PROP_FPS, max_fps)
ret, _ = self.capture.read()
if not ret:
print("Camera Program p1: ERROR: Could not read frame from camera!")
self.capture.release()
return False
print("Camera Program p1: Camera initialized successfully")
return True
except Exception as e:
print(f"Camera Program p1: ERROR: Camera setup failed: {e}")
if self.capture is not None:
self.capture.release()
return False
def _capture_frames(self):
"""
Internal method to continuously capture frames from the camera.
Handles recording frames and displaying live feed.
"""
try:
while self.recording or self.show_feed:
ret, frame = self.capture.read()
if not ret:
print("Camera Program p1: ERROR: Failed to read frame from camera")
break
with self.lock:
if self.recording and self.video_writer is not None:
self.frame_timestamps.append(datetime.now().timestamp())
self.video_writer.write(frame)
if self.show_feed:
cv2.imshow("Camera Feed", frame)
key = cv2.waitKey(1) & 0xFF
if key == ord('q') or cv2.getWindowProperty("Camera Feed", cv2.WND_PROP_VISIBLE) < 1:
self.show_feed = False
cv2.destroyAllWindows()
except Exception as e:
print(f"Camera Program p1: ERROR in frame capture: {e}")
finally:
print("Camera Program p1: Frame capture stopped")
[docs]
def start_recording(self):
"""
Start recording video.
Saves video to a timestamped file and stores frame timestamps.
"""
with self.lock:
if self.recording:
print("Camera Program p1: Already recording")
return
try:
ret, frame = self.capture.read()
if not ret:
print("Camera Program p1: ERROR: Couldn't read frame to start recording")
return
timestamp = time.strftime('%Y%m%d_%H%M%S')
self.output_filename = f"data/recording_start_time_{timestamp}.avi"
self.frame_timestamps = [datetime.now().timestamp()]
fourcc = cv2.VideoWriter_fourcc(*'XVID')
h, w = frame.shape[:2]
max_fps = self.capture.get(cv2.CAP_PROP_FPS)
self.video_writer = cv2.VideoWriter(self.output_filename, fourcc, max_fps, (w, h))
if not self.video_writer.isOpened():
print(f"Camera Program p1: ERROR: Failed to open video writer for file: {self.output_filename}")
return
self.video_writer.write(frame)
self.recording = True
# Start capture thread if not already running
if not self.show_feed and (self.capture_thread is None or not self.capture_thread.is_alive()):
self.capture_thread = threading.Thread(target=self._capture_frames)
self.capture_thread.start()
print(f"Camera Program p1: Recording started - saving to {self.output_filename} at {max_fps} FPS")
except Exception as e:
print(f"Camera Program p1: ERROR starting recording: {e}")
if self.video_writer is not None:
self.video_writer.release()
self.video_writer = None
[docs]
def stop_recording(self):
"""
Stop video recording and save frame timestamps to a JSON file.
Releases the video writer object safely.
"""
with self.lock:
if not self.recording:
print("Camera Program p1: Not recording")
return
try:
if self.video_writer is not None:
self.video_writer.release()
self.video_writer = None
json_filename = self.output_filename.replace('.avi', '_timestamps.json')
with open(json_filename, 'w') as jsonfile:
json_data = [{
'frame_number': i,
'timestamp': ts,
'datetime': datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')
} for i, ts in enumerate(self.frame_timestamps)]
json.dump(json_data, jsonfile, indent=4)
print(f"Camera Program p1: Recording stopped and saved to {self.output_filename}")
print(f"Camera Program p1: Timestamps saved to {json_filename}")
if os.path.exists(self.output_filename):
size = os.path.getsize(self.output_filename)
print(f"Camera Program p1: File size: {size / 1024:.2f} KB")
else:
print("Camera Program p1: WARNING: Output file not found!")
except Exception as e:
print(f"Camera Program p1: ERROR stopping recording: {e}")
finally:
self.recording = False
[docs]
def display_feed(self):
"""
Start displaying the live camera feed in a separate window.
"""
with self.lock:
if self.show_feed:
return
self.show_feed = True
# Start capture thread if not already running
if self.capture_thread is None or not self.capture_thread.is_alive():
self.capture_thread = threading.Thread(target=self._capture_frames)
self.capture_thread.start()
print("Camera Program p1: Camera feed started")
[docs]
def close_feed(self):
"""
Close the live camera feed display and stop the display thread if not recording.
"""
with self.lock:
self.show_feed = False
time.sleep(0.1) # Allow time for display loop to exit
cv2.destroyAllWindows()
# Stop capture thread if not recording
if not self.recording and self.capture_thread is not None:
self.capture_thread.join(timeout=1)
print("Camera Program p1: Camera feed closed")
[docs]
def video_clean_up(self):
"""
Clean up all resources including stopping recording, releasing camera,
and closing any open windows.
"""
with self.lock:
self.show_feed = False
if self.recording:
self.stop_recording()
if self.capture is not None:
self.capture.release()
self.capture = None
cv2.destroyAllWindows()
self.frame_timestamps = []
if self.capture_thread is not None:
self.capture_thread.join(timeout=1)
print("Camera Program p1: All resources cleaned up")
[docs]
def handle_command(recorder, command, conn):
"""
Handle a command string received from the parent program.
Args:
recorder (VideoRecorder): Instance of VideoRecorder to execute commands.
command (str): Command string.
conn (multiprocessing.Connection): Pipe connection to send back responses.
"""
command = command.strip().lower()
print(f"Camera Program p1: Received command: {command}")
if command == "show_feed":
recorder.display_feed()
conn.send("Camera Feed started")
elif command == "close_feed":
recorder.close_feed()
conn.send("Camera Feed closed")
elif command == "start_recording":
recorder.start_recording()
conn.send("Recording started")
elif command == "stop_recording":
recorder.stop_recording()
conn.send("Recording stopped")
elif command == "video_clean_up":
recorder.video_clean_up()
conn.send("Cleaned up video data")
elif command == "initialize_camera":
recorder.setup_camera()
conn.send("Initialized camera")
else:
conn.send(f"Unknown command: {command}")
[docs]
def command_listener(recorder, conn):
"""
Continuously listen for commands from the parent program and execute them.
Args:
recorder (VideoRecorder): Instance to execute commands.
conn (multiprocessing.Connection): Pipe connection to communicate with parent.
"""
try:
print("Camera Program p1: Command listener started. Waiting for commands...")
while True:
if conn.poll():
command = conn.recv()
if command == "exit":
print("Camera Program p1: Received exit command. Shutting down...")
recorder.video_clean_up()
print("Camera Program p1: Program exited")
break
if command:
handle_command(recorder, command, conn)
except Exception as e:
print(f"Camera Program p1: Command listener error: {e}")
finally:
conn.close()
print("Camera Program p1: Command listener stopped")
[docs]
def main(conn):
"""
Main entry point for the video program.
Args:
conn (multiprocessing.Connection): Pipe connection to receive commands.
"""
print("Camera Program p1: Video Program starting...")
recorder = VideoRecorder()
try:
listener_thread = threading.Thread(target=command_listener, args=(recorder, conn))
listener_thread.daemon = True
listener_thread.start()
print("Camera Program p1: Video Program ready. Run Parent Program to send commands.")
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Camera Program p1: Program interrupted by user")
finally:
recorder.video_clean_up()
print("Camera Program p1: Program exited")
if __name__ == "__main__":
parent_conn, child_conn = multiprocessing.Pipe()
main(child_conn)