-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathbase.py
More file actions
147 lines (127 loc) · 4.95 KB
/
base.py
File metadata and controls
147 lines (127 loc) · 4.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import functools
from abc import ABC
from typing import Any, Callable, Dict, Optional, Union
import dill
from feast.protos.feast.core.Transformation_pb2 import (
SubstraitTransformationV2 as SubstraitTransformationProto,
)
from feast.protos.feast.core.Transformation_pb2 import (
UserDefinedFunctionV2 as UserDefinedFunctionProto,
)
from feast.transformation.factory import (
TRANSFORMATION_CLASS_FOR_TYPE,
get_transformation_class_from_type,
)
from feast.transformation.mode import TransformationMode
class Transformation(ABC):
"""
Base Transformation class. Can be used to define transformations that can be applied to FeatureViews.
Also encapsulates the logic to serialize and deserialize the transformation to and from proto. This is
important for the future transformation lifecycle management.
E.g.:
pandas_transformation = Transformation(
mode=TransformationMode.PANDAS,
udf=lambda df: df.assign(new_column=df['column1'] + df['column2']),
)
"""
udf: Callable[[Any], Any]
udf_string: str
def __new__(
cls,
mode: Union[TransformationMode, str],
udf: Callable[[Any], Any],
udf_string: str,
name: Optional[str] = None,
tags: Optional[Dict[str, str]] = None,
description: str = "",
owner: str = "",
*args,
**kwargs,
) -> "Transformation":
"""
Creates a Transformation object.
Args:
mode: (required) The mode of the transformation. Choose one from TransformationMode.
udf: (required) The user-defined transformation function.
udf_string: (required) The string representation of the udf. The dill get source doesn't
work for all cases when extracting the source code from the udf. So it's better to pass
the source code as a string.
name: (optional) The name of the transformation.
tags: (optional) Metadata tags for the transformation.
description: (optional) A description of the transformation.
owner: (optional) The owner of the transformation.
"""
if cls is Transformation:
if isinstance(mode, TransformationMode):
mode = mode.value
if mode.lower() in TRANSFORMATION_CLASS_FOR_TYPE:
subclass = get_transformation_class_from_type(mode.lower())
return super().__new__(subclass)
raise ValueError(
f"Invalid mode: {mode}. Choose one from TransformationMode."
)
return super().__new__(cls)
def __init__(
self,
mode: Union[TransformationMode, str],
udf: Callable[[Any], Any],
udf_string: str,
name: Optional[str] = None,
tags: Optional[Dict[str, str]] = None,
description: str = "",
owner: str = "",
):
self.mode = mode
self.udf = udf
self.udf_string = udf_string
self.name = name or udf.__name__
self.tags = tags or {}
self.description = description
self.owner = owner
def to_proto(self) -> Union[UserDefinedFunctionProto, SubstraitTransformationProto]:
mode_str = (
self.mode.value if isinstance(self.mode, TransformationMode) else self.mode
)
return UserDefinedFunctionProto(
name=self.udf.__name__,
body=dill.dumps(self.udf, recurse=True),
body_text=self.udf_string,
mode=mode_str,
)
def __deepcopy__(self, memo: Optional[Dict[int, Any]] = None) -> "Transformation":
return Transformation(mode=self.mode, udf=self.udf, udf_string=self.udf_string)
def transform(self, *inputs: Any) -> Any:
raise NotImplementedError
def transform_arrow(self, *args, **kwargs) -> Any:
pass
def transform_singleton(self, *args, **kwargs) -> Any:
pass
def infer_features(self, *args, **kwargs) -> Any:
raise NotImplementedError
def transformation(
mode: Union[TransformationMode, str],
name: Optional[str] = None,
tags: Optional[Dict[str, str]] = None,
description: Optional[str] = "",
owner: Optional[str] = "",
):
def mainify(obj):
# Needed to allow dill to properly serialize the udf. Otherwise, clients will need to have a file with the same
# name as the original file defining the sfv.
if obj.__module__ != "__main__":
obj.__module__ = "__main__"
def decorator(user_function):
udf_string = dill.source.getsource(user_function)
mainify(user_function)
transformation_obj = Transformation(
mode=mode,
name=name or user_function.__name__,
tags=tags,
description=description,
owner=owner,
udf=user_function,
udf_string=udf_string,
)
functools.update_wrapper(wrapper=transformation_obj, wrapped=user_function)
return transformation_obj
return decorator