-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathonline_response.py
More file actions
142 lines (116 loc) · 5.11 KB
/
online_response.py
File metadata and controls
142 lines (116 loc) · 5.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# Copyright 2020 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import TYPE_CHECKING, Any, Dict, List, TypeAlias, Union
import pandas as pd
import pyarrow as pa
from feast.feature_view import DUMMY_ENTITY_ID
from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesResponse
from feast.torch_wrapper import get_torch
from feast.type_map import feast_value_type_to_python_type
if TYPE_CHECKING:
import torch
TorchTensor: TypeAlias = torch.Tensor
else:
TorchTensor: TypeAlias = Any
TIMESTAMP_POSTFIX: str = "__ts"
class OnlineResponse:
"""
Defines an online response in feast.
"""
def __init__(self, online_response_proto: GetOnlineFeaturesResponse):
"""
Construct a native online response from its protobuf version.
Args:
online_response_proto: GetOnlineResponse proto object to construct from.
"""
self.proto = online_response_proto
# Delete DUMMY_ENTITY_ID from proto if it exists
for idx, val in enumerate(self.proto.metadata.feature_names.val):
if val == DUMMY_ENTITY_ID:
del self.proto.metadata.feature_names.val[idx]
del self.proto.results[idx]
break
def to_dict(self, include_event_timestamps: bool = False) -> Dict[str, Any]:
"""
Converts GetOnlineFeaturesResponse features into a dictionary form.
Args:
include_event_timestamps: bool Optionally include feature timestamps in the dictionary
"""
response: Dict[str, List[Any]] = {}
for feature_ref, feature_vector in zip(
self.proto.metadata.feature_names.val, self.proto.results
):
response[feature_ref] = [
feast_value_type_to_python_type(v) for v in feature_vector.values
]
if include_event_timestamps:
timestamp_ref = feature_ref + TIMESTAMP_POSTFIX
response[timestamp_ref] = [
ts.seconds for ts in feature_vector.event_timestamps
]
return response
def to_df(self, include_event_timestamps: bool = False) -> pd.DataFrame:
"""
Converts GetOnlineFeaturesResponse features into Panda dataframe form.
Args:
include_event_timestamps: bool Optionally include feature timestamps in the dataframe
"""
return pd.DataFrame(self.to_dict(include_event_timestamps))
def to_arrow(self, include_event_timestamps: bool = False) -> pa.Table:
"""
Converts GetOnlineFeaturesResponse features into pyarrow Table.
Args:
include_event_timestamps: bool Optionally include feature timestamps in the table
"""
return pa.Table.from_pydict(self.to_dict(include_event_timestamps))
def to_tensor(
self,
kind: str = "torch",
default_value: Any = float("nan"),
) -> Dict[str, Union[TorchTensor, List[Any]]]:
"""
Converts GetOnlineFeaturesResponse features into a dictionary of tensors or lists.
- Numeric features (int, float, bool) -> torch.Tensor
- Non-numeric features (e.g., strings) -> list[Any]
Args:
kind: Backend tensor type. Currently only "torch" is supported.
default_value: Value to substitute for missing (None) entries.
Returns:
Dict[str, Union[torch.Tensor, List[Any]]]: Mapping of feature names to tensors or lists.
"""
if kind != "torch":
raise ValueError(
f"Unsupported tensor kind: {kind}. Only 'torch' is supported currently."
)
torch = get_torch()
feature_dict = self.to_dict(include_event_timestamps=False)
feature_keys = set(self.proto.metadata.feature_names.val)
tensor_dict: Dict[str, Union[TorchTensor, List[Any]]] = {}
for key in feature_keys:
raw_values = feature_dict[key]
values = [v if v is not None else default_value for v in raw_values]
first_valid = next((v for v in values if v is not None), None)
if isinstance(first_valid, (int, float, bool)):
try:
device = "cuda" if torch.cuda.is_available() else "cpu"
tensor_dict[key] = torch.tensor(values, device=device)
except Exception as e:
raise ValueError(
f"Failed to convert values for '{key}' to tensor: {e}"
)
else:
tensor_dict[key] = (
values # Return as-is for strings or unsupported types
)
return tensor_dict