forked from carbonblack/cbapi-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmodels.py
More file actions
executable file
·371 lines (297 loc) · 14 KB
/
models.py
File metadata and controls
executable file
·371 lines (297 loc) · 14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
from cbapi.models import MutableBaseModel, UnrefreshableModel
from cbapi.errors import ServerError
from cbapi.psc.devices_query import DeviceSearchQuery
from cbapi.psc.alerts_query import BaseAlertSearchQuery, WatchlistAlertSearchQuery, \
CBAnalyticsAlertSearchQuery, VMwareAlertSearchQuery
from copy import deepcopy
import logging
import json
import time
# Module-level logger, named after this module so output can be filtered per module.
log = logging.getLogger(__name__)
class PSCMutableModel(MutableBaseModel):
    """
    Mutable model base class whose changes are sent with PATCH by default.

    Subclasses may override ``_change_object_http_method`` to send the whole object
    instead of only the modified attributes, and ``_change_object_key_name`` to name
    the response key that carries the object's ID (defaults to ``primary_key``).
    """
    _change_object_http_method = "PATCH"
    _change_object_key_name = None

    def __init__(self, cb, model_unique_id=None, initial_data=None, force_init=False, full_doc=False):
        """
        Initialize the model and default the change-object key name to the primary key.

        :param cb: The API object passed through to the base model and used for requests.
        :param model_unique_id: Unique ID of the object, or None for a new object.
        :param initial_data: Initial data passed through to the base model.
        :param force_init: Passed through to the base model.
        :param full_doc: Passed through to the base model.
        """
        super(PSCMutableModel, self).__init__(cb, model_unique_id=model_unique_id, initial_data=initial_data,
                                              force_init=force_init, full_doc=full_doc)
        if not self._change_object_key_name:
            self._change_object_key_name = self.primary_key

    def _parse(self, obj):
        """
        Extract the payload stored under ``self.info_key`` from a response object.

        :param obj: The object to inspect.
        :return: ``obj[self.info_key]`` when ``obj`` is a dict containing that key, otherwise None.
        """
        # isinstance (rather than an exact type comparison) also accepts dict subclasses.
        if isinstance(obj, dict) and self.info_key in obj:
            return obj[self.info_key]
        return None

    def _update_object(self):
        """
        Send pending changes to the server using the configured HTTP method.

        :return: The unique ID of this object, as returned by ``_refresh_if_needed``.
        """
        if self._change_object_http_method != "PATCH":
            return self._update_entire_object()
        return self._patch_object()

    def _update_entire_object(self):
        """
        Create the object, or replace it on the server by sending all of its data.

        A new object is created when the primary key itself was modified or when the
        object has no unique ID yet; otherwise the existing object is updated.

        :return: The unique ID of this object, as returned by ``_refresh_if_needed``.
        """
        primary_key = self.__class__.primary_key
        if primary_key in self._dirty_attributes or self._model_unique_id is None:
            new_object_info = deepcopy(self._info)
            try:
                if not self._new_object_needs_primary_key:
                    del new_object_info[primary_key]
            except (AttributeError, KeyError, TypeError):
                # Best effort: a missing flag or an absent primary key simply means
                # there is nothing to strip from the payload.
                pass
            log.debug("Creating a new {0:s} object".format(self.__class__.__name__))
            ret = self._cb.api_json_request(self.__class__._new_object_http_method, self.urlobject,
                                            data={self.info_key: new_object_info})
        else:
            log.debug("Updating {0:s} with unique ID {1:s}".format(self.__class__.__name__,
                                                                   str(self._model_unique_id)))
            ret = self._cb.api_json_request(self.__class__._change_object_http_method,
                                            self._build_api_request_uri(), data={self.info_key: self._info})

        return self._refresh_if_needed(ret)

    def _patch_object(self):
        """
        Create the object, or send only its modified attributes to the server.

        A new object is created when the primary key itself was modified or when the
        object has no unique ID yet; otherwise only the dirty attributes are sent.

        :return: The unique ID of this object, as returned by ``_refresh_if_needed``.
        """
        if self.__class__.primary_key in self._dirty_attributes or self._model_unique_id is None:
            log.debug("Creating a new {0:s} object".format(self.__class__.__name__))
            ret = self._cb.api_json_request(self.__class__._new_object_http_method, self.urlobject,
                                            data=self._info)
        else:
            # Send the current value of every attribute that was modified locally.
            updates = {k: self._info[k] for k in self._dirty_attributes}
            log.debug("Updating {0:s} with unique ID {1:s}".format(self.__class__.__name__,
                                                                   str(self._model_unique_id)))
            ret = self._cb.api_json_request(self.__class__._change_object_http_method,
                                            self._build_api_request_uri(), data=updates)

        return self._refresh_if_needed(ret)

    def _refresh_if_needed(self, request_ret):
        """
        Process the server's response to a create/update request.

        Clears the dirty-attribute tracking and refreshes the object from the server
        unless the response already carried the complete record.

        :param request_ret: The response object returned by the API request.
        :return: The unique ID of this object.
        :raises ServerError: if the response status code is outside the 2xx range.
        """
        refresh_required = True

        if request_ret.status_code not in range(200, 300):
            try:
                message = json.loads(request_ret.text)[0]
            except Exception:
                # Fall back to the raw body when it is not a JSON list.
                message = request_ret.text

            raise ServerError(request_ret.status_code, message,
                              result="Did not update {} record.".format(self.__class__.__name__))

        try:
            message = request_ret.json()
            log.debug("Received response: %s", message)
            if not isinstance(message, dict):
                raise ServerError(request_ret.status_code, message,
                                  result="Unknown error updating {0:s} record.".format(self.__class__.__name__))

            if message.get("success", False):
                if isinstance(message.get(self.info_key, None), dict):
                    # The response contains the whole record; no extra round trip needed.
                    self._info = message.get(self.info_key)
                    self._full_init = True
                    refresh_required = False
                elif self._change_object_key_name in message:
                    # if all we got back was an ID, try refreshing to get the entire record.
                    log.debug("Only received an ID back from the server, forcing a refresh")
                    self._info[self.primary_key] = message[self._change_object_key_name]
                    refresh_required = True
            else:
                # "success" is False
                raise ServerError(request_ret.status_code, message.get("message", ""),
                                  result="Did not update {0:s} record.".format(self.__class__.__name__))
        except Exception:
            # Response processing has always been best-effort here, so the error is not
            # propagated; it is logged instead of being dropped without a trace.
            # NOTE(review): this also catches the ServerError raised above, so an
            # unsuccessful 2xx response never reaches the caller - confirm that is intended.
            log.debug("Ignoring error while processing the update response for %s",
                      self.__class__.__name__, exc_info=True)

        self._dirty_attributes = {}
        if refresh_required:
            self.refresh()
        return self._model_unique_id
