from abc import ABC, abstractmethod
from typing import Any
from feast.permissions.user import User
from feast.protos.feast.core.Policy_pb2 import Policy as PolicyProto
from feast.protos.feast.core.Policy_pb2 import RoleBasedPolicy as RoleBasedPolicyProto
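class Policy(ABC):
    """
    An abstract class to ensure that the current user matches the configured security policies.

    Concrete policies implement `validate_user` (the access decision) and
    `to_proto`; `from_proto` dispatches on the protobuf `policy_type` oneof
    to rebuild the matching subclass.
    """

    @abstractmethod
    def validate_user(self, user: User) -> tuple[bool, str]:
        """
        Validate the given user against the configured policy.

        Args:
            user: The current user.

        Returns:
            bool: `True` if the user matches the policy criteria, `False` otherwise.
            str: A possibly empty explanation of the reason for not matching the configured policy.
        """
        raise NotImplementedError

    @staticmethod
    def from_proto(policy_proto: PolicyProto) -> Any:
        """
        Converts policy config in protobuf spec to a Policy class object.

        Args:
            policy_proto: A protobuf representation of a Policy.

        Returns:
            A Policy class object, or `None` when the `policy_type` oneof is not set.
            Callers must handle `None` explicitly; it is not an allow-all policy object.

        Raises:
            NotImplementedError: If `policy_type` is set to a kind this module does not handle.
        """
        policy_type = policy_proto.WhichOneof("policy_type")
        if policy_type == "role_based_policy":
            return RoleBasedPolicy.from_proto(policy_proto)
        if policy_type is None:
            # An unset oneof means "no policy configured"; whether that implies
            # deny or allow is left to the caller rather than defaulted here.
            return None
        raise NotImplementedError(f"policy_type is unsupported: {policy_type}")

    @abstractmethod
    def to_proto(self) -> PolicyProto:
        """
        Converts this Policy object to its protobuf representation.
        """
        raise NotImplementedError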
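class RoleBasedPolicy(Policy):
    """
    A `Policy` implementation that grants access based on the roles held by the user.

    At least one of the configured roles must be granted to the current user in order to allow the execution of the secured operation.
    E.g., if the policy enforces roles `a` and `b`, the user must have at least one of them in order to satisfy the policy.
    """

    def __init__(
        self,
        roles: list[str],
    ):
        # Stored as given (no copy, no de-duplication); `__eq__` compares sorted copies.
        self.roles = roles

    def __eq__(self, other):
        """
        Order-insensitive comparison of the configured roles.

        Raises:
            TypeError: If `other` is not a `RoleBasedPolicy`.
        """
        # NOTE(review): raising instead of returning NotImplemented is unusual for
        # `__eq__` (e.g. `policy == None` raises rather than evaluating to False);
        # kept because callers may rely on the TypeError.
        if not isinstance(other, RoleBasedPolicy):
            raise TypeError(
                "Comparisons should only involve RoleBasedPolicy class objects."
            )
        # Order does not matter, but duplicate entries do.
        return sorted(self.roles) == sorted(other.roles)

    def get_roles(self) -> list[str]:
        """Return the configured roles (the live list, not a copy)."""
        return self.roles

    def validate_user(self, user: User) -> tuple[bool, str]:
        """
        Validate the given `user` against the configured roles.

        Returns:
            `(True, "")` when `user.has_matching_role` reports a match for the
            configured roles, otherwise `(False, explanation)` naming the required roles.
        """
        # NOTE(review): the outcome for an empty `roles` list is decided entirely by
        # `User.has_matching_role`; confirm it denies rather than allows in that case.
        result = user.has_matching_role(self.roles)
        explain = "" if result else f"Requires roles {self.roles}"
        return (result, explain)

    @staticmethod
    def from_proto(policy_proto: PolicyProto) -> Any:
        """
        Converts policy config in protobuf spec to a Policy class object.

        Args:
            policy_proto: A protobuf representation of a Policy.

        Returns:
            A RoleBasedPolicy class object.
        """
        # `list(...)` detaches the roles from the repeated protobuf field.
        return RoleBasedPolicy(roles=list(policy_proto.role_based_policy.roles))

    def to_proto(self) -> PolicyProto:
        """
        Converts this RoleBasedPolicy object to its protobuf representation.
        """
        role_based_policy_proto = RoleBasedPolicyProto(roles=self.roles)
        policy_proto = PolicyProto(role_based_policy=role_based_policy_proto)
        return policy_proto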
def allow_all(self, user: User) -> tuple[bool, str]:
    """
    `Policy.validate_user` implementation that accepts every user.

    Returns:
        `(True, "")` unconditionally; `self` and `user` are ignored.
    """
    verdict: tuple[bool, str] = (True, "")
    return verdict
def empty_policy(self) -> PolicyProto:
    """
    `Policy.to_proto` implementation for the allow-all policy.

    Returns:
        An empty `PolicyProto` with no `policy_type` set. Note that
        `Policy.from_proto` maps such a message to `None`, not back to the
        allow-all instance.
    """
    proto = PolicyProto()
    return proto
"""
A `Policy` instance to allow execution of any action to each user
"""
AllowAll = type(
"AllowAll",
(Policy,),
{Policy.validate_user.__name__: allow_all, Policy.to_proto.__name__: empty_policy},
)()