-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathfeature_view_utils.py
More file actions
229 lines (197 loc) · 8.82 KB
/
feature_view_utils.py
File metadata and controls
229 lines (197 loc) · 8.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
"""
Utility functions for feature view operations including source resolution.
"""
import logging
import typing
from dataclasses import dataclass
from typing import Callable, Optional
if typing.TYPE_CHECKING:
from feast.data_source import DataSource
from feast.feature_view import FeatureView
from feast.repo_config import RepoConfig
logger = logging.getLogger(__name__)
@dataclass
class FeatureViewSourceInfo:
    """
    Outcome of resolving which data source a feature view should read from.

    Attributes:
        data_source: The source object selected for the feature view.
        source_type: Short label naming the kind of source that was selected.
        has_transformation: Whether the feature view carries a transformation.
        transformation_func: The transformation callable, if any; defaults to
            ``None``.
        source_description: Human-readable note on how the source was chosen;
            defaults to an empty string.
    """

    data_source: "DataSource"
    source_type: str
    has_transformation: bool
    transformation_func: Optional[Callable] = None
    source_description: str = ""
def has_transformation(feature_view: "FeatureView") -> bool:
    """
    Report whether a feature view carries any transformation.

    A view counts as transformed when its ``udf`` attribute or its
    ``feature_transformation`` attribute is present and not ``None``.
    """
    # ``udf`` is probed first, matching the short-circuit order callers rely on.
    return any(
        getattr(feature_view, attribute, None) is not None
        for attribute in ("udf", "feature_transformation")
    )
def get_transformation_function(feature_view: "FeatureView") -> Optional[Callable]:
    """
    Return the callable that transforms a feature view, if there is one.

    The ``udf`` of the view's ``feature_transformation`` is preferred. When
    that is unavailable or not callable, the view's own ``udf`` attribute is
    used instead. ``None`` is returned when neither yields a callable.
    """
    transformation = getattr(feature_view, "feature_transformation", None)
    if transformation and hasattr(transformation, "udf"):
        preferred = transformation.udf
        if callable(preferred):
            return preferred

    # No usable feature_transformation: fall back to the view's direct UDF.
    direct_udf = getattr(feature_view, "udf", None)
    if not direct_udf or not callable(direct_udf):
        return None
    return direct_udf
def find_original_source_view(feature_view: "FeatureView") -> "FeatureView":
    """
    Walk up the ``source_views`` chain to the root feature view.

    Starting from ``feature_view``, repeatedly step to the first entry of
    ``source_views`` until reaching a view whose ``source_views`` attribute is
    missing or empty. That root view is expected to be the one carrying the
    actual DataSource (``batch_source``); this function does not verify it.

    Args:
        feature_view: The (possibly derived) feature view to start from.

    Returns:
        The first view in the chain without ``source_views``; this is
        ``feature_view`` itself when it is not a derived view.

    Raises:
        ValueError: If the chain loops back onto a view that was already
            visited, which would otherwise never terminate.
    """
    current_view = feature_view
    # Track visited views by identity (views need not be hashable). Keeping
    # the objects as values keeps them alive so an id() is never reused.
    visited = {id(current_view): current_view}
    while getattr(current_view, "source_views", None):
        current_view = current_view.source_views[0]  # Assuming single source for now
        if id(current_view) in visited:
            raise ValueError(
                "Cycle detected in source_views chain at feature view "
                f"{getattr(current_view, 'name', '<unnamed>')}"
            )
        visited[id(current_view)] = current_view
    return current_view
def check_sink_source_exists(data_source: "DataSource") -> bool:
    """
    Probe whether the location behind a sink_source is present.

    The location is read from ``data_source.path`` when that attribute
    exists, otherwise from ``str(data_source)``. It is then resolved with
    ``fsspec.core.url_to_fs`` and checked on the resulting filesystem.

    Args:
        data_source: The DataSource to check

    Returns:
        bool: True when the filesystem reports that the path exists. False
        when it does not, or when the check itself fails for any reason (the
        failure is logged as a warning rather than raised).
    """
    try:
        import fsspec

        location = (
            data_source.path if hasattr(data_source, "path") else str(data_source)
        )
        filesystem, resolved_path = fsspec.core.url_to_fs(location)
        return filesystem.exists(resolved_path)
    except Exception as e:
        # Best-effort probe: any failure is treated as "does not exist".
        logger.warning(f"Failed to check if source exists: {e}")
        return False
