-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathutils.py
More file actions
258 lines (202 loc) · 8.39 KB
/
utils.py
File metadata and controls
258 lines (202 loc) · 8.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
import asyncio
import hashlib
import json
import os
import time
from pathlib import Path
from typing import Any, Iterable, Literal, Mapping, Sequence, TypeVar
import openai
import tenacity
import tiktoken
# Default worker count: half the available cores, but never fewer than one.
# `os.cpu_count()` may return None, and on a single-core machine `1 // 2`
# would otherwise yield a useless worker count of 0.
N_CORES = max(1, (os.cpu_count() or 2) // 2)
def read_jsonl(path: str | Path) -> list[Any]:
"""Read lines of JSON from a file (including '\n')."""
with Path(path).open("r") as f:
return [json.loads(line) for line in f]
def write_jsonl(path: str | Path, data: Sequence[Mapping], mode: str = "w"):
# cannot use `dict` here as it is invariant
with Path(path).open(mode) as f:
for item in data:
f.write(json.dumps(item) + "\n")
_T = TypeVar("_T")


def chunked(seq: Sequence[_T], n: int) -> Iterable[Sequence[_T]]:
    """Lazily split *seq* into consecutive slices of at most *n* items."""
    # The range of start offsets is built eagerly so that an invalid step
    # (e.g. n == 0) raises at call time, before iteration begins.
    starts = range(0, len(seq), n)
    return (seq[start : start + n] for start in starts)
def retry(errors: Any, max_attempts: int = 5):
    """Build a tenacity retry decorator for the given exception type(s).

    Retries up to *max_attempts* times with exponential backoff capped at
    5-20 seconds, printing the retry state before each sleep.
    """
    backoff = tenacity.wait_exponential(multiplier=1, min=5, max=20)
    return tenacity.retry(
        retry=tenacity.retry_if_exception_type(errors),
        wait=backoff,
        stop=tenacity.stop_after_attempt(max_attempts),
        before_sleep=print,
    )
# Transient OpenAI API failures worth retrying (passed to `retry` below).
ERRORS = (
    openai.RateLimitError,
    openai.APIError,
    openai.APIConnectionError,
    openai.InternalServerError,
)
class OpenAIClient:
    """Thin wrapper over the OpenAI sync and async SDK clients that adds
    retry-with-backoff to individual calls and batched async dispatching."""

    def __init__(self):
        # Credentials/configuration are read from the environment by the SDK.
        self.client = openai.OpenAI()
        self.async_client = openai.AsyncClient()

    @retry(ERRORS)
    def chat_completions_with_backoff(self, *args, **kwargs):
        """`chat.completions.create`, retried on transient errors (ERRORS)."""
        return self.client.chat.completions.create(*args, **kwargs)

    @retry(ERRORS)
    def completions_with_backoff(self, *args, **kwargs):
        """`completions.create`, retried on transient errors (ERRORS)."""
        return self.client.completions.create(*args, **kwargs)

    @retry(ERRORS)
    async def chat_completions_with_backoff_async(self, *args, **kwargs):
        """Async `chat.completions.create`, retried on transient errors."""
        return await self.async_client.chat.completions.create(*args, **kwargs)

    @retry(ERRORS)
    async def completions_with_backoff_async(self, *args, **kwargs):
        """Async `completions.create`, retried on transient errors."""
        return await self.async_client.completions.create(*args, **kwargs)

    async def delayed_request(
        self,
        request: dict[str, Any],
        mode: Literal["chat", "completion"],
        delay: float | None,
    ):
        """Issue one request, optionally sleeping first to space requests out.

        Prevent quantized rate limit:
        https://help.openai.com/en/articles/6891753-rate-limit-advice
        """
        if delay is not None:
            # synchronized sleep
            # NOTE(review): `time.sleep` (not `asyncio.sleep`) blocks the
            # event loop, which appears deliberate — it staggers concurrently
            # gathered requests `delay` seconds apart instead of letting them
            # all fire at once. Do not "fix" to asyncio.sleep without
            # rethinking the spacing.
            time.sleep(delay)
        if mode == "chat":
            func = self.chat_completions_with_backoff_async
        else:
            func = self.completions_with_backoff_async
        # `request` maps API argument names to values.
        return await func(**request)

    async def dispatch_chat_completions(
        self,
        requests: list[dict[str, Any]],
        delay: float | None = None,
    ):
        """Dispatch chat completions requests asynchronously.

        Args:
            requests: a list of API argument names to values.
            delay: interval between requests.

        Returns:
            Results in request order; failed requests appear as exception
            objects (`return_exceptions=True`).
        """
        tasks = [self.delayed_request(request, "chat", delay) for request in requests]
        return await asyncio.gather(*tasks, return_exceptions=True)

    async def dispatch_completions(
        self,
        requests: list[dict[str, Any]],
        delay: float | None = None,
    ):
        """Dispatch completions requests asynchronously.

        Args:
            requests: a list of API argument names to values.
            delay: interval between requests.

        Returns:
            Results in request order; failed requests appear as exception
            objects (`return_exceptions=True`).
        """
        tasks = [
            self.delayed_request(request, "completion", delay) for request in requests
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)
# https://github.com/openai/openai-cookbook/blob/main/examples/How_to_count_tokens_with_tiktoken.ipynb
def num_tokens_from_string(string: str, model: str) -> int:
    """Count the tokens *string* encodes to under *model*'s tiktoken encoding."""
    encoding = tiktoken.encoding_for_model(model)
    # disallowed_special=() lets special-token text pass through as plain text
    # instead of raising.
    return len(encoding.encode(string, disallowed_special=()))
def timestamp() -> str:
    """Return the current local time formatted as ``YYYYMMDD_HHMMSS``."""
    now = time.localtime()
    return time.strftime("%Y%m%d_%H%M%S", now)
def compute_fingerprint(*args: Any, hash_length: int | None = None) -> str:
combined = "".join(map(str, args))
content = hashlib.sha256(combined.encode()).hexdigest()
if hash_length is not None:
content = content[:hash_length]
return content
def find_code_blocks(response: str, tag: str | None = None) -> list[str]:
    """Return the stripped contents of each fenced code block in *response*,
    optionally keeping only blocks whose language tag matches *tag*."""
    spans = find_codeblock_indices(response, tag)
    return [response[begin:end].strip() for begin, end in spans]
def find_codeblock_indices(
response: str, tag: str | None = None
) -> list[tuple[int, int]]:
"""Find all enclosed code blocks in the response, optionally filtering by language tag."""
all_indices: list[tuple[int, int]] = []
search_start = (
0 # Variable to keep track of where to start searching for the next code block
)
while "```" in response[search_start:]:
# Find the start of the code block (excluding the backticks)
code_start_index = response.find("```", search_start) + 3
# Find the end of the language tag line (or the start of the code if no tag line)
code_start_endline = response.find("\n", code_start_index)
if code_start_endline == -1: # Handle case where there's no newline after ```
code_start_endline = code_start_index
# Extract the language tag (if any)
extracted_tag = response[code_start_index:code_start_endline].strip()
# Adjust the start index if a language tag is found
if extracted_tag:
actual_code_start = code_start_endline + 1
else:
actual_code_start = code_start_index
# Find the end of the code block
code_end_index = response.find("```", actual_code_start)
if code_end_index == -1:
break # Exit if there's no closing ```
# Extract the code
# code = response[actual_code_start:code_end_index].strip()
# Check if the extracted code block matches the requested language tag (if any)
if tag is None or extracted_tag.lower() == tag.lower():
all_indices.append((actual_code_start, code_end_index))
# Update the search_start to look for the next code block
search_start = code_end_index + 3
return all_indices
def remove_comments_from_code_blocks(
    content: str,
) -> str:
    """Strip ``#`` comment lines from every fenced code block in *content*.

    A blank line immediately followed by a comment line is dropped along
    with the comment (except at the very start of a block). Text outside
    code blocks is left untouched.
    """
    blocks = find_codeblock_indices(content)
    pieces: list[str] = []
    # Index into `content` marking the end of the last emitted piece.
    cursor = 0
    for begin, finish in blocks:
        # Pass through everything before this block verbatim.
        pieces.append(content[cursor:begin])
        raw_lines = content[begin:finish].splitlines(keepends=True)
        filtered: list[str] = []
        idx = 0
        total = len(raw_lines)
        while idx < total:
            line = raw_lines[idx]
            following = raw_lines[idx + 1] if idx + 1 < total else None
            # Blank line whose successor is a comment: drop the pair
            # (but never the block's very first line).
            if (
                idx > 0
                and following is not None
                and not line.strip()
                and following.lstrip().startswith("#")
            ):
                idx += 2
                continue
            # Plain comment line: drop it.
            if line.lstrip().startswith("#"):
                idx += 1
                continue
            filtered.append(line)
            idx += 1
        pieces.append("".join(filtered))
        cursor = finish
    # Pass through everything after the last block verbatim.
    pieces.append(content[cursor:])
    return "".join(pieces)
def infer_prompt_template(tokenizer_name: str) -> str:
    """Derive an ``{instruction}``/``{response}`` prompt template from a
    HuggingFace tokenizer's chat template."""
    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)
    rendered = tokenizer.apply_chat_template(
        [
            {"role": "user", "content": "{instruction}"},
            {"role": "assistant", "content": "{response}"},
        ],
        tokenize=False,
    )
    # Keep everything up to and including the response placeholder, dropping
    # whatever the template appends after the assistant turn.
    cutoff = rendered.rindex("{response}") + len("{response}")
    return rendered[:cutoff]