from typing import Dict, List, Optional, Union
import sys
import json
import contextlib
import io
import traceback
import uuid
import ast
# Standard streams, bound once at import time. The message helpers in this
# file go through STDIN / STDOUT rather than referring to sys.* directly.
STDIN = sys.stdin
STDOUT = sys.stdout
STDERR = sys.stderr
# Shared mutable namespace; an "input" entry is added to it later in this file.
# NOTE(review): presumably the globals dict used when running user code — confirm.
USER_GLOBALS = {}
def send_message(msg: str):
    """Write one framed message to the client on STDOUT.

    The frame is a ``Content-Length`` header, an empty line, and then ``msg``
    encoded as UTF-8. The underlying binary buffer is flushed after the write.

    Args:
        msg: The already-serialized message text to send.
    """
    payload = msg.encode(encoding="utf-8")
    # The header is meant to give the size of the encoded body in bytes.
    # len(msg) counts characters, which is smaller than the byte count as
    # soon as msg contains a non-ASCII character, so measure the payload.
    header = f"Content-Length: {len(payload)}\r\n\r\n".encode(encoding="utf-8")
    STDOUT.buffer.write(header + payload)
    STDOUT.buffer.flush()
def print_log(msg: str):
    """Send ``msg`` to the client as the params of a "log" message.

    Args:
        msg: The log text to forward.
    """
    log_message = {"jsonrpc": "2.0", "method": "log", "params": msg}
    send_message(json.dumps(log_message))
def send_response(response: str, response_id: int):
    """Send a result message carrying ``response`` for the given id.

    Args:
        response: Value placed in the message's "result" member.
        response_id: Value placed in the message's "id" member.
    """
    reply = {"jsonrpc": "2.0", "id": response_id, "result": response}
    send_message(json.dumps(reply))
def send_request(params: Optional[Union[List, Dict]] = None):
    """Send an "input" request to the client and return its generated id.

    Args:
        params: Optional request parameters. When None, the "params" member
            is left out of the message entirely.

    Returns:
        The hex string of a freshly generated random UUID, used as the
        request's "id".
    """
    request_id = uuid.uuid4().hex
    request = {"jsonrpc": "2.0", "id": request_id, "method": "input"}
    if params is not None:
        request["params"] = params
    send_message(json.dumps(request))
    return request_id
# Keep a handle on the built-in input(); the module-level name `input` is
# rebound to custom_input further down in this file.
original_input = input
def custom_input(prompt=""):
    """Stand-in for input() that obtains the text from the client.

    Sends an "input" request carrying ``prompt``, then reads one framed reply
    from STDIN and returns its ``result.userInput`` value.

    Args:
        prompt: Prompt text forwarded to the client in the request params.

    Returns:
        The "userInput" value from the reply. None when the reply headers give
        no (or a zero) Content-Length, or when any exception occurs; in the
        latter case the traceback is forwarded through print_log instead of
        being raised.
    """
    try:
        send_request({"prompt": prompt})
        reply_headers = get_headers()
        body_length = int(reply_headers.get("Content-Length", 0))
        if not body_length:
            return None
        # NOTE(review): read() on a text stream counts characters; if the
        # peer's Content-Length counts bytes, a non-ASCII reply would be
        # mis-framed — confirm against get_headers and the client.
        reply = json.loads(STDIN.read(body_length))
        return reply["result"]["userInput"]
    except Exception:
        # Best effort: report the failure to the client rather than letting
        # it propagate into the code that called input().
        print_log(traceback.format_exc())
    return None
# Route input() calls to custom_input: register it under the "input" key of
# USER_GLOBALS (presumably the namespace user code runs in — confirm), and
# rebind this module's own `input` name so it shadows the built-in here too.
USER_GLOBALS["input"] = custom_input
input = custom_input
def handle_response(request_id):
    """Read framed messages from STDIN and act on the relevant ones.

    Loops for as long as STDIN is not closed. For each message that has a
    non-zero Content-Length:

    * if its "id" equals ``request_id``, its ``result.userInput`` value is
      sent back through send_response;
    * otherwise, if its "method" is "exit", the process exits with status 0;
    * any other message is ignored.

    Any exception raised while handling a message is reported through
    print_log and the loop continues. SystemExit is not an Exception
    subclass, so the exit request is not swallowed by that handler.

    Args:
        request_id: The id of the request whose response is awaited.
    """
    while not STDIN.closed:
        try:
            headers = get_headers()
            content_length = int(headers.get("Content-Length", 0))
            if not content_length:
                continue

            message_json = json.loads(STDIN.read(content_length))

            # Dispatch on the members that are actually present. Indexing
            # "result", "id" and "method" unconditionally raised KeyError for
            # an exit message without a result (so sys.exit was never reached)
            # and for any response that was not the awaited one.
            if "id" in message_json and message_json["id"] == request_id:
                # Only the awaited response is required to carry user input.
                our_user_input = message_json["result"]["userInput"]
                send_response(our_user_input, message_json["id"])
            elif message_json.get("method") == "exit":
                sys.exit(0)
        except Exception:
            print_log(traceback.format_exc())
def exec_function(user_input):
try:
compile(user_input, "