# script_request_queue.py (forked from streamlit/streamlit)
# Copyright 2018-2021 Streamlit Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
from collections import deque
from enum import Enum
from typing import Any, Tuple, Deque
from streamlit.proto.ClientState_pb2 import ClientState
from streamlit.widgets import coalesce_widget_states
class ScriptRequest(Enum):
    """The kinds of request that can be stored in a ScriptRequestQueue.

    Each request is queued together with an optional data payload; the
    payload expected for each kind is noted next to the member.
    """

    # Halt the running script while keeping the ScriptRunner alive (data=None)
    STOP = "STOP"

    # Run the script again (data=RerunData)
    RERUN = "RERUN"

    # Stop any running script, then shut the ScriptRunner down (data=None)
    SHUTDOWN = "SHUTDOWN"
class RerunData(object):
    """Data attached to RERUN requests.

    Parameters
    ----------
    query_string : str
        Query string to attach to the rerun request. Defaults to "".
    widget_states : Any
        Widget state to attach to the rerun request. Defaults to None;
        ScriptRequestQueue.enqueue treats None as "no widget state supplied"
        when it coalesces RERUN requests.
    """

    def __init__(self, query_string="", widget_states=None):
        self.query_string = query_string
        self.widget_states = widget_states

    def __repr__(self):
        # Show both fields so a queued request is readable in logs/debuggers
        # instead of the default "<RerunData object at 0x...>".
        return "{}(query_string={!r}, widget_states={!r})".format(
            type(self).__name__, self.query_string, self.widget_states
        )
class ScriptRequestQueue(object):
    """A thread-safe queue of ScriptRequests.

    ReportSession publishes to this queue, and ScriptRunner consumes from it.
    Every read or write of the underlying deque is performed while holding
    an internal lock.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._queue = deque()  # type: Deque[Tuple[ScriptRequest, Any]]

    @property
    def has_request(self):
        """True if the queue has at least one element"""
        with self._lock:
            return bool(self._queue)

    def enqueue(self, request, data=None):
        """Enqueue a new request to the end of the queue.

        This request may be coalesced with an existing request if appropriate.
        For example, multiple consecutive RERUN requests will be combined
        so that there's only ever one pending RERUN request in the queue
        at a time. A SHUTDOWN request is inserted at the front of the queue
        rather than the end.

        Parameters
        ----------
        request : ScriptRequest
            The type of request
        data : Any
            Data associated with the request, if any. For example, could be
            of type RerunData. A RERUN request without data (None) that
            arrives while another RERUN is already pending is dropped,
            because it has nothing to merge into the pending request.
        """
        with self._lock:
            if request == ScriptRequest.SHUTDOWN:
                # If we get a shutdown request, it jumps to the front of the
                # queue to be processed immediately.
                self._queue.appendleft((request, data))
            elif request == ScriptRequest.RERUN:
                self._enqueue_rerun(data)
            else:
                self._queue.append((request, data))

    def _enqueue_rerun(self, data):
        """Append a RERUN request, or fold it into the pending RERUN.

        Must be called with ``self._lock`` held.
        """
        index = _index_if(
            self._queue, lambda item: item[0] == ScriptRequest.RERUN
        )
        if index < 0:
            # No RERUN is pending: queue this one as-is.
            self._queue.append((ScriptRequest.RERUN, data))
            return

        if data is None:
            # A data-less RERUN carries neither a query string nor widget
            # state, so the pending RERUN already covers it. (Previously this
            # raised AttributeError on `data.query_string`.)
            return

        _, old_data = self._queue[index]
        # The pending RERUN may itself have been enqueued without data;
        # treat that the same as "no widget state".
        old_states = None if old_data is None else old_data.widget_states

        if old_states is None:
            # The existing request's widget_states is None, which means it
            # wants to rerun with whatever the most recent script execution's
            # widget state was. We have no meaningful state to merge with,
            # and so we simply overwrite the existing request.
            merged_states = data.widget_states
        elif data.widget_states is None:
            # If this request's widget_states is None, and the existing
            # request's widget_states was not, this new request is entirely
            # redundant and can be dropped.
            # TODO: Figure out if this should even happen. This sounds like
            # it should raise an exception...
            return
        else:
            # Both the existing and the new request have non-null
            # widget_states. Merge them together.
            merged_states = coalesce_widget_states(
                old_states, data.widget_states
            )

        # Store a fresh RerunData so the queue never aliases the caller's
        # object; the newest query string always wins.
        self._queue[index] = (
            ScriptRequest.RERUN,
            RerunData(
                query_string=data.query_string,
                widget_states=merged_states,
            ),
        )

    def dequeue(self):
        """Pops the front-most request from the queue and returns it.

        Returns (None, None) if the queue is empty.

        Returns
        -------
        A (ScriptRequest, Data) tuple.
        """
        with self._lock:
            if self._queue:
                return self._queue.popleft()
            return None, None
def _index_if(collection, pred):
    """Locate the first element of ``collection`` that satisfies ``pred``.

    Returns that element's zero-based position, or -1 when no element
    satisfies the predicate (including when ``collection`` is empty).
    """
    matching_positions = (
        position
        for position, candidate in enumerate(collection)
        if pred(candidate)
    )
    return next(matching_positions, -1)