-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathmatcher.py
More file actions
149 lines (119 loc) · 4.89 KB
/
matcher.py
File metadata and controls
149 lines (119 loc) · 4.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""
This module provides utility matching functions.
"""
import logging
import re
from typing import TYPE_CHECKING, Any, Optional
from unittest.mock import Mock
from feast.permissions.action import AuthzedAction
if TYPE_CHECKING:
from feast.feast_object import FeastObject
logger = logging.getLogger(__name__)
def is_a_feast_object(resource: Any):
"""
A matcher to verify that a given object is one of the Feast objects defined in the `FeastObject` type.
Args:
resource: An object instance to verify.
Returns:
`True` if the given object is one of the types in the FeastObject alias or a subclass of one of them.
"""
from feast.feast_object import ALL_RESOURCE_TYPES
for t in ALL_RESOURCE_TYPES:
# Use isinstance to pass Mock validation
if isinstance(resource, t):
return True
return False
def _get_type(resource: "FeastObject") -> Any:
is_mock = isinstance(resource, Mock)
if not is_mock:
return type(resource)
else:
return getattr(resource, "_spec_class", None)
def resource_match_config(
resource: "FeastObject",
expected_types: list["FeastObject"],
name_patterns: list[str],
required_tags: Optional[dict[str, str]] = None,
) -> bool:
"""
Match a given Feast object against the configured type, name and tags in a permission configuration.
Args:
resource: A FeastObject instance to match agains the permission.
expected_types: The list of object types configured in the permission. Type match also includes all the sub-classes.
name_patterns: The possibly empty list of name pattern filters configured in the permission.
required_tags: The optional dictionary of required tags configured in the permission.
Returns:
bool: `True` if the resource matches the configured permission filters.
"""
if resource is None:
logger.warning(f"None passed to {resource_match_config.__name__}")
return False
_type = _get_type(resource)
if not is_a_feast_object(resource):
logger.warning(f"Given resource is not of a managed type but {_type}")
return False
# mypy check ignored because of https://github.com/python/mypy/issues/11673, or it raises "Argument 2 to "isinstance" has incompatible type "tuple[Featu ..."
if not isinstance(resource, tuple(expected_types)): # type: ignore
logger.info(
f"Resource does not match any of the expected type {expected_types}"
)
return False
if not _resource_name_matches_name_patterns(resource, name_patterns):
return False
if required_tags:
if hasattr(resource, "required_tags"):
if isinstance(resource.required_tags, dict):
for tag in required_tags.keys():
required_value = required_tags.get(tag)
actual_value = resource.required_tags.get(tag)
if required_value != actual_value:
logger.info(
f"Unmatched value {actual_value} for required tag {tag}: expected {required_value}"
)
return False
else:
logger.warning(
f"Resource {resource} has no `required_tags` attribute of unexpected type {type(resource.required_tags)}"
)
else:
logger.warning(f"Resource {resource} has no `required_tags` attribute")
return True
def _resource_name_matches_name_patterns(
resource: "FeastObject",
name_patterns: list[str],
) -> bool:
if not hasattr(resource, "name"):
logger.warning(f"Resource {resource} has no `name` attribute")
return True
if not name_patterns:
return True
if resource.name is None:
return True
if not isinstance(resource.name, str):
logger.warning(
f"Resource {resource} has `name` attribute of unexpected type {type(resource.name)}"
)
return True
for name_pattern in name_patterns:
match = bool(re.fullmatch(name_pattern, resource.name))
if not match:
logger.info(
f"Resource name {resource.name} does not match pattern {name_pattern}"
)
else:
logger.info(f"Resource name {resource.name} matched pattern {name_pattern}")
return True
return False
def actions_match_config(
requested_actions: list[AuthzedAction],
allowed_actions: list[AuthzedAction],
) -> bool:
"""
Match a list of actions against the actions defined in a permission configuration.
Args:
requested_actions: A list of actions to be executed.
allowed_actions: The list of actions configured in the permission.
Returns:
bool: `True` if all the given `requested_actions` are defined in the `allowed_actions`.
"""
return all(a in allowed_actions for a in requested_actions)