def resolve_feature_view_source(
    feature_view: "FeatureView",
    config: Optional["RepoConfig"] = None,
    is_materialization: bool = False,
) -> FeatureViewSourceInfo:
    """
    Resolve the appropriate data source for a feature view.

    Resolution order:
        1. A view without ``source_views`` uses its own ``batch_source``.
        2. For a derived view during materialization, the first parent's
           ``sink_source`` is used when it is set and
           ``check_sink_source_exists`` confirms it.
        3. Otherwise, if the first parent is itself a derived view, the
           ``batch_source`` of the root of its ``source_views`` chain is used.
        4. Otherwise the first parent's own ``batch_source`` is used.

    Only the first entry of ``source_views`` is considered.

    Args:
        feature_view: The feature view to resolve
        config: Repository configuration (optional); not read by this function
        is_materialization: Whether this is during materialization (affects derived view handling)

    Returns:
        FeatureViewSourceInfo: Information about the resolved source

    Raises:
        ValueError: If the view is derived and its parent has neither
            ``source_views`` nor a ``batch_source``.
    """
    view_has_transformation = has_transformation(feature_view)
    transformation_func = (
        get_transformation_function(feature_view) if view_has_transformation else None
    )

    def _source_info(data_source, source_type, source_description):
        # Every outcome shares the same transformation details; only the
        # selected source and its description differ.
        return FeatureViewSourceInfo(
            data_source=data_source,
            source_type=source_type,
            has_transformation=view_has_transformation,
            transformation_func=transformation_func,
            source_description=source_description,
        )

    # A derived feature view is one with a non-empty source_views.
    source_views = getattr(feature_view, "source_views", None)
    if not source_views:
        # Regular feature view - use its batch_source directly
        return _source_info(
            feature_view.batch_source,
            "batch_source",
            f"Direct batch_source for {feature_view.name}",
        )

    parent_view = source_views[0]  # Assuming single source for now

    # For derived views: distinguish between materialization and historical retrieval
    parent_sink = getattr(parent_view, "sink_source", None)
    if parent_sink and is_materialization:
        # During materialization, try to use sink_source if it exists
        if check_sink_source_exists(parent_sink):
            logger.debug(
                f"Materialization: Using parent {parent_view.name} sink_source"
            )
            return _source_info(
                parent_sink,
                "sink_source",
                f"Parent {parent_view.name} sink_source for derived view {feature_view.name}",
            )
        logger.info(
            f"Parent {parent_view.name} sink_source doesn't exist during materialization"
        )

    # Check if parent is also a derived view first - if so, resolve to original source
    if getattr(parent_view, "source_views", None):
        original_source_view = find_original_source_view(parent_view)
        return _source_info(
            original_source_view.batch_source,
            "original_source",
            f"Original source {original_source_view.name} batch_source for derived view {feature_view.name} (via {parent_view.name})",
        )

    parent_batch_source = getattr(parent_view, "batch_source", None)
    if parent_batch_source:
        # Parent has a direct batch_source, use it
        return _source_info(
            parent_batch_source,
            "batch_source",
            f"Parent {parent_view.name} batch_source for derived view {feature_view.name}",
        )

    # No valid source found
    raise ValueError(
        f"Unable to resolve data source for derived feature view {feature_view.name} via parent {parent_view.name}"
    )
def resolve_feature_view_source_with_fallback(
    feature_view: "FeatureView",
    config: Optional["RepoConfig"] = None,
    is_materialization: bool = False,
) -> FeatureViewSourceInfo:
    """
    Resolve feature view source with fallback error handling.

    First delegates to ``resolve_feature_view_source``. If that raises, the
    failure is logged as a warning and a fallback is attempted: the view's own
    ``batch_source`` when set, otherwise the ``batch_source`` of the root of
    its ``source_views`` chain.

    Args:
        feature_view: The feature view to resolve
        config: Repository configuration (optional), passed through to the primary resolver
        is_materialization: Whether this is during materialization, passed through to the primary resolver

    Returns:
        FeatureViewSourceInfo: The primary result, or a fallback result whose
        ``source_type`` is ``"fallback_batch_source"`` or
        ``"fallback_original_source"``.

    Raises:
        ValueError: If the primary resolution fails and the view has neither a
            ``batch_source`` nor ``source_views``; chained from the primary
            failure.
    """
    try:
        return resolve_feature_view_source(feature_view, config, is_materialization)
    except Exception as e:
        logger.warning(f"Primary source resolution failed for {feature_view.name}: {e}")

        # Fallback: try to find any available source
        if getattr(feature_view, "batch_source", None):
            data_source = feature_view.batch_source
            source_type = "fallback_batch_source"
            source_description = f"Fallback batch_source for {feature_view.name}"
        elif getattr(feature_view, "source_views", None):
            # Try the original source view as last resort
            original_view = find_original_source_view(feature_view)
            data_source = original_view.batch_source
            source_type = "fallback_original_source"
            source_description = f"Fallback original source {original_view.name} for {feature_view.name}"
        else:
            # Chain explicitly so the primary failure is reported as the cause.
            raise ValueError(
                f"Unable to resolve any data source for feature view {feature_view.name}"
            ) from e

        return FeatureViewSourceInfo(
            data_source=data_source,
            source_type=source_type,
            has_transformation=has_transformation(feature_view),
            transformation_func=get_transformation_function(feature_view),
            source_description=source_description,
        )