forked from carbonblack/cbapi-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauth.py
More file actions
83 lines (62 loc) · 3.04 KB
/
auth.py
File metadata and controls
83 lines (62 loc) · 3.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from cbapi.six.moves.configparser import RawConfigParser
import os
import attrdict
import cbapi.six as six
from .errors import CredentialError
default_profile = {
"url": None,
"token": None,
"ssl_verify": "True",
"ssl_verify_hostname": "True",
"ssl_cert_file": None,
"ssl_force_tls_1_2": "False",
"proxy": None,
"ignore_system_proxy": "False",
"rabbitmq_user": "cb",
"rabbitmq_pass": None,
"rabbitmq_host": "localhost",
"rabbitmq_port": 5004
}
_boolean_states = {'1': True, 'yes': True, 'true': True, 'on': True,
'0': False, 'no': False, 'false': False, 'off': False}
class Credentials(attrdict.AttrDict):
    """Attribute-accessible mapping holding the settings of one profile.

    The mapping is initialised from ``default_profile`` and then from the
    caller's arguments, validated, and has its boolean-like options
    normalised.
    """

    def __init__(self, *args, **kwargs):
        """Build the credential mapping and validate it.

        Positional and keyword arguments are forwarded unchanged to the
        ``AttrDict`` initialiser.

        Raises:
            CredentialError: if "url" or "token" is missing or falsy once
                both initialisation passes have run.
        """
        # First pass loads the defaults, second pass applies what the caller
        # supplied (presumably overriding the defaults, as a dict update
        # would -- relies on the AttrDict initialiser's semantics).
        super(Credentials, self).__init__(default_profile)
        super(Credentials, self).__init__(*args, **kwargs)

        if not self.get("url", None):
            raise CredentialError("No URL specified")
        if not self.get("token", None):
            raise CredentialError("No API token specified")

        boolean_options = ("ssl_verify", "ssl_verify_hostname", "ignore_system_proxy", "ssl_force_tls_1_2")
        for option in boolean_options:
            raw_value = self.get(option, default_profile.get(option, "True"))
            if not isinstance(raw_value, six.string_types):
                # Already a non-string value (e.g. a real bool): keep as is.
                continue
            lowered = raw_value.lower()
            # Strings that are not a known boolean spelling are left untouched.
            if lowered in _boolean_states:
                self[option] = _boolean_states[lowered]
class CredentialStore(object):
    """Find and parse the credential files for one Carbon Black product.

    Attributes:
        credential_search_path: list of file paths handed to the parser.
        credentials: ``RawConfigParser`` seeded with ``default_profile`` and
            loaded from the search path.
        credential_files: the value returned by ``RawConfigParser.read`` (per
            the configparser documentation, the files successfully parsed).
    """

    def __init__(self, product_name, **kwargs):
        """Parse the credential files for ``product_name``.

        Args:
            product_name: one of "response", "protection" or "defense".
            **kwargs: only ``credential_file`` is consulted.  A string
                replaces the default search path with that single file; a
                list or tuple replaces it with those files.  Any other value
                (including ``None``) leaves the default search path in place.

        Raises:
            CredentialError: if ``product_name`` is not a supported product.
        """
        if product_name not in ("response", "protection", "defense"):
            # The placeholder was previously never filled in, so the message
            # showed a literal "{0:s}".  ``!r`` keeps formatting safe for any
            # value the caller may have passed.
            raise CredentialError("Product name {0!r} not valid".format(product_name))

        file_name = "credentials.%s" % product_name
        # Default locations: system-wide, per-user, then current directory.
        self.credential_search_path = [
            os.path.join(os.path.sep, "etc", "carbonblack", file_name),
            os.path.join(os.path.expanduser("~"), ".carbonblack", file_name),
            os.path.join(".", ".carbonblack", file_name),
        ]

        credential_file = kwargs.get("credential_file")
        if isinstance(credential_file, six.string_types):
            self.credential_search_path = [credential_file]
        elif isinstance(credential_file, (list, tuple)):
            # isinstance (rather than an exact type check) also accepts list
            # subclasses and tuples; copy so the store owns its own list.
            self.credential_search_path = list(credential_file)

        self.credentials = RawConfigParser(defaults=default_profile)
        self.credential_files = self.credentials.read(self.credential_search_path)

    def get_credentials(self, profile=None):
        """Return the ``Credentials`` for ``profile``.

        Args:
            profile: section name to load; a falsy value selects "default".

        Returns:
            A ``Credentials`` object holding every ``default_profile`` option
            as read from the parsed files.

        Raises:
            CredentialError: if the profile is not among the parsed sections,
                or if its "url" or "token" is empty.
        """
        credential_profile = profile or "default"
        if credential_profile not in self.get_profiles():
            # Render each entry individually so that a non-string path in a
            # caller-supplied list cannot turn this error into a TypeError.
            searched = ", ".join("%s" % (path,) for path in self.credential_search_path)
            raise CredentialError("Cannot find credential profile '%s' after searching in these files: %s." %
                                  (credential_profile, searched))

        retval = {option: self.credentials.get(credential_profile, option) for option in default_profile}

        if not retval["url"] or not retval["token"]:
            raise CredentialError("Token and/or URL not available for profile %s" % credential_profile)

        return Credentials(retval)

    def get_profiles(self):
        """Return the section names present in the parsed credential files."""
        return self.credentials.sections()