forked from carbonblack/cbapi-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdevices_query.py
More file actions
executable file
·361 lines (301 loc) · 13.4 KB
/
devices_query.py
File metadata and controls
executable file
·361 lines (301 loc) · 13.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
from cbapi.errors import ApiError
from .base_query import PSCQueryBase, QueryBuilder, QueryBuilderSupportMixin, IterableQueryMixin
class DeviceSearchQuery(PSCQueryBase, QueryBuilderSupportMixin, IterableQueryMixin):
    """
    Represents a query that is used to locate Device objects.

    Criteria, exclusions, a last-contact time filter and a sort order are accumulated
    through the fluent ``set_*`` / ``sort_by`` methods (each returns this instance) and
    are turned into a request body by ``_build_request`` when the query is executed.
    """
    VALID_OS = ["WINDOWS", "ANDROID", "MAC", "IOS", "LINUX", "OTHER"]
    VALID_STATUSES = ["PENDING", "REGISTERED", "UNINSTALLED", "DEREGISTERED",
                      "ACTIVE", "INACTIVE", "ERROR", "ALL", "BYPASS_ON",
                      "BYPASS", "QUARANTINE", "SENSOR_OUTOFDATE",
                      "DELETED", "LIVE"]
    VALID_PRIORITIES = ["LOW", "MEDIUM", "HIGH", "MISSION_CRITICAL"]
    VALID_DIRECTIONS = ["ASC", "DESC"]

    def __init__(self, doc_class, cb):
        """
        Initialize an empty device search query.

        :param doc_class: The model class used to wrap each returned result.
        :param cb: The API connection object used to issue requests.
        """
        super().__init__(doc_class, cb)
        self._query_builder = QueryBuilder()
        self._criteria = {}
        self._time_filter = {}
        self._exclusions = {}
        self._sortcriteria = {}

    def _update_criteria(self, key, newlist):
        """
        Updates the criteria being collected for a query. Assumes the specified criteria item is
        defined as a list; the list passed in will be set as the value for this criteria item, or
        appended to the existing one if there is one.

        :param str key: The key for the criteria item to be set
        :param list newlist: List of values to be set for the criteria item
        """
        oldlist = self._criteria.get(key, [])
        # Concatenation builds a new list, so the caller's list is never mutated.
        self._criteria[key] = oldlist + newlist

    def _update_exclusions(self, key, newlist):
        """
        Updates the exclusion criteria being collected for a query. Assumes the specified criteria item is
        defined as a list; the list passed in will be set as the value for this criteria item, or
        appended to the existing one if there is one.

        :param str key: The key for the criteria item to be set
        :param list newlist: List of values to be set for the criteria item
        """
        oldlist = self._exclusions.get(key, [])
        # Concatenation builds a new list, so the caller's list is never mutated.
        self._exclusions[key] = oldlist + newlist

    def set_ad_group_ids(self, ad_group_ids):
        """
        Restricts the devices that this query is performed on to the specified
        AD group IDs.

        :param ad_group_ids: list of ints
        :return: This instance
        :raises ApiError: If any element of ad_group_ids is not an int.
        """
        if not all(isinstance(ad_group_id, int) for ad_group_id in ad_group_ids):
            raise ApiError("One or more invalid AD group IDs")
        self._update_criteria("ad_group_id", ad_group_ids)
        return self

    def set_device_ids(self, device_ids):
        """
        Restricts the devices that this query is performed on to the specified
        device IDs.

        :param device_ids: list of ints
        :return: This instance
        :raises ApiError: If any element of device_ids is not an int.
        """
        if not all(isinstance(device_id, int) for device_id in device_ids):
            raise ApiError("One or more invalid device IDs")
        self._update_criteria("id", device_ids)
        return self

    def set_last_contact_time(self, *args, **kwargs):
        """
        Restricts the devices that this query is performed on to the specified
        last contact time (either specified as a start and end point or as a
        range).

        Only keyword arguments are examined; positional arguments are ignored.
        Calling this method again replaces any previously set time filter.

        :param start: Keyword only. Start of the time window; a str is used as-is,
            any other value is converted by calling its ``isoformat()`` method.
        :param end: Keyword only. End of the time window; handled like ``start``.
        :param range: Keyword only. A range value that is passed through unchanged.
        :return: This instance
        :raises ApiError: If ``range`` is combined with ``start``/``end``, or if neither
            a complete ``start``/``end`` pair nor a ``range`` is supplied.
        """
        if kwargs.get("start", None) and kwargs.get("end", None):
            if kwargs.get("range", None):
                raise ApiError("cannot specify range= in addition to start= and end=")
            stime = kwargs["start"]
            if not isinstance(stime, str):
                stime = stime.isoformat()
            etime = kwargs["end"]
            if not isinstance(etime, str):
                etime = etime.isoformat()
            self._time_filter = {"start": stime, "end": etime}
        elif kwargs.get("range", None):
            # Reached when at most one of start/end is set; reject that stray value.
            if kwargs.get("start", None) or kwargs.get("end", None):
                raise ApiError("cannot specify start= or end= in addition to range=")
            self._time_filter = {"range": kwargs["range"]}
        else:
            raise ApiError("must specify either start= and end= or range=")
        return self

    def set_os(self, operating_systems):
        """
        Restricts the devices that this query is performed on to the specified
        operating systems.

        :param operating_systems: list of operating systems (values from VALID_OS)
        :return: This instance
        :raises ApiError: If any value is not listed in VALID_OS.
        """
        if not all((osval in DeviceSearchQuery.VALID_OS) for osval in operating_systems):
            raise ApiError("One or more invalid operating systems")
        self._update_criteria("os", operating_systems)
        return self

    def set_policy_ids(self, policy_ids):
        """
        Restricts the devices that this query is performed on to the specified
        policy IDs.

        :param policy_ids: list of ints
        :return: This instance
        :raises ApiError: If any element of policy_ids is not an int.
        """
        if not all(isinstance(policy_id, int) for policy_id in policy_ids):
            raise ApiError("One or more invalid policy IDs")
        self._update_criteria("policy_id", policy_ids)
        return self

    def set_status(self, statuses):
        """
        Restricts the devices that this query is performed on to the specified
        status values.

        :param statuses: list of strings (values from VALID_STATUSES)
        :return: This instance
        :raises ApiError: If any value is not listed in VALID_STATUSES.
        """
        if not all((stat in DeviceSearchQuery.VALID_STATUSES) for stat in statuses):
            raise ApiError("One or more invalid status values")
        self._update_criteria("status", statuses)
        return self

    def set_target_priorities(self, target_priorities):
        """
        Restricts the devices that this query is performed on to the specified
        target priority values.

        :param target_priorities: list of strings (values from VALID_PRIORITIES)
        :return: This instance
        :raises ApiError: If any value is not listed in VALID_PRIORITIES.
        """
        if not all((prio in DeviceSearchQuery.VALID_PRIORITIES) for prio in target_priorities):
            raise ApiError("One or more invalid target priority values")
        self._update_criteria("target_priority", target_priorities)
        return self

    def set_exclude_sensor_versions(self, sensor_versions):
        """
        Restricts the devices that this query is performed on to exclude specified
        sensor versions.

        :param sensor_versions: List of sensor versions (strings) to exclude
        :return: This instance
        :raises ApiError: If any element of sensor_versions is not a str.
        """
        if not all(isinstance(v, str) for v in sensor_versions):
            raise ApiError("One or more invalid sensor versions")
        self._update_exclusions("sensor_version", sensor_versions)
        return self

    def sort_by(self, key, direction="ASC"):
        """Sets the sorting behavior on a query's results.

        Calling this method again replaces any previously set sort order.

        Example::

        >>> cb.select(Device).sort_by("name")

        :param key: the key in the schema to sort by
        :param direction: the sort order, either "ASC" or "DESC"
        :rtype: :py:class:`DeviceSearchQuery`
        :raises ApiError: If direction is not listed in VALID_DIRECTIONS.
        """
        if direction not in DeviceSearchQuery.VALID_DIRECTIONS:
            raise ApiError("invalid sort direction specified")
        self._sortcriteria = {"field": key, "order": direction}
        return self

    def _build_request(self, from_row, max_rows):
        """
        Creates the request body for an API call.

        :param int from_row: The row to start the query at; only sent when greater than 0.
        :param int max_rows: The maximum number of rows to be returned; only sent when
            non-negative.
        :return: A dict containing the complete request body.
        """
        # Work on a shallow copy so that building a request never writes the time
        # filter back into self._criteria or hands out the live criteria dict.
        criteria = dict(self._criteria)
        if self._time_filter:
            criteria["last_contact_time"] = self._time_filter
        request = {"criteria": criteria, "exclusions": self._exclusions}
        request["query"] = self._query_builder._collapse()
        if from_row > 0:
            request["start"] = from_row
        if max_rows >= 0:
            request["rows"] = max_rows
        if self._sortcriteria:
            request["sort"] = [self._sortcriteria]
        return request

    def _build_url(self, tail_end):
        """
        Creates the URL to be used for an API call.

        :param str tail_end: String to be appended to the end of the generated URL.
        :return: The document class's ``urlobject`` template formatted with the
            connection's org key, followed by tail_end.
        """
        url = self._doc_class.urlobject.format(self._cb.credentials.org_key) + tail_end
        return url

    def _count(self):
        """
        Returns the number of results from the run of this query.

        The value is cached: once a search response has been seen, later calls
        return the stored total without contacting the server.

        :return: The number of results from the run of this query.
        """
        if self._count_valid:
            return self._total_results

        url = self._build_url("/_search")
        request = self._build_request(0, -1)
        resp = self._cb.post_object(url, body=request)
        result = resp.json()

        self._total_results = result["num_found"]
        self._count_valid = True

        return self._total_results

    def _perform_query(self, from_row=0, max_rows=-1):
        """
        Performs the query and returns the results of the query in an iterable fashion.

        Pages are requested one after another, each starting at the row following the
        last item yielded, until max_rows items have been produced, the reported total
        has been reached, or the server returns an empty page.

        :param int from_row: The row to start the query at (default 0).
        :param int max_rows: The maximum number of rows to be returned (default -1, meaning "all").
        :return: A generator yielding one document-class instance per result row.
        """
        url = self._build_url("/_search")
        current = from_row
        numrows = 0
        while True:
            request = self._build_request(current, max_rows)
            resp = self._cb.post_object(url, body=request)
            result = resp.json()

            self._total_results = result["num_found"]
            self._count_valid = True

            results = result.get("results", [])
            if not results:
                # An empty page cannot advance `current`; stop instead of re-posting
                # the same request forever when num_found overstates what is available.
                return

            for item in results:
                yield self._doc_class(self._cb, item["id"], item)
                current += 1
                numrows += 1
                if max_rows > 0 and numrows == max_rows:
                    return

            if current >= self._total_results:
                return

    def download(self):
        """
        Uses the query parameters that have been set to download all
        device listings in CSV format.

        Only the status, AD group ID, policy ID and target priority criteria, the
        query string and the sort order are forwarded; other criteria, exclusions
        and the time filter are not sent.

        Example::

        >>> cb.select(Device).set_status(["ALL"]).download()

        :return: The CSV raw data as returned from the server.
        :raises ApiError: If no status criteria have been set on this query.
        """
        tmp = self._criteria.get("status", [])
        if not tmp:
            raise ApiError("at least one status must be specified to download")
        query_params = {"status": ",".join(tmp)}
        tmp = self._criteria.get("ad_group_id", [])
        if tmp:
            query_params["ad_group_id"] = ",".join([str(t) for t in tmp])
        tmp = self._criteria.get("policy_id", [])
        if tmp:
            query_params["policy_id"] = ",".join([str(t) for t in tmp])
        tmp = self._criteria.get("target_priority", [])
        if tmp:
            query_params["target_priority"] = ",".join(tmp)
        tmp = self._query_builder._collapse()
        if tmp:
            query_params["query_string"] = tmp
        if self._sortcriteria:
            query_params["sort_field"] = self._sortcriteria["field"]
            query_params["sort_order"] = self._sortcriteria["order"]
        url = self._build_url("/_search/download")
        # AGRB 10/3/2019 - Header is TEMPORARY until bug is fixed in API. Remove when fix deployed.
        return self._cb.get_raw_data(url, query_params, headers={"Content-Type": "application/json"})

    def _bulk_device_action(self, action_type, options=None):
        """
        Perform a bulk action on all devices matching the current search criteria.

        :param str action_type: The action type to be performed.
        :param dict options: Options for the bulk device action. Default None.
        :return: Whatever the connection's ``_raw_device_action`` call returns.
        """
        request = {"action_type": action_type, "search": self._build_request(0, -1)}
        if options:
            request["options"] = options
        return self._cb._raw_device_action(request)

    def background_scan(self, scan):
        """
        Set the background scan option for the specified devices.

        :param boolean scan: True to turn background scan on, False to turn it off.
        :return: The result of the bulk device action call.
        """
        return self._bulk_device_action("BACKGROUND_SCAN", self._cb._action_toggle(scan))

    def bypass(self, enable):
        """
        Set the bypass option for the specified devices.

        :param boolean enable: True to enable bypass, False to disable it.
        :return: The result of the bulk device action call.
        """
        return self._bulk_device_action("BYPASS", self._cb._action_toggle(enable))

    def delete_sensor(self):
        """
        Delete the specified sensor devices.

        :return: The result of the bulk device action call.
        """
        return self._bulk_device_action("DELETE_SENSOR")

    def uninstall_sensor(self):
        """
        Uninstall the specified sensor devices.

        :return: The result of the bulk device action call.
        """
        return self._bulk_device_action("UNINSTALL_SENSOR")

    def quarantine(self, enable):
        """
        Set the quarantine option for the specified devices.

        :param boolean enable: True to enable quarantine, False to disable it.
        :return: The result of the bulk device action call.
        """
        return self._bulk_device_action("QUARANTINE", self._cb._action_toggle(enable))

    def update_policy(self, policy_id):
        """
        Set the current policy for the specified devices.

        :param int policy_id: ID of the policy to set for the devices.
        :return: The result of the bulk device action call.
        """
        return self._bulk_device_action("UPDATE_POLICY", {"policy_id": policy_id})

    def update_sensor_version(self, sensor_version):
        """
        Update the sensor version for the specified devices.

        :param dict sensor_version: New version properties for the sensor.
        :return: The result of the bulk device action call.
        """
        return self._bulk_device_action("UPDATE_SENSOR_VERSION", {"sensor_version": sensor_version})