-
Notifications
You must be signed in to change notification settings - Fork 399
Expand file tree
/
Copy pathwebcam.py
More file actions
executable file
·75 lines (64 loc) · 2.15 KB
/
webcam.py
File metadata and controls
executable file
·75 lines (64 loc) · 2.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
"""TODO: Add docstring."""
import os
import time
import cv2
import numpy as np
import pyarrow as pa
from dora import DoraStatus
# Frame size requested from the capture device; frames are also resized to
# this size before being sent, and the fallback image uses the same size.
CAMERA_HEIGHT = 480
CAMERA_WIDTH = 640

# Index of the capture device; overridable via the CAMERA_INDEX env variable.
CAMERA_INDEX = int(os.environ.get("CAMERA_INDEX", "0"))

# Raw value of the CI environment variable (None when it is not set).
CI = os.getenv("CI")

# Font used for the text drawn onto the fallback image.
font = cv2.FONT_HERSHEY_SIMPLEX
class Operator:
    """Send images from the webcam into the dataflow.

    A frame is read from the capture device on every ``INPUT`` event and
    emitted on the ``image`` output as a flattened array. When reads keep
    failing, a black image carrying an error message is emitted instead.
    """

    def __init__(self):
        """Open the capture device at ``CAMERA_INDEX`` and request the frame size."""
        self.video_capture = cv2.VideoCapture(CAMERA_INDEX)
        # Reference point for the run-time limit checked at the end of on_event.
        self.start_time = time.time()
        self.video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, CAMERA_WIDTH)
        self.video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, CAMERA_HEIGHT)
        # Consecutive failed reads; reset to zero on every successful read.
        self.failure_count = 0

    def on_event(
        self,
        dora_event: dict,
        send_output,
    ) -> DoraStatus:
        """Handle one event and, on ``INPUT``, send a frame to the ``image`` output.

        Args:
            dora_event: Event mapping. ``dora_event["type"]`` selects the
                branch; ``dora_event["metadata"]`` is forwarded unchanged
                with the image on ``INPUT`` events.
            send_output: Callable invoked as
                ``send_output(output_id, data, metadata)``.

        Returns:
            ``DoraStatus.STOP`` only when the ``CI`` environment variable is
            ``"true"`` and at least 20 seconds have passed since the operator
            was created; ``DoraStatus.CONTINUE`` otherwise.
        """
        event_type = dora_event["type"]
        if event_type == "INPUT":
            ret, frame = self.video_capture.read()
            if ret:
                frame = cv2.resize(frame, (CAMERA_WIDTH, CAMERA_HEIGHT))
                self.failure_count = 0
            # Push an error image in case the camera is not available, but
            # only once more than ten consecutive reads have failed.
            elif self.failure_count > 10:
                frame = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3), dtype=np.uint8)
                cv2.putText(
                    frame,
                    f"No Webcam was found at index {CAMERA_INDEX}",
                    (30, 30),
                    font,
                    0.75,
                    (255, 255, 255),
                    2,
                    1,
                )
            else:
                # Early failures are skipped silently: nothing is sent and the
                # run-time limit below is not evaluated for this event.
                self.failure_count += 1
                return DoraStatus.CONTINUE

            send_output(
                "image",
                pa.array(frame.ravel()),
                dora_event["metadata"],
            )
        elif event_type == "STOP":
            print("received stop")
        else:
            print("received unexpected event:", event_type)

        # The 20 second limit only applies when CI == "true".
        if time.time() - self.start_time < 20 or CI != "true":
            return DoraStatus.CONTINUE
        return DoraStatus.STOP

    def __del__(self):
        """Release the capture device if it was created."""
        # __del__ also runs when __init__ raised before video_capture was
        # assigned, so the attribute may be missing.
        capture = getattr(self, "video_capture", None)
        if capture is not None:
            capture.release()