|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import csv |
| 4 | +from pathlib import Path |
| 5 | + |
| 6 | +from ns_vfs.common.utility import save_frames |
| 7 | +from ns_vfs.config.loader import load_config |
| 8 | +from ns_vfs.data.frame import BenchmarkLTLFrame, FramesofInterest |
| 9 | +from ns_vfs.frame_searcher import FrameSearcher |
| 10 | +from ns_vfs.model.vision.grounding_dino import GroundingDino |
| 11 | +from ns_vfs.processor.benchmark_video_processor import BenchmarkVideoFrameProcessor |
| 12 | +from ns_vfs.video_to_automaton import VideotoAutomaton |
| 13 | + |
| 14 | + |
def get_frames(frames_of_interest: list, benchmark_video: BenchmarkLTLFrame) -> list:
    """Map frame-index groups to their corresponding image frames.

    Args:
        frames_of_interest: List of frame-index groups. Each group is either a
            single index ``[i]`` or a (possibly sparse) list whose first and
            last entries bound an inclusive range of frame indices.
        benchmark_video: Provides ``images_of_frames``, indexable by frame id.

    Returns:
        A list with one inner list of images per input group.
    """
    output = []
    for frame_set in frames_of_interest:
        if len(frame_set) == 1:
            output.append([benchmark_video.images_of_frames[frame_set[0]]])
        else:
            # A multi-element group is expanded to the full inclusive range
            # between its first and last index (interior entries are ignored,
            # matching the grouping produced by the frame searcher).
            first, last = frame_set[0], frame_set[-1]
            output.append(
                [benchmark_video.images_of_frames[idx] for idx in range(first, last + 1)]
            )
    return output
| 29 | + |
| 30 | + |
def evaluate_frame_of_interest(
    benchmark_video_file: str,
    benchmark_video: BenchmarkLTLFrame,
    frame_of_interest: FramesofInterest,
    directory_path: str,
):
    """Score predicted frames of interest against ground truth and persist results.

    Saves each predicted frame group as images under a per-video directory,
    appends one row to ``<directory_path>/data.csv`` and one summary line to
    ``<directory_path>/accuracy.txt``.

    Args:
        benchmark_video_file: Path-like to the benchmark ``.pkl`` file; its
            stem names the per-video output directory.
        benchmark_video: Ground-truth benchmark data (frames, labels, formula).
        frame_of_interest: Predicted frames of interest (``foi_list``).
        directory_path: Root directory for all evaluation artifacts.
    """
    video_stem = benchmark_video_file.name.split(".pkl")[0]
    dir_path = Path(directory_path) / video_stem
    dir_path.mkdir(parents=True, exist_ok=True)

    true_foi_list = benchmark_video.frames_of_interest
    total_num_true_foi = len(true_foi_list)

    # Exact (position-wise) frame-set matches between truth and prediction.
    num_matching_frame_sets = sum(
        1 for a, b in zip(true_foi_list, frame_of_interest.foi_list) if a == b
    )
    # Guard against an empty ground-truth list (previously ZeroDivisionError).
    frame_set_accuracy = (
        num_matching_frame_sets / total_num_true_foi if total_num_true_foi else 0.0
    )

    # Frame-level matching: flatten the grouped indices into sets.
    flattened_true_foi = {item for sublist in true_foi_list for item in sublist}
    flattened_predicted_foi = {item for sublist in frame_of_interest.foi_list for item in sublist}
    true_positive_set = flattened_true_foi & flattened_predicted_foi
    false_positive_set = flattened_predicted_foi - flattened_true_foi
    false_negative_set = flattened_true_foi - flattened_predicted_foi

    num_tp = len(true_positive_set)
    num_fp = len(false_positive_set)
    num_fn = len(false_negative_set)

    result = dict()
    # NOTE(review): some keys below carry historical typos ("groud_truth_frame",
    # "total_number_of_framer_of_interest"); kept as-is so downstream consumers
    # of data.csv keep working.
    result["ltl_formula"] = benchmark_video.ltl_formula
    result["total_number_of_frame"] = len(benchmark_video.labels_of_frames)
    result["exact_frame_accuracy"] = frame_set_accuracy
    result["num_true_positive"] = num_tp
    result["num_false_positive"] = num_fp
    # Bug fix: the denominators must be parenthesized — the original computed
    # `tp / tp + fp` (== 1 + fp) and `tp / tp + fn`. Also guard empty sets.
    result["precision"] = num_tp / (num_tp + num_fp) if (num_tp + num_fp) else 0.0
    result["recall"] = num_tp / (num_tp + num_fn) if (num_tp + num_fn) else 0.0
    result["groud_truth_frame"] = benchmark_video.frames_of_interest
    result["predicted_frame"] = frame_of_interest.foi_list
    result["total_number_of_framer_of_interest"] = len(benchmark_video.frames_of_interest)

    # Save each predicted frame group as images under the per-video directory.
    for i, frame_image_set in enumerate(get_frames(frame_of_interest.foi_list, benchmark_video)):
        save_frames(
            frames=frame_image_set,
            path=dir_path / f"video_frame_{i}",
            file_label="predicted_frame",
        )

    csv_file_name = Path(directory_path) / "data.csv"
    # Bug fix: existence must be checked BEFORE opening in append mode —
    # open(..., "a") creates the file, so the old post-open check meant the
    # header row was never written.
    write_header = not csv_file_name.exists()
    with open(csv_file_name, mode="a", newline="") as file:
        writer = csv.DictWriter(file, fieldnames=result.keys())
        if write_header:
            writer.writeheader()
        writer.writerow(result)

    acc_file = Path(directory_path) / "accuracy.txt"
    with acc_file.open("a") as f:
        f.write(
            f"""{result["ltl_formula"]} - total num frame: {result["total_number_of_frame"]} - exact_frame_accuracy: {result["exact_frame_accuracy"]}
    num_true_positive: {result["num_true_positive"]}, num_false_positive: {result["num_false_positive"]} ,
    precision: {result["precision"]} recall: {result["recall"]}\n"""
        )
