execution_filter.py
import ast
import json
import os
import re
import shutil
import sys
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Process, cpu_count
from evalplus.eval.utils import (
create_tempdir,
reliability_guard,
swallow_io,
time_limit,
)
from fire import Fire
from tqdm.auto import tqdm
from star_align.utils import chunked

# Dependencies: evalplus fire tqdm


def suppress_output(func):
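    """Silence stdout/stderr while the wrapped function runs, then restore them."""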
def wrapper(*args, **kwargs):
original_stdout = sys.stdout
original_stderr = sys.stderr
sys.stdout = open(os.devnull, "w")
sys.stderr = sys.stdout
try:
result = func(*args, **kwargs)
finally:
sys.stdout.close()
sys.stdout = original_stdout
sys.stderr = original_stderr
return result
return wrapper


# NOTE: only run this within a safe subprocess.
def _run(code) -> None:
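    """Execute untrusted code in a tempdir under memory and time limits; call only in a disposable subprocess."""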
with create_tempdir():
# These system calls are needed when cleaning up tempdir.
rmtree = shutil.rmtree
rmdir = os.rmdir
chdir = os.chdir
getcwd = os.getcwd
        # Cap memory at 1 GiB and disable functionality that could make
        # destructive changes to the host (see reliability_guard).
        maximum_memory_bytes = 1 * 1024 * 1024 * 1024
        reliability_guard(maximum_memory_bytes=maximum_memory_bytes)
        # Run the code with stdout/stderr swallowed and a 4-second time limit.
        with swallow_io():
            with time_limit(4):
                exec(code)
        # Restore the saved functions so tempdir cleanup can proceed.
shutil.rmtree = rmtree
os.rmdir = rmdir
os.chdir = chdir
os.getcwd = getcwd


def containerized_run(item):
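    """Run code against a code_exec_server sandbox; return (idx, result) if it passes, else None."""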
from code_exec_server.code_exec_reqs import exec_test
idx, result, code, srv = item
passed, _ = exec_test(srv, code, "", timeout=10)
return (idx, result) if passed else None


def fork_run(item):
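    """Run code in a forked local process guarded by _run; return (idx, result) on a clean exit, else None."""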
idx, result, code, _ = item
sys.stdout = open(os.devnull, "w")
sys.stderr = sys.stdout
p = Process(target=_run, args=(code,))
p.start()
    p.join(timeout=10)
    if p.is_alive():
        # The child hit the 10-second timeout; kill it so it does not linger.
        p.kill()
        p.join()
    return (idx, result) if p.exitcode == 0 else None


def is_compilable(code):
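    """Return True if the code parses as valid Python."""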
try:
ast.parse(code)
return True
except SyntaxError:
return False


def extract_code(response):
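    """Concatenate all compilable ```python fenced blocks found in the response."""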
pattern = r"^```python\s*\n(.*?)(?=^```)"
result = re.findall(pattern, response, re.DOTALL | re.MULTILINE)
return "\n".join([x for x in result if is_compilable(x)])


def main(
response_path: str,
result_path: str,
max_workers: int = cpu_count(),
cache_path: str | None = None,
    container_server: str | None = None,
):
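    """Filter model responses by executing their extracted Python code.

    Reads response_path (JSONL), executes the code block(s) extracted from
    each response, and writes items whose code runs successfully to
    result_path, skipping execution for code already validated in cache_path.
    """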
# load jsonl
with open(response_path, "r") as f:
raw_data = [json.loads(line) for line in f if line.strip()]
if cache_path is not None:
with open(cache_path, "r") as f:
cached_data = [json.loads(line) for line in f if line.strip()]
        # Code snippets that already passed execution in a previous run.
        hit_code = set[str]()
for item in tqdm(cached_data):
code = extract_code(item["response"])
hit_code.add(code)
uncompilable = 0
all_tasks = []
print("Container server:", container_server)
for idx, item in enumerate(tqdm(raw_data)):
if "parsing_result" not in item:
code = extract_code(item["response"])
if not code:
uncompilable += 1
continue
all_tasks.append((idx, item, code, container_server))
else:
for result in item["parsing_result"]:
code = extract_code(result["response"])
if not code:
uncompilable += 1
continue
all_tasks.append((idx, result, code, container_server))
# Split cached/un-cached data
active_tasks = []
cached_tasks = []
for task in tqdm(all_tasks):
_, _, code, _ = task
if cache_path is not None and code in hit_code:
cached_tasks.append(task)
else:
active_tasks.append(task)
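
    # Cached tasks passed execution in a previous run, so write them out directly.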
with open(result_path, "w") as f:
for idx, result, _, _ in cached_tasks:
newdata = {
k: v
for k, v in raw_data[idx].items()
if k not in ["response", "parsing_result"]
}
newdata["response"] = result["response"]
f.write(json.dumps(newdata) + "\n")
print(f"Active tasks: {len(active_tasks)}")
print(f"Cached tasks: {len(cached_tasks)}")
run_func = containerized_run if container_server else fork_run
nfails = 0
tasks_chunks = chunked(active_tasks, os.cpu_count())
with open(result_path, "a") as f:
with ProcessPoolExecutor(max_workers=max_workers) as executor:
for chunked_tasks in tqdm(tasks_chunks):
futures = [executor.submit(run_func, task) for task in chunked_tasks]
                # NOTE: futures complete in arbitrary order, not in submission order.
for future in tqdm(as_completed(futures), total=len(futures)):
try:
future_result = future.result()
if future_result is None:
nfails += 1
continue
idx, result = future_result
newdata = {
k: v
for k, v in raw_data[idx].items()
if k not in ["response", "parsing_result"]
}
newdata["response"] = result["response"]
f.write(json.dumps(newdata) + "\n")
except Exception:
nfails += 1
continue
print(f"Uncompilable: {uncompilable}")
print(f"Failed: {nfails}")
if __name__ == "__main__":
print("Try to run this file using docker if possible!")
Fire(main)