class Device(PSCMutableModel):
    """
    Model for a single device, backed by the v6 ``devices`` endpoints.

    Device actions are delegated to the matching ``device_*`` methods on the API
    object, each called with a one-element list holding this device's ID.
    """
    urlobject = "/appservices/v6/orgs/{0}/devices"
    urlobject_single = "/appservices/v6/orgs/{0}/devices/{1}"
    primary_key = "id"
    swagger_meta_file = "psc/models/device.yaml"

    def __init__(self, cb, model_unique_id, initial_data=None):
        """
        Create the device model, loading it from the server when only an ID is given.

        :param cb: The API object used for requests.
        :param model_unique_id: ID of the device.
        :param initial_data: Already-retrieved device data, if any.
        """
        super(Device, self).__init__(cb, model_unique_id, initial_data)
        needs_fetch = initial_data is None and model_unique_id is not None
        if needs_fetch:
            self._refresh()

    @classmethod
    def _query_implementation(cls, cb):
        """
        Return the query object used to search for devices.

        :param cb: The API object used for requests.
        :return: A ``DeviceSearchQuery`` bound to this class and ``cb``.
        """
        query = DeviceSearchQuery(cls, cb)
        return query

    def _refresh(self):
        """
        Reload this device's data from the server.

        :return: Always True.
        """
        org_key = self._cb.credentials.org_key
        device_url = self.urlobject_single.format(org_key, self._model_unique_id)
        self._info = self._cb.get_object(device_url)
        self._last_refresh_time = time.time()
        return True

    def lr_session(self):
        """
        Retrieve a Live Response session object for this Device.

        :return: Live Response session object
        :rtype: :py:class:`cbapi.defense.cblr.LiveResponseSession`
        :raises ApiError: if there is an error establishing a Live Response session for this Device
        """
        device_id = self._model_unique_id
        return self._cb._request_lr_session(device_id)

    def background_scan(self, flag):
        """
        Set the background scan option for this device.

        :param boolean flag: True to turn background scan on, False to turn it off.
        :return: Whatever the API object's ``device_background_scan`` returns.
        """
        device_ids = [self._model_unique_id]
        return self._cb.device_background_scan(device_ids, flag)

    def bypass(self, flag):
        """
        Set the bypass option for this device.

        :param boolean flag: True to enable bypass, False to disable it.
        :return: Whatever the API object's ``device_bypass`` returns.
        """
        device_ids = [self._model_unique_id]
        return self._cb.device_bypass(device_ids, flag)

    def delete_sensor(self):
        """
        Delete this sensor device.

        :return: Whatever the API object's ``device_delete_sensor`` returns.
        """
        device_ids = [self._model_unique_id]
        return self._cb.device_delete_sensor(device_ids)

    def uninstall_sensor(self):
        """
        Uninstall this sensor device.

        :return: Whatever the API object's ``device_uninstall_sensor`` returns.
        """
        device_ids = [self._model_unique_id]
        return self._cb.device_uninstall_sensor(device_ids)

    def quarantine(self, flag):
        """
        Set the quarantine option for this device.

        :param boolean flag: True to enable quarantine, False to disable it.
        :return: Whatever the API object's ``device_quarantine`` returns.
        """
        device_ids = [self._model_unique_id]
        return self._cb.device_quarantine(device_ids, flag)

    def update_policy(self, policy_id):
        """
        Set the current policy for this device.

        :param int policy_id: ID of the policy to set for the device.
        :return: Whatever the API object's ``device_update_policy`` returns.
        """
        device_ids = [self._model_unique_id]
        return self._cb.device_update_policy(device_ids, policy_id)

    def update_sensor_version(self, sensor_version):
        """
        Update the sensor version for this device.

        :param dict sensor_version: New version properties for the sensor.
        :return: Whatever the API object's ``device_update_sensor_version`` returns.
        """
        device_ids = [self._model_unique_id]
        return self._cb.device_update_sensor_version(device_ids, sensor_version)
