Skip to content

ModelLoader

Loader class for interacting with the Minio Object Storage API.

Source code in make_us_rich/serving/model_loader.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
class ModelLoader:
    """
    Loader class for interacting with the Minio Object Storage API.
    """

    def __init__(self):
        self.client = MinioClient()
        self.session_models = {}
        self.storage_path = Path.cwd().joinpath("api", "models")
        self.update_date()
        self.update_model_files()


    def get_predictions(self, model_name: str, sample: pd.DataFrame) -> float:
        """
        Gets the predictions from the model.

        Parameters
        ----------
        model_name: str
            Name of the model.
        sample: pd.DataFrame
            Sample to predict.

        Returns
        -------
        float
            Predicted value.
        """
        if self._check_model_exists_in_session(model_name):
            model = self.session_models[model_name]["model"]
            return model.predict(sample)
        else:
            raise ValueError("Model not found in session.")


    def update_date(self):
        """
        Updates the date of the loader.
        """
        self.date = datetime.now().strftime("%Y-%m-%d")


    def update_model_files(self):
        """
        Updates the model files in the serving models directory.
        """
        for model in self._get_list_of_available_models():
            currency, compare = model.split("_")
            self._download_files(currency, compare)
            self._add_model_to_session_models(currency, compare)


    def _get_models_files_path(self, currency: str, compare: str):
        """
        Returns the path to the files in models directory.

        Parameters
        ----------
        currency: str
            Currency used in the model.
        compare: str
            Compare used in the model.

        Returns
        -------
        str
            Path to the model files.
        """
        model = self.storage_path.joinpath(f"{currency}_{compare}", "model.onnx")
        scaler = self.storage_path.joinpath(f"{currency}_{compare}", "scaler.pkl")
        return model, scaler


    def _makedir(self, currency: str, compare: str) -> None:
        """
        Creates a directory for the model files if it doesn't exist.

        Parameters
        ----------
        currency: str
            Currency used in the model.
        compare: str
            Compare used in the model.
        """
        self.storage_path.joinpath(f"{currency}_{compare}").mkdir(exist_ok=True)


    def _download_files(self, currency: str, compare: str) -> None:
        """
        Downloads model and features engineering files from Minio.

        Parameters
        ----------
        currency: str
            Currency used in the model.
        compare: str
            Compare used in the model.
        """
        self._makedir(currency, compare)
        self.client.download(
            self.client.bucket,
            f"{self.date}/{currency}_{compare}/model.onnx",
            f"{self.storage_path}/{currency}_{compare}/model.onnx"
        )
        self.client.download(
            self.client.bucket,
            f"{self.date}/{currency}_{compare}/scaler.pkl",
            f"{self.storage_path}/{currency}_{compare}/scaler.pkl"
        )


    def _get_list_of_available_models(self) -> List[str]:
        """
        Looks for available models in the Minio bucket based on the date.

        Returns
        -------
        List[str]
            List of available models.
        """
        available_models = self.client.list_objects(self.client.bucket, prefix=self.date, recursive=True)
        return list(set([model.object_name.split("/")[1] for model in available_models]))


    def _add_model_to_session_models(self, currency: str, compare: str) -> str:
        """
        Adds a new model to the model session.

        Parameters
        ----------
        currency: str
            Currency used in the model.
        compare: str
            Compare used in the model.

        Returns
        -------
        str
        """
        model_path, scaler_path = self._get_models_files_path(currency, compare)
        model = OnnxModel(model_path=model_path, scaler_path=scaler_path)
        self.session_models[f"{currency}_{compare}"] = {"model": model}
        return f"Model {model} added to session."


    def _check_model_exists_in_session(self, model_name: str) -> bool:
        """
        Checks if the model exists in the current session.

        Parameters
        ----------
        model_name: str
            Name of the model.

        Returns
        -------
        bool
        """
        if model_name in self.session_models.keys():
            return True
        return False

_add_model_to_session_models(currency, compare)

Adds a new model to the model session.

Parameters:

Name Type Description Default
currency str

Currency used in the model.

required
compare str

Compare used in the model.

required

Returns:

Type Description
str
Source code in make_us_rich/serving/model_loader.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def _add_model_to_session_models(self, currency: str, compare: str) -> str:
    """
    Adds a new model to the model session.

    Parameters
    ----------
    currency: str
        Currency used in the model.
    compare: str
        Compare used in the model.

    Returns
    -------
    str
    """
    model_path, scaler_path = self._get_models_files_path(currency, compare)
    model = OnnxModel(model_path=model_path, scaler_path=scaler_path)
    self.session_models[f"{currency}_{compare}"] = {"model": model}
    return f"Model {model} added to session."

_check_model_exists_in_session(model_name)

Checks if the model exists in the current session.

Parameters:

Name Type Description Default
model_name str

Name of the model.

required

Returns:

Type Description
bool
Source code in make_us_rich/serving/model_loader.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
def _check_model_exists_in_session(self, model_name: str) -> bool:
    """
    Checks if the model exists in the current session.

    Parameters
    ----------
    model_name: str
        Name of the model.

    Returns
    -------
    bool
    """
    if model_name in self.session_models.keys():
        return True
    return False

_download_files(currency, compare)

Downloads model and features engineering files from Minio.

Parameters:

Name Type Description Default
currency str

Currency used in the model.

required
compare str

Compare used in the model.

required
Source code in make_us_rich/serving/model_loader.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def _download_files(self, currency: str, compare: str) -> None:
    """
    Downloads model and features engineering files from Minio.

    Parameters
    ----------
    currency: str
        Currency used in the model.
    compare: str
        Compare used in the model.
    """
    self._makedir(currency, compare)
    self.client.download(
        self.client.bucket,
        f"{self.date}/{currency}_{compare}/model.onnx",
        f"{self.storage_path}/{currency}_{compare}/model.onnx"
    )
    self.client.download(
        self.client.bucket,
        f"{self.date}/{currency}_{compare}/scaler.pkl",
        f"{self.storage_path}/{currency}_{compare}/scaler.pkl"
    )

_get_list_of_available_models()

Looks for available models in the Minio bucket based on the date.

Returns:

Type Description
List[str]

List of available models.

Source code in make_us_rich/serving/model_loader.py
123
124
125
126
127
128
129
130
131
132
133
def _get_list_of_available_models(self) -> List[str]:
    """
    Looks for available models in the Minio bucket based on the date.

    Returns
    -------
    List[str]
        List of available models.
    """
    available_models = self.client.list_objects(self.client.bucket, prefix=self.date, recursive=True)
    return list(set([model.object_name.split("/")[1] for model in available_models]))

_get_models_files_path(currency, compare)

Returns the path to the files in models directory.

Parameters:

Name Type Description Default
currency str

Currency used in the model.

required
compare str

Compare used in the model.

required

Returns:

Type Description
str

Path to the model files.

Source code in make_us_rich/serving/model_loader.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def _get_models_files_path(self, currency: str, compare: str):
    """
    Returns the path to the files in models directory.

    Parameters
    ----------
    currency: str
        Currency used in the model.
    compare: str
        Compare used in the model.

    Returns
    -------
    str
        Path to the model files.
    """
    model = self.storage_path.joinpath(f"{currency}_{compare}", "model.onnx")
    scaler = self.storage_path.joinpath(f"{currency}_{compare}", "scaler.pkl")
    return model, scaler

_makedir(currency, compare)

Creates a directory for the model files if it doesn't exist.

Parameters:

Name Type Description Default
currency str

Currency used in the model.

required
compare str

Compare used in the model.

required
Source code in make_us_rich/serving/model_loader.py
85
86
87
88
89
90
91
92
93
94
95
96
def _makedir(self, currency: str, compare: str) -> None:
    """
    Creates a directory for the model files if it doesn't exist.

    Parameters
    ----------
    currency: str
        Currency used in the model.
    compare: str
        Compare used in the model.
    """
    self.storage_path.joinpath(f"{currency}_{compare}").mkdir(exist_ok=True)

get_predictions(model_name, sample)

Gets the predictions from the model.

Parameters:

Name Type Description Default
model_name str

Name of the model.

required
sample pd.DataFrame

Sample to predict.

required

Returns:

Type Description
float

Predicted value.

Source code in make_us_rich/serving/model_loader.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def get_predictions(self, model_name: str, sample: pd.DataFrame) -> float:
    """
    Gets the predictions from the model.

    Parameters
    ----------
    model_name: str
        Name of the model.
    sample: pd.DataFrame
        Sample to predict.

    Returns
    -------
    float
        Predicted value.
    """
    if self._check_model_exists_in_session(model_name):
        model = self.session_models[model_name]["model"]
        return model.predict(sample)
    else:
        raise ValueError("Model not found in session.")

update_date()

Updates the date of the loader.

Source code in make_us_rich/serving/model_loader.py
47
48
49
50
51
def update_date(self):
    """
    Updates the date of the loader.
    """
    self.date = datetime.now().strftime("%Y-%m-%d")

update_model_files()

Updates the model files in the serving models directory.

Source code in make_us_rich/serving/model_loader.py
54
55
56
57
58
59
60
61
def update_model_files(self):
    """
    Updates the model files in the serving models directory.
    """
    for model in self._get_list_of_available_models():
        currency, compare = model.split("_")
        self._download_files(currency, compare)
        self._add_model_to_session_models(currency, compare)

OnnxModel

