-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathinfra_diff.py
More file actions
229 lines (197 loc) · 8.08 KB
/
infra_diff.py
File metadata and controls
229 lines (197 loc) · 8.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
from dataclasses import dataclass
from typing import TYPE_CHECKING, Generic, Iterable, List, Optional, Tuple, TypeVar
if TYPE_CHECKING:
from feast.diff.apply_progress import ApplyProgressContext
from feast.diff.property_diff import PropertyDiff, TransitionType
from feast.infra.infra_object import (
DATASTORE_INFRA_OBJECT_CLASS_TYPE,
SQLITE_INFRA_OBJECT_CLASS_TYPE,
InfraObject,
)
from feast.protos.feast.core.DatastoreTable_pb2 import (
DatastoreTable as DatastoreTableProto,
)
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto
InfraObjectProto = TypeVar("InfraObjectProto", DatastoreTableProto, SqliteTableProto)
@dataclass
class InfraObjectDiff(Generic[InfraObjectProto]):
name: str
infra_object_type: str
current_infra_object: InfraObjectProto
new_infra_object: InfraObjectProto
infra_object_property_diffs: List[PropertyDiff]
transition_type: TransitionType
@dataclass
class InfraDiff:
infra_object_diffs: List[InfraObjectDiff]
def __init__(self):
self.infra_object_diffs = []
def update(self, progress_ctx: Optional["ApplyProgressContext"] = None):
"""Apply the infrastructure changes specified in this object."""
for infra_object_diff in self.infra_object_diffs:
if infra_object_diff.transition_type in [
TransitionType.DELETE,
TransitionType.UPDATE,
]:
infra_object = InfraObject.from_proto(
infra_object_diff.current_infra_object
)
if progress_ctx:
progress_ctx.update_phase_progress(
f"Tearing down {infra_object_diff.name}"
)
infra_object.teardown()
elif infra_object_diff.transition_type in [
TransitionType.CREATE,
TransitionType.UPDATE,
]:
infra_object = InfraObject.from_proto(
infra_object_diff.new_infra_object
)
if progress_ctx:
progress_ctx.update_phase_progress(
f"Creating/updating {infra_object_diff.name}"
)
infra_object.update()
def to_string(self):
from colorama import Fore, Style
log_string = ""
message_action_map = {
TransitionType.CREATE: ("Created", Fore.GREEN),
TransitionType.DELETE: ("Deleted", Fore.RED),
TransitionType.UNCHANGED: ("Unchanged", Fore.LIGHTBLUE_EX),
TransitionType.UPDATE: ("Updated", Fore.YELLOW),
}
for infra_object_diff in self.infra_object_diffs:
if infra_object_diff.transition_type == TransitionType.UNCHANGED:
continue
action, color = message_action_map[infra_object_diff.transition_type]
log_string += f"{action} {infra_object_diff.infra_object_type} {Style.BRIGHT + color}{infra_object_diff.name}{Style.RESET_ALL}\n"
if infra_object_diff.transition_type == TransitionType.UPDATE:
for _p in infra_object_diff.infra_object_property_diffs:
log_string += f"\t{_p.property_name}: {Style.BRIGHT + color}{_p.val_existing}{Style.RESET_ALL} -> {Style.BRIGHT + Fore.LIGHTGREEN_EX}{_p.val_declared}{Style.RESET_ALL}\n"
log_string = (
f"{Style.BRIGHT + Fore.LIGHTBLUE_EX}No changes to infrastructure"
if not log_string
else log_string
)
return log_string
def tag_infra_proto_objects_for_keep_delete_add(
existing_objs: Iterable[InfraObjectProto], desired_objs: Iterable[InfraObjectProto]
) -> Tuple[
Iterable[InfraObjectProto], Iterable[InfraObjectProto], Iterable[InfraObjectProto]
]:
existing_obj_names = {e.name for e in existing_objs}
desired_obj_names = {e.name for e in desired_objs}
objs_to_add = [e for e in desired_objs if e.name not in existing_obj_names]
objs_to_keep = [e for e in desired_objs if e.name in existing_obj_names]
objs_to_delete = [e for e in existing_objs if e.name not in desired_obj_names]
return objs_to_keep, objs_to_delete, objs_to_add
def diff_infra_protos(
current_infra_proto: InfraProto,
new_infra_proto: InfraProto,
project: Optional[str] = None,
) -> InfraDiff:
infra_diff = InfraDiff()
infra_object_class_types_to_str = {
DATASTORE_INFRA_OBJECT_CLASS_TYPE: "datastore table",
SQLITE_INFRA_OBJECT_CLASS_TYPE: "sqlite table",
}
for infra_object_class_type in infra_object_class_types_to_str:
current_infra_objects = get_infra_object_protos_by_type(
current_infra_proto, infra_object_class_type
)
new_infra_objects = get_infra_object_protos_by_type(
new_infra_proto, infra_object_class_type
)
# Filter infra objects by project prefix when using shared online stores
# Table names include project prefix: {project}_{table_name}
if project:
project_prefix = f"{project}_"
current_infra_objects = [
obj
for obj in current_infra_objects
if obj.name.startswith(project_prefix)
]
new_infra_objects = [
obj for obj in new_infra_objects if obj.name.startswith(project_prefix)
]
(
infra_objects_to_keep,
infra_objects_to_delete,
infra_objects_to_add,
) = tag_infra_proto_objects_for_keep_delete_add(
current_infra_objects,
new_infra_objects,
)
for e in infra_objects_to_add:
infra_diff.infra_object_diffs.append(
InfraObjectDiff(
e.name,
infra_object_class_types_to_str[infra_object_class_type],
None,
e,
[],
TransitionType.CREATE,
)
)
for e in infra_objects_to_delete:
infra_diff.infra_object_diffs.append(
InfraObjectDiff(
e.name,
infra_object_class_types_to_str[infra_object_class_type],
e,
None,
[],
TransitionType.DELETE,
)
)
for e in infra_objects_to_keep:
current_infra_object = [
_e for _e in current_infra_objects if _e.name == e.name
][0]
infra_diff.infra_object_diffs.append(
diff_between(
current_infra_object,
e,
infra_object_class_types_to_str[infra_object_class_type],
)
)
return infra_diff
def get_infra_object_protos_by_type(
infra_proto: InfraProto, infra_object_class_type: str
) -> List[InfraObjectProto]:
return [
InfraObject.from_infra_object_proto(infra_object).to_proto()
for infra_object in infra_proto.infra_objects
if infra_object.infra_object_class_type == infra_object_class_type
]
FIELDS_TO_IGNORE = {"project"}
def diff_between(
current: InfraObjectProto, new: InfraObjectProto, infra_object_type: str
) -> InfraObjectDiff:
assert current.DESCRIPTOR.full_name == new.DESCRIPTOR.full_name
property_diffs = []
transition: TransitionType = TransitionType.UNCHANGED
if current != new:
for _field in current.DESCRIPTOR.fields:
if _field.name in FIELDS_TO_IGNORE:
continue
if getattr(current, _field.name) != getattr(new, _field.name):
transition = TransitionType.UPDATE
property_diffs.append(
PropertyDiff(
_field.name,
getattr(current, _field.name),
getattr(new, _field.name),
)
)
return InfraObjectDiff(
new.name,
infra_object_type,
current,
new,
property_diffs,
transition,
)