class Workflow(UnrefreshableModel):
    """Unrefreshable model holding workflow data; it is created without a unique ID."""
    swagger_meta_file = "psc/models/workflow.yaml"

    def __init__(self, cb, initial_data=None):
        """
        Create the workflow model from already-retrieved data.

        :param cb: The API object passed through to the base model.
        :param initial_data: The workflow data, or None.
        """
        base_kwargs = {"model_unique_id": None, "initial_data": initial_data}
        super(Workflow, self).__init__(cb, **base_kwargs)
class BaseAlert(PSCMutableModel):
    """
    Model for a single alert, backed by the v6 ``alerts`` endpoints.

    The alert's workflow data is kept in a separate ``Workflow`` object exposed
    through the ``workflow_`` property.
    """
    urlobject = "/appservices/v6/orgs/{0}/alerts"
    urlobject_single = "/appservices/v6/orgs/{0}/alerts/{1}"
    primary_key = "id"
    swagger_meta_file = "psc/models/base_alert.yaml"

    def __init__(self, cb, model_unique_id, initial_data=None):
        """
        Create the alert model, loading it from the server when only an ID is given.

        :param cb: The API object used for requests.
        :param model_unique_id: ID of the alert.
        :param initial_data: Already-retrieved alert data, if any.
        """
        super(BaseAlert, self).__init__(cb, model_unique_id, initial_data)
        self._workflow = Workflow(cb, initial_data.get("workflow", None) if initial_data else None)
        if model_unique_id is not None and initial_data is None:
            self._refresh()

    @classmethod
    def _query_implementation(cls, cb):
        """
        Return the query object used to search for alerts.

        :param cb: The API object used for requests.
        :return: A ``BaseAlertSearchQuery`` bound to this class and ``cb``.
        """
        return BaseAlertSearchQuery(cls, cb)

    def _refresh(self):
        """
        Reload this alert's data and its workflow from the server.

        :return: Always True.
        """
        url = self.urlobject_single.format(self._cb.credentials.org_key, self._model_unique_id)
        resp = self._cb.get_object(url)
        self._info = resp
        self._workflow = Workflow(self._cb, resp.get("workflow", None))
        self._last_refresh_time = time.time()
        return True

    @property
    def workflow_(self):
        """Return the ``Workflow`` object currently associated with this alert."""
        return self._workflow

    @staticmethod
    def _build_workflow_request(state, remediation, comment):
        """
        Build the request body shared by the alert and threat workflow updates.

        :param str state: The state to set, either "OPEN" or "DISMISSED".
        :param str remediation: The remediation status to set; omitted from the body when falsy.
        :param str comment: The comment to set; omitted from the body when falsy.
        :return: The request body.
        :rtype: dict
        """
        request = {"state": state}
        if remediation:
            request["remediation_state"] = remediation
        if comment:
            request["comment"] = comment
        return request

    def _update_workflow_status(self, state, remediation, comment):
        """
        Update the workflow status of this alert.

        :param str state: The state to set for this alert, either "OPEN" or "DISMISSED".
        :param str remediation: The remediation status to set for the alert.
        :param str comment: The comment to set for the alert.
        """
        request = self._build_workflow_request(state, remediation, comment)
        url = self.urlobject_single.format(self._cb.credentials.org_key,
                                           self._model_unique_id) + "/workflow"
        resp = self._cb.post_object(url, request)
        # The response body is used directly as the alert's new workflow data.
        self._workflow = Workflow(self._cb, resp.json())
        self._last_refresh_time = time.time()

    def dismiss(self, remediation=None, comment=None):
        """
        Dismiss this alert.

        :param str remediation: The remediation status to set for the alert.
        :param str comment: The comment to set for the alert.
        """
        self._update_workflow_status("DISMISSED", remediation, comment)

    def update(self, remediation=None, comment=None):
        """
        Update this alert.

        :param str remediation: The remediation status to set for the alert.
        :param str comment: The comment to set for the alert.
        """
        self._update_workflow_status("OPEN", remediation, comment)

    def _update_threat_workflow_status(self, state, remediation, comment):
        """
        Update the workflow status of all alerts with the same threat ID, past or future.

        :param str state: The state to set for this alert, either "OPEN" or "DISMISSED".
        :param str remediation: The remediation status to set for the alert.
        :param str comment: The comment to set for the alert.
        :return: A ``Workflow`` built from the response body.
        """
        request = self._build_workflow_request(state, remediation, comment)
        url = "/appservices/v6/orgs/{0}/threat/{1}/workflow".format(self._cb.credentials.org_key,
                                                                    self.threat_id)
        resp = self._cb.post_object(url, request)
        return Workflow(self._cb, resp.json())

    def dismiss_threat(self, remediation=None, comment=None):
        """
        Dismiss alerts for this threat.

        :param str remediation: The remediation status to set for the alert.
        :param str comment: The comment to set for the alert.
        :return: A ``Workflow`` built from the response body.
        """
        return self._update_threat_workflow_status("DISMISSED", remediation, comment)

    def update_threat(self, remediation=None, comment=None):
        """
        Update alerts for this threat.

        :param str remediation: The remediation status to set for the alert.
        :param str comment: The comment to set for the alert.
        :return: A ``Workflow`` built from the response body.
        """
        return self._update_threat_workflow_status("OPEN", remediation, comment)
