forked from aws/sagemaker-python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpreprocessor.py
More file actions
117 lines (88 loc) · 4.01 KB
/
preprocessor.py
File metadata and controls
117 lines (88 loc) · 4.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import argparse
import os
import warnings
import subprocess

# Install the SageMaker Experiments SDK at runtime, before the `smexperiments`
# import further down. The return code of the call is not checked here, so if
# the install fails and the package is not otherwise available, the failure
# shows up as an ImportError on the `smexperiments` import instead.
subprocess.call(["pip", "install", "sagemaker-experiments"])

import pandas as pd
import numpy as np
import tarfile
from smexperiments.tracker import Tracker
# NOTE(review): `sklearn.externals.joblib` is only shipped by older
# scikit-learn releases — presumably this script targets such a runtime;
# confirm before upgrading scikit-learn.
from sklearn.externals import joblib
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from sklearn.compose import make_column_transformer
from sklearn.exceptions import DataConversionWarning

# Silence scikit-learn's DataConversionWarning for the rest of the run.
warnings.filterwarnings(action="ignore", category=DataConversionWarning)
# Column names assigned, in order, to the raw wind-turbine dataframe after it
# is loaded (they replace whatever header the CSV file carries).
columns = [
    # Columns that the script one-hot encodes (together with wind_direction).
    "turbine_id",
    "turbine_type",
    # Columns passed through the featurizer unchanged.
    "wind_speed",
    "rpm_blade",
    "oil_temperature",
    "oil_level",
    "temperature",
    "humidity",
    "vibrations_frequency",
    "pressure",
    # One-hot encoded as well.
    "wind_direction",
    # Target label; dropped from the features and label-encoded separately.
    "breakdown",
]
if __name__ == "__main__":
    # Script entry point: reads the raw wind-turbine CSV, fills selected null
    # values, one-hot encodes the categorical columns, splits the data into
    # train/validation sets, and writes the resulting CSV files plus the
    # fitted featurizer (as model.tar.gz) under /opt/ml/processing.

    # Read the arguments passed to the script. parse_known_args() is used so
    # that any additional, unrecognised arguments are ignored rather than
    # causing the script to exit.
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-test-split-ratio", type=float, default=0.3)
    args, _ = parser.parse_known_args()

    # Tracking specific parameter value during job.
    tracker = Tracker.load()
    try:
        tracker.log_parameter("train-test-split-ratio", args.train_test_split_ratio)
        print("Received arguments {}".format(args))

        # Read input data into a Pandas dataframe.
        input_data_path = os.path.join(
            "/opt/ml/processing/input", "windturbine_raw_data_header.csv"
        )
        print("Reading input data from {}".format(input_data_path))
        df = pd.read_csv(input_data_path)
        # Replace the header read from the file with the canonical names; the
        # number of names must match the number of columns in the file.
        df.columns = columns

        # Replacing certain null values. The defaults used are logged to the
        # tracker so they are recorded alongside the job.
        df["turbine_type"] = df["turbine_type"].fillna("HAWT")
        tracker.log_parameter("default-turbine-type", "HAWT")
        df["oil_temperature"] = df["oil_temperature"].fillna(37.0)
        tracker.log_parameter("default-oil-temperature", 37.0)

        # Defining one-hot encoders: the three categorical columns are
        # one-hot encoded, every other column is passed through unchanged.
        # NOTE(review): the (columns, transformer) tuple order and the
        # `sparse` keyword presumably match the older scikit-learn release
        # this script runs on — confirm against the runtime's version before
        # changing either.
        transformer = make_column_transformer(
            (["turbine_id", "turbine_type", "wind_direction"], OneHotEncoder(sparse=False)),
            remainder="passthrough",
        )

        X = df.drop("breakdown", axis=1)
        y = df["breakdown"]

        # The featurizer is fitted on the full feature set (before the split)
        # and is the object persisted as the model below.
        featurizer_model = transformer.fit(X)
        features = featurizer_model.transform(X)
        labels = LabelEncoder().fit_transform(y)

        # Splitting. random_state is fixed so repeated runs on the same input
        # produce the same partition.
        split_ratio = args.train_test_split_ratio
        print("Splitting data into train and validation sets with ratio {}".format(split_ratio))
        X_train, X_val, y_train, y_val = train_test_split(
            features, labels, test_size=split_ratio, random_state=0
        )

        print("Train features shape after preprocessing: {}".format(X_train.shape))
        print("Validation features shape after preprocessing: {}".format(X_val.shape))

        # Saving outputs. All CSV files are written without header and index.
        train_features_output_path = os.path.join(
            "/opt/ml/processing/train", "train_features.csv"
        )
        train_labels_output_path = os.path.join("/opt/ml/processing/train", "train_labels.csv")
        val_features_output_path = os.path.join("/opt/ml/processing/val", "val_features.csv")
        val_labels_output_path = os.path.join("/opt/ml/processing/val", "val_labels.csv")

        print("Saving training features to {}".format(train_features_output_path))
        pd.DataFrame(X_train).to_csv(train_features_output_path, header=False, index=False)

        print("Saving validation features to {}".format(val_features_output_path))
        pd.DataFrame(X_val).to_csv(val_features_output_path, header=False, index=False)

        print("Saving training labels to {}".format(train_labels_output_path))
        pd.DataFrame(y_train).to_csv(train_labels_output_path, header=False, index=False)

        print("Saving validation labels to {}".format(val_labels_output_path))
        pd.DataFrame(y_val).to_csv(val_labels_output_path, header=False, index=False)

        # Saving model: the fitted featurizer is dumped with joblib and then
        # packed into a gzip-compressed tarball containing the single member
        # "model.joblib". The uncompressed model.joblib file is left in place
        # next to the archive.
        model_path = os.path.join("/opt/ml/processing/model", "model.joblib")
        model_output_path = os.path.join("/opt/ml/processing/model", "model.tar.gz")

        print("Saving featurizer model to {}".format(model_output_path))
        joblib.dump(featurizer_model, model_path)
        # The context manager closes the archive even if adding the member
        # fails, so the file handle is never leaked.
        with tarfile.open(model_output_path, "w:gz") as tar:
            tar.add(model_path, arcname="model.joblib")
    finally:
        # Always close the tracker, including when a step above raises, so
        # the parameters logged so far are not left dangling.
        tracker.close()