forked from pikasTech/PikaPython
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsocket_thread.py
More file actions
63 lines (52 loc) · 1.58 KB
/
socket_thread.py
File metadata and controls
63 lines (52 loc) · 1.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import socket
import _thread
import random
import time
test_finished = False
server_started = False
def socket_server_task(host, port):
"""
socket 服务器任务
:return:
"""
print("socket server start:", host, port)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
s.listen(5)
print("socket server waiting accept")
global server_started
server_started = True
accept, addr = s.accept()
print("socket server accepted at", addr)
while True:
try:
data = accept.recv(1024)
print('socket server recv:', data.decode())
accept.send(data)
except Exception:
print('socket server closing accept')
accept.close()
break
print("socket server closing")
s.close()
global test_finished
test_finished = True
def socket_server_init(host='0.0.0.0', port=36500):
_thread.start_new_thread(socket_server_task, (host, port))
def socket_client_task(host, port):
print("socket client start:", host, port)
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((host, port))
client.send("hello".encode())
recv = client.recv(1024).decode()
print("client recv:", recv)
client.close()
def socket_server_test(host='0.0.0.0', port=36500):
_thread.start_new_thread(socket_client_task, (host, port))
test_port = random.randint(10000, 65535)
socket_server_init(port=test_port)
while not server_started:
time.sleep(0.1)
socket_server_test(port=test_port)
while not test_finished:
time.sleep(0.1)