class WatchlistAlert(BaseAlert):
    """Alert model that searches through the ``alerts/watchlist`` endpoint."""
    urlobject = "/appservices/v6/orgs/{0}/alerts/watchlist"

    @classmethod
    def _query_implementation(cls, cb):
        """
        Return the query object used to search for watchlist alerts.

        :param cb: The API object used for requests.
        :return: A ``WatchlistAlertSearchQuery`` bound to this class and ``cb``.
        """
        query = WatchlistAlertSearchQuery(cls, cb)
        return query
class CBAnalyticsAlert(BaseAlert):
    """Alert model that searches through the ``alerts/cbanalytics`` endpoint."""
    urlobject = "/appservices/v6/orgs/{0}/alerts/cbanalytics"

    @classmethod
    def _query_implementation(cls, cb):
        """
        Return the query object used to search for CB Analytics alerts.

        :param cb: The API object used for requests.
        :return: A ``CBAnalyticsAlertSearchQuery`` bound to this class and ``cb``.
        """
        query = CBAnalyticsAlertSearchQuery(cls, cb)
        return query
class VMwareAlert(BaseAlert):
    """Alert model that searches through the ``alerts/vmware`` endpoint."""
    urlobject = "/appservices/v6/orgs/{0}/alerts/vmware"

    @classmethod
    def _query_implementation(cls, cb):
        """
        Return the query object used to search for VMware alerts.

        :param cb: The API object used for requests.
        :return: A ``VMwareAlertSearchQuery`` bound to this class and ``cb``.
        """
        query = VMwareAlertSearchQuery(cls, cb)
        return query