Source code in make_us_rich/serving/model.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
class OnnxModel:

    def __init__(self, model_path: PosixPath, scaler_path: PosixPath):
        self.model_path = model_path
        self.scaler_path = scaler_path
        self.model_name = self.model_path.parent.parts[-1]
        self.model = onnxruntime.InferenceSession(str(model_path))
        self.scaler = self._load_scaler()
        self.descaler = self._create_descaler()


    def __repr__(self) -> str:
        return f"<OnnxModel: {self.model_name}>"


    def predict(self, sample: pd.DataFrame) -> float:
        """
        Predicts the close price based on the input sample.

        Parameters
        ----------
        sample: pd.DataFrame
            Input sample.

        Returns
        -------
        float
            Predicted close price.
        """
        X = self._preprocessing_sample(sample)
        inputs = {self.model.get_inputs()[0].name: to_numpy(X)}
        results = self.model.run(None, inputs)[0][0]
        return self._descaling_sample(results)


    def _create_descaler(self) -> MinMaxScaler:
        """
        Creates a descaler.

        Returns
        -------
        MinMaxScaler
        """
        descaler = MinMaxScaler()
        descaler.min_, descaler.scale_ = self.scaler.min_[-1], self.scaler.scale_[-1]
        return descaler


    def _descaling_sample(self, sample) -> None:
        """
        Descalings the sample.

        Parameters
        ----------
        sample: numpy.ndarray
            Sample to be descaled.

        Returns
        -------
        float
            Descaled sample.
        """
        values_2d = np.array(sample)[:, np.newaxis]
        return self.descaler.inverse_transform(values_2d).flatten()


    def _load_scaler(self) -> MinMaxScaler:
        """
        Loads the scaler from the model files.

        Returns
        -------
        MinMaxScaler
        """
        with open(self.scaler_path, "rb") as file:
            return load(file)


    def _preprocessing_sample(self, sample: pd.DataFrame) -> torch.tensor:
        """
        Preprocesses the input sample.

        Parameters
        ----------
        sample: pd.DataFrame
            Input sample.

        Returns
        -------
        torch.tensor
            Preprocessed sample.
        """
        data = extract_features_from_dataset(sample)
        scaled_data = pd.DataFrame(
            self.scaler.transform(data), index=data.index, columns=data.columns
        )
        return torch.Tensor(scaled_data.values).unsqueeze(0)

_create_descaler()

Creates a descaler.

Returns:

Type Description
MinMaxScaler
Source code in make_us_rich/serving/model.py
49
50
51
52
53
54
55
56
57
58
59
def _create_descaler(self) -> MinMaxScaler:
    """
    Creates a descaler.

    Returns
    -------
    MinMaxScaler
    """
    descaler = MinMaxScaler()
    descaler.min_, descaler.scale_ = self.scaler.min_[-1], self.scaler.scale_[-1]
    return descaler

_descaling_sample(sample)

Descalings the sample.

Parameters:

Name Type Description Default
sample

Sample to be descaled.

required

Returns:

Type Description
float

Descaled sample.

Source code in make_us_rich/serving/model.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def _descaling_sample(self, sample) -> None:
    """
    Descalings the sample.

    Parameters
    ----------
    sample: numpy.ndarray
        Sample to be descaled.

    Returns
    -------
    float
        Descaled sample.
    """
    values_2d = np.array(sample)[:, np.newaxis]
    return self.descaler.inverse_transform(values_2d).flatten()

_load_scaler()

Loads the scaler from the model files.

Returns:

Type Description
MinMaxScaler
Source code in make_us_rich/serving/model.py
80
81
82
83
84
85
86
87
88
89
def _load_scaler(self) -> MinMaxScaler:
    """
    Loads the scaler from the model files.

    Returns
    -------
    MinMaxScaler
    """
    with open(self.scaler_path, "rb") as file:
        return load(file)

_preprocessing_sample(sample)

Preprocesses the input sample.

Parameters:

Name Type Description Default
sample pd.DataFrame

Input sample.

required

Returns:

Type Description
torch.tensor

Preprocessed sample.

Source code in make_us_rich/serving/model.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def _preprocessing_sample(self, sample: pd.DataFrame) -> torch.tensor:
    """
    Preprocesses the input sample.

    Parameters
    ----------
    sample: pd.DataFrame
        Input sample.

    Returns
    -------
    torch.tensor
        Preprocessed sample.
    """
    data = extract_features_from_dataset(sample)
    scaled_data = pd.DataFrame(
        self.scaler.transform(data), index=data.index, columns=data.columns
    )
    return torch.Tensor(scaled_data.values).unsqueeze(0)

predict(sample)

Predicts the close price based on the input sample.

Parameters:

Name Type Description Default
sample pd.DataFrame

Input sample.

required

Returns:

Type Description
float

Predicted close price.

Source code in make_us_rich/serving/model.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def predict(self, sample: pd.DataFrame) -> float:
    """
    Predicts the close price based on the input sample.

    Parameters
    ----------
    sample: pd.DataFrame
        Input sample.

    Returns
    -------
    float
        Predicted close price.
    """
    X = self._preprocessing_sample(sample)
    inputs = {self.model.get_inputs()[0].name: to_numpy(X)}
    results = self.model.run(None, inputs)[0][0]
    return self._descaling_sample(results)

Last update: 2022-05-04