| 100 | + |
| 101 | + |
def get_available_benchmark_video(path_to_directory: str):
    """Collect the benchmark ``.pkl`` files under *path_to_directory*.

    A plain string is converted to a ``Path`` and globbed non-recursively;
    an existing ``Path`` object is searched recursively via ``rglob``.
    """
    if isinstance(path_to_directory, str):
        return list(Path(path_to_directory).glob("*.pkl"))
    return list(path_to_directory.rglob("*.pkl"))
| 109 | + |
| 110 | + |
if __name__ == "__main__":
    # Evaluation driver: for every benchmark dataset / LTL-rule directory,
    # run the neuro-symbolic frame search on each benchmark video and score
    # the predicted frames of interest against the stored ground truth.
    config = load_config()
    benchmark_frame_video_root_dir = Path(
        "/opt/Neuro-Symbolic-Video-Frame-Search/artifacts/benchmark_frame_video/"
    )

    # One sub-directory per benchmark dataset.
    benchmark_image_set_dir = [x for x in benchmark_frame_video_root_dir.iterdir() if x.is_dir()]

    for benchmark_name_dir in benchmark_image_set_dir:
        # One sub-directory per LTL rule inside the dataset.
        ltl_video_dir_set = [x for x in benchmark_name_dir.iterdir() if x.is_dir()]
        if len(ltl_video_dir_set) > 0:
            print(f"--processing {benchmark_name_dir.name}--")
            print(f"number of ltl rule: {len(ltl_video_dir_set)}")
            for ltl_video_dir in ltl_video_dir_set:
                benchmark_video_file_list = get_available_benchmark_video(ltl_video_dir)
                print(f"number of examples of {ltl_video_dir.name}: {len(benchmark_video_file_list)}")

                for benchmark_video_file in benchmark_video_file_list:
                    # manual_confidence_probability=1.0 presumably forces full
                    # confidence during benchmark runs — TODO confirm against
                    # BenchmarkVideoFrameProcessor's implementation.
                    benchmark_video_processor = BenchmarkVideoFrameProcessor(
                        video_path=benchmark_video_file,
                        artifact_dir=config.VERSION_AND_PATH.ARTIFACTS_PATH,
                        manual_confidence_probability=1.0,
                    )

                    benchmark_img_frame: BenchmarkLTLFrame = benchmark_video_processor.benchmark_image_frames

                    # Build the video-to-automaton pipeline over Grounding DINO
                    # detections; the benchmark's LTL formula is wrapped in a
                    # probabilistic threshold query (P>=0.80).
                    video_automata_builder = VideotoAutomaton(
                        detector=GroundingDino(
                            config=config.GROUNDING_DINO,
                            weight_path=config.GROUNDING_DINO.GROUNDING_DINO_CHECKPOINT_PATH,
                            config_path=config.GROUNDING_DINO.GROUNDING_DINO_CONFIG_PATH,
                        ),
                        video_processor=benchmark_video_processor,
                        artifact_dir=config.VERSION_AND_PATH.ARTIFACTS_PATH,
                        proposition_set=benchmark_img_frame.proposition,
                        save_annotation=False,  # TODO: Debug only
                        save_image=False,  # TODO: Debug only
                        ltl_formula=f"P>=0.80 [{benchmark_img_frame.ltl_formula}]",
                        verbose=False,
                        manual_confidence_probability=1.0,
                    )
                    frame_sercher = FrameSearcher(
                        video_automata_builder=video_automata_builder,
                        video_processor=benchmark_video_processor,
                    )

                    # Search for the frames of interest satisfying the formula.
                    frame_of_interest = frame_sercher.search()

                    # Score and persist results (CSV row + accuracy summary).
                    evaluate_frame_of_interest(
                        benchmark_video_file=benchmark_video_file,
                        benchmark_video=benchmark_img_frame,
                        frame_of_interest=frame_of_interest,
                        directory_path="/opt/Neuro-Symbolic-Video-Frame-Search/artifacts/benchmark_eval_results",
                    )
0 commit comments