3
3
import datetime
4
4
import logging
5
5
import multiprocessing
6
- import random
7
- import string
8
6
import threading
9
7
from typing import Any , Dict
10
8
11
9
from swatch .config import CameraConfig , SwatchConfig
12
10
from swatch .image import ImageProcessor
13
11
from swatch .models import Detection
14
12
from swatch .snapshot import SnapshotProcessor
13
+ from swatch .util import get_random_suffix
15
14
16
15
17
16
logger = logging .getLogger (__name__ )
@@ -77,7 +76,7 @@ def __handle_detections__(self, detection_result: Dict[str, Any]) -> None:
77
76
self .obj_data [non_unique_id ] = {}
78
77
79
78
unique_id = (
80
- f"{ non_unique_id } .{ '' . join ( random . choices ( string . ascii_lowercase + string . digits , k = 6 ) )} "
79
+ f"{ non_unique_id } .{ get_random_suffix ( )} "
81
80
if not self .obj_data [non_unique_id ].get ("id" )
82
81
else self .obj_data [non_unique_id ]["id" ]
83
82
)
@@ -86,21 +85,22 @@ def __handle_detections__(self, detection_result: Dict[str, Any]) -> None:
86
85
self .obj_data [non_unique_id ]["zone_name" ] = zone_name
87
86
self .obj_data [non_unique_id ]["variant" ] = object_result ["variant" ]
88
87
89
- top_area = max ([d ["area" ] for d in object_result ["objects" ]])
90
- best_box = next (
91
- d for d in object_result ["objects" ] if d ["area" ] == top_area
92
- )["box" ]
88
+ if object_result .get ("objects" ):
89
+ top_area = max ([d ["area" ] for d in object_result ["objects" ]])
90
+ best_box = next (
91
+ d for d in object_result ["objects" ] if d ["area" ] == top_area
92
+ )["box" ]
93
93
94
- if top_area > self .obj_data [non_unique_id ].get ("top_area" , 0 ):
95
- self .obj_data [non_unique_id ]["top_area" ] = top_area
94
+ if top_area > self .obj_data [non_unique_id ].get ("top_area" , 0 ):
95
+ self .obj_data [non_unique_id ]["top_area" ] = top_area
96
96
97
- # save snapshot with best area
98
- self .snap_processor .save_detection_snapshot (
99
- cam_name ,
100
- zone_name ,
101
- unique_id ,
102
- best_box ,
103
- )
97
+ # save snapshot with best area
98
+ self .snap_processor .save_detection_snapshot (
99
+ cam_name ,
100
+ zone_name ,
101
+ unique_id ,
102
+ best_box ,
103
+ )
104
104
105
105
if not self .obj_data [non_unique_id ].get ("id" ):
106
106
self .obj_data [non_unique_id ]["id" ] = unique_id
@@ -113,6 +113,7 @@ def __handle_detections__(self, detection_result: Dict[str, Any]) -> None:
113
113
del self .obj_data [non_unique_id ]
114
114
115
115
def run (self ) -> None :
116
+ # pylint: disable=singleton-comparison
116
117
logger .info ("Starting Auto Detection for %s" , self .config .name )
117
118
118
119
while not self .stop_event .wait (self .config .auto_detect ):
@@ -123,7 +124,7 @@ def run(self) -> None:
123
124
124
125
# ensure db doesn't contain bad data after shutdown
125
126
Detection .update (end_time = datetime .datetime .now ().timestamp ()).where (
126
- Detection .end_time is None
127
+ Detection .end_time == None
127
128
).execute ()
128
129
logger .info ("Stopping Auto Detection for %s" , self .config .name )
129
130
0 commit comments