forked from MicroPythonOS/MicroPythonOS
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueue.py
More file actions
44 lines (38 loc) · 1.28 KB
/
queue.py
File metadata and controls
44 lines (38 loc) · 1.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
try:
import _thread
except ImportError:
_thread = None
class Queue:
    """Minimal non-blocking FIFO queue backed by a list.

    When the ``_thread`` module is available, every operation that touches
    the underlying list runs while holding a lock allocated per instance.
    When it is not, the same operations run without locking.

    Unlike a blocking queue, ``put`` and ``get`` never wait: they raise
    ``RuntimeError`` immediately when the queue is full or empty.
    """

    def __init__(self, maxsize=0):
        """Create an empty queue.

        Args:
            maxsize: Maximum number of items held at once. Values <= 0
                mean the queue is unbounded.
        """
        self._queue = []
        self.maxsize = maxsize  # 0 means unlimited
        # None when _thread could not be imported; operations then skip locking.
        self._lock = _thread.allocate_lock() if _thread else None

    def _locked(self, operation, *args):
        """Return ``operation(*args)``, holding the lock if one exists.

        Keeping the lock / no-lock decision in one place means each queue
        operation only has to be written once.
        """
        if not self._lock:
            return operation(*args)
        with self._lock:
            return operation(*args)

    def _append(self, item):
        """Append ``item``, enforcing ``maxsize``; caller handles locking."""
        if self.maxsize > 0 and len(self._queue) >= self.maxsize:
            raise RuntimeError("Queue is full")
        self._queue.append(item)

    def _pop_front(self):
        """Remove and return the oldest item; caller handles locking."""
        if not self._queue:
            raise RuntimeError("Queue is empty")
        return self._queue.pop(0)

    def put(self, item):
        """Add ``item`` to the back of the queue.

        Raises:
            RuntimeError: If ``maxsize`` is positive and already reached.
        """
        self._locked(self._append, item)

    def get(self):
        """Remove and return the item at the front of the queue.

        Raises:
            RuntimeError: If the queue is empty.
        """
        return self._locked(self._pop_front)

    def qsize(self):
        """Return the number of items currently in the queue."""
        return self._locked(len, self._queue)

    def empty(self):
        """Return True if the queue currently holds no items."""
        return self.qsize() == 0

    def full(self):
        """Return True if the queue is bounded and has reached ``maxsize``."""
        return self.maxsize > 0 and self.qsize() >= self.maxsize