class WorkflowStatus(PSCMutableModel):
    """
    Model for the status of a workflow request, fetched from ``workflow/status``.

    The ``queued``, ``in_progress`` and ``finished`` properties each reload the
    status from the server before reporting it.
    """
    urlobject_single = "/appservices/v6/orgs/{0}/workflow/status/{1}"
    primary_key = "id"
    swagger_meta_file = "psc/models/workflow_status.yaml"

    def __init__(self, cb, model_unique_id, initial_data=None):
        """
        Create the status model and load it from the server when an ID is given.

        :param cb: The API object used for requests.
        :param model_unique_id: ID of the workflow request.
        :param initial_data: Already-retrieved status data, if any.
        """
        super(WorkflowStatus, self).__init__(cb, model_unique_id, initial_data)
        self._request_id = model_unique_id
        self._workflow = None
        if model_unique_id is None:
            return
        self._refresh()

    def _refresh(self):
        """
        Reload the status data and its workflow from the server.

        :return: Always True.
        """
        org_key = self._cb.credentials.org_key
        status_url = self.urlobject_single.format(org_key, self._request_id)
        status_data = self._cb.get_object(status_url)
        self._info = status_data
        self._workflow = Workflow(self._cb, status_data.get("workflow", None))
        self._last_refresh_time = time.time()
        return True

    @property
    def id_(self):
        """Return the request ID this status object was created with."""
        return self._request_id

    @property
    def workflow_(self):
        """Return the ``Workflow`` object from the most recent refresh, or None."""
        return self._workflow

    @property
    def queued(self):
        """Refresh from the server and report whether the status is "QUEUED"."""
        self._refresh()
        current_status = self._info.get("status", "")
        return current_status == "QUEUED"

    @property
    def in_progress(self):
        """Refresh from the server and report whether the status is "IN_PROGRESS"."""
        self._refresh()
        current_status = self._info.get("status", "")
        return current_status == "IN_PROGRESS"

    @property
    def finished(self):
        """Refresh from the server and report whether the status is "FINISHED"."""
        self._refresh()
        current_status = self._info.get("status", "")
        return current_status == "FINISHED"