-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathbase_feature_view.py
More file actions
250 lines (212 loc) · 8.87 KB
/
base_feature_view.py
File metadata and controls
250 lines (212 loc) · 8.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# Copyright 2021 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Dict, List, Optional, Type, Union
from google.protobuf.json_format import MessageToJson
from google.protobuf.message import Message
from feast.data_source import DataSource
from feast.feature_view_projection import FeatureViewProjection
from feast.field import Field
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureView as OnDemandFeatureViewProto,
)
from feast.protos.feast.core.StreamFeatureView_pb2 import (
StreamFeatureView as StreamFeatureViewProto,
)
class BaseFeatureView(ABC):
    """
    A BaseFeatureView defines a logical group of features.

    Attributes:
        name: The unique name of the base feature view.
        features: The list of features defined as part of this base feature view.
        description: A human-readable description.
        tags: A dictionary of key-value pairs to store arbitrary metadata.
        owner: The owner of the base feature view, typically the email of the primary
            maintainer.
        projection: The feature view projection storing modifications to be applied to
            this base feature view at retrieval time.
        created_timestamp: The time when the base feature view was created.
        last_updated_timestamp: The time when the base feature view was last
            updated.
    """

    name: str
    features: List[Field]
    description: str
    tags: Dict[str, str]
    owner: str
    projection: FeatureViewProjection
    created_timestamp: Optional[datetime]
    last_updated_timestamp: Optional[datetime]

    @abstractmethod
    def __init__(
        self,
        *,
        name: str,
        features: Optional[List[Field]] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = "",
        source: Optional[DataSource] = None,
    ):
        """
        Creates a BaseFeatureView object.

        Args:
            name: The unique name of the base feature view.
            features (optional): The list of features defined as part of this base feature view.
            description (optional): A human-readable description.
            tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
            owner (optional): The owner of the base feature view, typically the email of the
                primary maintainer.
            source (optional): The source of data for this group of features. May be a stream source, or a batch source.
                If a stream source, the source should contain a batch_source for backfills & batch materialization.

        Raises:
            AssertionError: The name is None.
        """
        assert name is not None
        self.name = name
        self.features = features or []
        self.description = description
        self.tags = tags or {}
        self.owner = owner
        # The default projection is derived from the fields assigned above, so it
        # must be built after them.
        self.projection = FeatureViewProjection.from_definition(self)
        self.created_timestamp = None
        self.last_updated_timestamp = None

        # NOTE: the `source` attribute is only created when a truthy source is
        # given; code in this class must not assume it exists.
        if source:
            self.source = source

    @property
    @abstractmethod
    def proto_class(self) -> Type[Message]:
        """The protobuf message class that represents this kind of feature view."""
        pass

    @abstractmethod
    def to_proto(
        self,
    ) -> Union[FeatureViewProto, OnDemandFeatureViewProto, StreamFeatureViewProto]:
        """Converts this feature view to its protobuf representation."""
        pass

    @classmethod
    @abstractmethod
    def from_proto(cls, feature_view_proto):
        """Creates a feature view from its protobuf representation."""
        pass

    @abstractmethod
    def __copy__(self):
        """Returns a deep copy of this base feature view."""
        pass

    def __repr__(self):
        items = (f"{k} = {v}" for k, v in self.__dict__.items())
        return f"<{self.__class__.__name__}({', '.join(items)})>"

    def __str__(self):
        return str(MessageToJson(self.to_proto()))

    def __hash__(self):
        # Only the name is hashed; objects that compare equal always share a name.
        return hash(self.name)

    def __getitem__(self, item):
        """
        Returns a copy of this feature view whose projection selects the given features.

        Args:
            item: A list of feature names to select.

        Raises:
            AssertionError: The item is not a list.
            ValueError: A requested feature name is not one of this view's features.
        """
        assert isinstance(item, list)

        cp = self.__copy__()
        if self.features:
            feature_name_to_feature = {
                feature.name: feature for feature in self.features
            }
            referenced_features = []
            for feature in item:
                if feature not in feature_name_to_feature:
                    raise ValueError(
                        f"Feature {feature} does not exist in this feature view."
                    )
                referenced_features.append(feature_name_to_feature[feature])
            cp.projection.features = referenced_features
        else:
            # No features are known on this view, so the names cannot be
            # resolved here; record them on the projection as requested.
            cp.projection.desired_features = item

        return cp

    def __eq__(self, other):
        """
        Compares name, features (order-insensitive), projection, description, tags
        and owner.

        The `source` attribute is intentionally not consulted: it is only set when a
        source was passed to the constructor, and it never influenced the result of
        this comparison.

        Raises:
            TypeError: The other object is not a BaseFeatureView.
        """
        if not isinstance(other, BaseFeatureView):
            raise TypeError(
                "Comparisons should only involve BaseFeatureView class objects."
            )

        if (
            self.name != other.name
            or sorted(self.features) != sorted(other.features)
            or self.projection != other.projection
            or self.description != other.description
            or self.tags != other.tags
            or self.owner != other.owner
        ):
            return False

        return True

    def ensure_valid(self):
        """
        Validates the state of this feature view locally.

        Raises:
            ValueError: The feature view is invalid.
        """
        if not self.name:
            raise ValueError("Feature view needs a name.")

    def with_name(self, name: str):
        """
        Returns a renamed copy of this base feature view. This renamed copy should only be
        used for query operations and will not modify the underlying base feature view.

        Args:
            name: The name to assign to the copy.
        """
        cp = self.__copy__()
        cp.projection.name_alias = name

        return cp

    def _validate_projection(
        self, feature_view_projection: FeatureViewProjection
    ) -> None:
        """
        Checks that the given projection can be applied to this base feature view.

        Args:
            feature_view_projection: The feature view projection to check.

        Raises:
            ValueError: The name or features of the projection do not match.
        """
        if feature_view_projection.name != self.name:
            raise ValueError(
                f"The projection for the {self.name} FeatureView cannot be applied because it differs in name. "
                f"The projection is named {feature_view_projection.name} and the name indicates which "
                "FeatureView the projection is for."
            )

        for feature in feature_view_projection.features:
            if feature not in self.features:
                raise ValueError(
                    f"The projection for {self.name} cannot be applied because it contains {feature.name} which the "
                    "FeatureView doesn't have."
                )

    def set_projection(self, feature_view_projection: FeatureViewProjection) -> None:
        """
        Sets the feature view projection of this base feature view to the given projection.

        Args:
            feature_view_projection: The feature view projection to be set.

        Raises:
            ValueError: The name or features of the projection do not match.
        """
        self._validate_projection(feature_view_projection)
        self.projection = feature_view_projection

    def with_projection(self, feature_view_projection: FeatureViewProjection):
        """
        Returns a copy of this base feature view with the feature view projection set to
        the given projection.

        Args:
            feature_view_projection: The feature view projection to assign to the copy.

        Raises:
            ValueError: The name or features of the projection do not match.
        """
        self._validate_projection(feature_view_projection)

        cp = self.__copy__()
        cp.projection = feature_view_projection

        return cp