-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun_test.py
147 lines (123 loc) · 5.07 KB
/
run_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import build
import csv
import datetime
import math
import setup
import subprocess
import sys
import time
import os
from concurrent.futures import as_completed
from concurrent.futures import ProcessPoolExecutor
def run_test_case(case_str, input_file, output_file, solution_file, vis_file, score_txt, is_interactive, params=None, cleanup=False):
if params:
cmd_cpp = [solution_file] + [str(item) for pair in params.items() for item in pair]
else:
cmd_cpp = [solution_file]
start_time = time.perf_counter() # 実行前の時刻を取得
subprocess.run(
cmd_cpp,
stdin=open(input_file, "r"),
stdout=open(output_file, "w"),
stderr=subprocess.DEVNULL, # stderrは不要なら破棄
text=True
)
end_time = time.perf_counter() # 実行後の時刻を取得
elapsed_time = end_time - start_time # 経過時間を計算
elapsed_time_ms = elapsed_time * 1000 # ミリ秒に変換
res = subprocess.run(
[vis_file, input_file, output_file],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
text=True
)
score = -1
for line in res.stdout.splitlines():
line = line.strip()
if line.startswith(score_txt):
score = int(line.split("=")[1].strip())
break
# 一時ファイルの削除
if cleanup and os.path.isfile(output_file):
try:
os.remove(output_file)
except OSError as e:
print(f"Warning: failed to remove {output_file}: {e}", file=sys.stderr)
return {
"case": case_str,
"score": score,
"elapsed_time": elapsed_time_ms,
}
def main():
# コンパイル
config = setup.load_config()
build.compile_program(config)
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
relative_work_dir = config["paths"]["relative_work_dir"]
work_dir = os.path.abspath(os.path.join(SCRIPT_DIR, relative_work_dir))
input_dir = os.path.join(work_dir, config["test"]["input_dir"])
output_dir = os.path.join(work_dir, config["test"]["output_dir"])
solution_file = os.path.join(work_dir, config["files"]["sol_file"])
vis_file = os.path.join(work_dir, config["files"]["vis_file"])
score_txt = config["test"]["tester_output_score_txt"]
is_interactive = config["problem"]["interactive"]
os.makedirs(output_dir, exist_ok=True)
# テストケースの実行結果
testcase_count = config["problem"]["pretest_count"]
wrong_answer_count = 0
results = []
with ProcessPoolExecutor(max_workers=None) as executor:
futures = []
for i in range(testcase_count):
case_str = f"{i:04d}"
input_file = os.path.join(input_dir, case_str + ".txt")
output_file = os.path.join(output_dir, case_str + ".txt")
if not os.path.exists(input_file):
print(f"Error: {input_file} was not found.")
continue
# テストケースの実行
futures.append(executor.submit(run_test_case, case_str, input_file, output_file, solution_file, vis_file, score_txt, is_interactive))
for future in as_completed(futures):
result = future.result()
score = result['score']
if score <= 0:
wrong_answer_count += 1
print(f"Error: {result['case']} failed to get score.")
continue
results.append(result)
print(f"seed:{result['case']} score:{result['score']:,d} ({result['elapsed_time']:.2f} ms)")
if len(results) == 0:
print("No results to display.")
exit(0)
# スコアの合計を計算
total_score = sum(result['score'] for result in results)
# スコアのlogの合計を計算
log_score = sum(math.log(result['score']) for result in results)
ave_log_score = log_score / len(results)
# 最大実行時間を取得
max_time_result = max(results, key=lambda r: r['elapsed_time'])
max_time = max_time_result['elapsed_time']
max_time_case = max_time_result['case']
print(f"----- All test cases finished (total {testcase_count}) -----")
print(f"Wrong Answers: {wrong_answer_count} / {testcase_count}")
print(f"Maximum Execution Time: {max_time:.2f} ms (case: {max_time_case})")
print(f"Total Score: {total_score:,d}")
print(f"Average Log Score: {ave_log_score:.2f}")
return {
"wrong_answers": wrong_answer_count,
"total_score": total_score,
"ave_log_score": ave_log_score,
}
if __name__ == "__main__":
res = main()
wa_count = res["wrong_answers"]
total_score = res["total_score"]
ave_log_score = res["ave_log_score"]
now_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
file = 'summary.tsv'
write_header = not os.path.exists(file)
with open(file, "a", newline="") as f:
writer = csv.writer(f, delimiter="\t")
if write_header:
writer.writerow(["Timestamp", "Wrong Answers", "Total Score", "Average Log Score"])
writer.writerow([now_time, wa_count, total_score, f"{ave_log_score:.2f}"])