ML Serving Blueprints¶
What are ML Serving Blueprints?¶
Serving Blueprints are templates for serving a trained model. Each blueprint is designed to serve a specific model type and framework. Each blueprint corresponds to a microservice that serves predictions from a trained model.
PyTorch Regression Serving Blueprint (MLflow)¶
Pytorch Regression MLflow Serving 1import logging
2import mlflow
3import mlflow.pytorch
4from asyncio import Future
5
6from fastiot.core import FastIoTService, reply
7from fastiot.msg.thing import Thing
8
9import torch
10import numpy as np
11import pandas as pd
12
13from blueprint_dev_v2.ml_lifecycle_utils.ml_lifecycle_broker_facade import \
14 request_get_processed_data_points_from_raw_data, ok_response_thing, error_response_thing
15from blueprint_dev_v2.ml_lifecycle_utils.ml_lifecycle_subjects_name import ML_SERVING_SUBJECT
16from src.blueprint_dev_v2.logger.logger import log
17
18
class MlServingMlflowService(FastIoTService):
    """
    Serve predictions from a PyTorch regression model stored in MLflow.

    On start-up the service points MLflow at ``MLFLOW_TRACKING_URI``, loads the
    model referenced by ``MODEL_URI`` and runs a smoke test with an example
    payload. Afterwards it answers requests on ``ML_SERVING_SUBJECT``.

    Attributes
    ----------
    MODEL_URI : str
        MLflow model URI that is loaded at start-up. Replace with your model.
    MLFLOW_TRACKING_URI : str
        Address of the MLflow tracking server. Replace with your server.
    _regression_model
        The object returned by ``mlflow.pytorch.load_model``; ``None`` until
        the model has been loaded.
        NOTE(review): the architecture is whatever was logged to MLflow --
        presumably a 15-input / 1-output regression network, confirm.
    _example_raw_payload : dict
        An example raw payload used for the start-up smoke test.

    Methods
    -------
    _start()
        Start the service.
    _setup_model()
        Load the model and run a smoke test.
    _load_model_weights_mlfow()
        Load the model from MLflow.
    _process_raw_data_points(data: list[dict]) -> list[dict]
        Process raw data points.
    _get_prediction(raw_datapoints: list[dict]) -> list[list[float]]
        Get predictions for raw data points.
    prediction(topic: str, msg: Thing) -> Thing
        Serve predictions for raw data points.
    """

    MODEL_URI = "models:/MyModel/1"
    MLFLOW_TRACKING_URI = "http://127.0.0.1:8080"

    _regression_model = None

    _example_raw_payload = {
        'laborant': "TK",
        'material_id': "11111111",
        'datum': "15.03.2024, 16:14:48",
        'rohwert_1_labormessung': 22.64237723051251,
        'rohwert_2_labormessung': 0.55194,
        'rohwert_3_labormessung': -0.472279,
        'aufbereiteter_wert': 0.287696
    }

    async def _start(self):
        """
        Runs when the service starts.

        Configures the MLflow tracking URI and sets up the model.
        """
        # Log the actual class name so the message stays correct when the
        # blueprint is copied or subclassed.
        log.info(f"{type(self).__name__} started")
        mlflow.set_tracking_uri(self.MLFLOW_TRACKING_URI)
        await self._setup_model()

    async def _setup_model(self):
        """
        Load the regression model and verify it with a smoke test.

        The smoke test sends three copies of the example payload through the
        complete prediction path. Any exception raised there propagates to the
        caller.

        Returns
        -------
        None
        """
        log.info("Setting up Demonstrator Regression model")

        # Load model weights
        await self._load_model_weights_mlfow()

        # Smoke test with the example payload. The "integration test" log
        # lines belong here rather than in _get_prediction, where they would
        # be emitted for every served request.
        log.info("Integration test: getting a processed data point from Data Processing Service "
                 "and performing a prediction.")
        await self._get_prediction(raw_datapoints=[self._example_raw_payload] * 3)
        log.info("Integration test passed.")

    async def _load_model_weights_mlfow(self):
        """
        Load the model from MLflow and switch it to evaluation mode.

        The method name keeps its original spelling ("mlfow") so that existing
        callers and subclasses continue to work.

        Returns
        -------
        None
        """
        log.info("Loading model weights from mlfow")
        model = mlflow.pytorch.load_model(model_uri=self.MODEL_URI)
        log.info(f"Model loaded from mlflow: \n{model}")
        # Serving must not run layers such as dropout or batch norm in
        # training mode.
        model.eval()
        self._regression_model = model

    async def _process_raw_data_points(self, data: list[dict]) -> list[dict]:
        """
        Process raw data points.

        Delegates to ``request_get_processed_data_points_from_raw_data``.

        Parameters
        ----------
        data
            Raw data points to be processed.

        Returns
        -------
        list[dict]
            Processed data points.
        """
        log.info("Processing raw data points")
        return await request_get_processed_data_points_from_raw_data(
            fiot_service=self,
            data=data,
        )

    async def _get_prediction(self, raw_datapoints: list[dict]) -> list[list[float]]:
        """
        Get predictions for raw data points.

        Parameters
        ----------
        raw_datapoints
            Raw data points to get predictions for.

        Returns
        -------
        list[list[float]]
            The model output converted with ``Tensor.tolist()``.

        Raises
        ------
        ValueError
            If the model has not been loaded yet.
        KeyError
            If the processed data has no ``aufbereiteter_wert`` column.
        """
        if self._regression_model is None:
            raise ValueError("Regression model not initialized. Please call _setup_model() first.")

        processed_data = await self._process_raw_data_points(data=raw_datapoints)
        features = pd.DataFrame(processed_data)
        # "aufbereiteter_wert" is removed from the model input; the remaining
        # columns are the features (presumably it is the target -- confirm).
        features.pop("aufbereiteter_wert")
        x_data = features.to_numpy()

        # No gradients are needed for inference; skipping them saves memory.
        with torch.no_grad():
            prediction = self._regression_model(torch.tensor(x_data, dtype=torch.float32))
        return prediction.tolist()

    @reply(ML_SERVING_SUBJECT)
    async def prediction(self, _: str, msg: Thing) -> Thing:
        """
        Serve predictions for raw data points.

        Every failure, including an invalid payload, is reported to the
        requester as an error response instead of being raised, so the
        requester always receives a reply.

        Parameters
        ----------
        _
            The topic of the request (unused).
        msg
            The message containing the raw data points in its ``value`` field.

        Returns
        -------
        Thing
            The response message containing the predictions, or an error
            response describing the failure.
        """
        if not isinstance(msg.value, list):
            log.error(f"Payload (the 'value' field of the msg Thing) must be of type list, "
                      f"but received: {type(msg.value)}")
            # Reply with an error, consistent with the except branch below.
            return error_response_thing(
                exception=ValueError("Payload must be a list of raw data points"),
                fiot_service=self,
            )

        raw_data_points: list[dict] = msg.value

        try:
            res: list[list[float]] = await self._get_prediction(raw_datapoints=raw_data_points)
        except Exception as e:
            # Top-level boundary of the request handler: report instead of crash.
            log.error(f"Error while processing raw data points: {e}")
            return error_response_thing(exception=e, fiot_service=self)

        return ok_response_thing(payload=res, fiot_service=self)
183
184
if __name__ == "__main__":
    # Script entry point: enable debug logging, then start the service via
    # ``main()``, which is not defined in this class and therefore comes from
    # the FastIoTService base class.
    logging.basicConfig(level=logging.DEBUG)
    MlServingMlflowService.main()
PyTorch Regression Serving Blueprint (WandB)¶
Note
WandB support might be removed in the future in favor of MLflow.
1import logging
2from asyncio import Future
3
4from fastiot.core import FastIoTService, reply
5from fastiot.msg.thing import Thing
6
7import torch
8import numpy as np
9import pandas as pd
10
11from blueprint_dev_v2.ml_lifecycle_utils.ml_lifecycle_broker_facade import \
12 request_get_processed_data_points_from_raw_data, ok_response_thing, error_response_thing
13from blueprint_dev_v2.ml_lifecycle_utils.ml_lifecycle_subjects_name import ML_SERVING_SUBJECT
14from blueprint_dev_v2_services.ml_pytorch_regression.ml_pytorch_regression_service import DemonstratorNeuralNet
15from src.blueprint_dev_v2.logger.logger import log
16
17
class MlServingService(FastIoTService):
    """
    Serve predictions from a PyTorch regression model stored in WandB.

    The model is a ``DemonstratorNeuralNet`` created with 15 input features,
    a hidden dimension of 10 and a single output value. On start-up its
    weights are downloaded from the WandB artifact ``WANDB_ARTIFACT`` and a
    smoke test with an example payload is run. Afterwards the service answers
    requests on ``ML_SERVING_SUBJECT``.

    Attributes
    ----------
    WANDB_ARTIFACT : str
        Artifact path in the form ``entity/project/name:alias``. Replace with
        your own entity, project and artifact name.
    _regression_model : DemonstratorNeuralNet
        The regression model to be used for predictions; ``None`` until
        ``_setup_model`` has run.
    _example_raw_payload : dict
        An example raw payload used for the start-up smoke test.

    Methods
    -------
    _start()
        Start the service.
    _setup_model()
        Initialize the regression model.
    _load_model_weights_wandb()
        Load the model weights from wandb.
    _process_raw_data_points(data: list[dict]) -> list[dict]
        Process raw data points.
    _get_prediction(raw_datapoints: list[dict]) -> list[list[float]]
        Get predictions for raw data points.
    prediction(topic: str, msg: Thing) -> Thing
        Serve predictions for raw data points.
    """

    WANDB_ARTIFACT = "querry/KIOptipack-dev/DemonstratorNeuralNet:latest"

    _regression_model = None

    _example_raw_payload = {
        'laborant': "TK",
        'material_id': "11111111",
        'datum': "15.03.2024, 16:14:48",
        'rohwert_1_labormessung': 22.64237723051251,
        'rohwert_2_labormessung': 0.55194,
        'rohwert_3_labormessung': -0.472279,
        'aufbereiteter_wert': 0.287696
    }

    async def _start(self):
        """
        Runs when the service starts.
        """
        # Log the actual class name so the message stays correct when the
        # blueprint is copied or subclassed.
        log.info(f"{type(self).__name__} started")
        await self._setup_model()

    async def _setup_model(self):
        """
        Initialize the regression model, load its weights and smoke-test it.

        The smoke test sends three copies of the example payload through the
        complete prediction path. Any exception raised there propagates to the
        caller.

        Returns
        -------
        None
        """
        log.info("Setting up Demonstrator Regression model")

        self._regression_model = DemonstratorNeuralNet(
            input_dim=15,
            hidden_dim=10,
            output_dim=1
        )

        # Load model weights
        await self._load_model_weights_wandb()

        # test model with example payload
        await self._get_prediction(raw_datapoints=[self._example_raw_payload] * 3)

    async def _load_model_weights_wandb(self):
        """
        Download the model weights from wandb and load them into the model.

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the model object has not been created yet.
        """
        # Fail before any network traffic if there is nothing to load into.
        if self._regression_model is None:
            raise ValueError("Regression model not initialized. Please call _setup_model() first.")

        log.info("Loading model weights from wandb")

        # Imported lazily, as in the original blueprint, so that wandb is only
        # required when this loader is actually used.
        from pathlib import Path

        import wandb

        api = wandb.Api()
        artifact = api.artifact(self.WANDB_ARTIFACT)
        model_name = artifact.metadata["model_name"]
        log.info(f"Downloading model weights for model '{model_name}'")

        # Use the directory reported by wandb instead of rebuilding
        # "./artifacts/<name>:<version>" by hand; the two can differ between
        # platforms.
        download_dir = artifact.download()
        weights_path = Path(download_dir) / model_name

        # NOTE(review): torch.load unpickles the file. Only load artifacts
        # from a trusted project and consider ``weights_only=True`` if the
        # installed torch version supports it.
        self._regression_model.load_state_dict(torch.load(weights_path))
        # Serving must not run layers such as dropout or batch norm in
        # training mode.
        self._regression_model.eval()

    async def _process_raw_data_points(self, data: list[dict]) -> list[dict]:
        """
        Process raw data points.

        Delegates to ``request_get_processed_data_points_from_raw_data``.

        Parameters
        ----------
        data
            Raw data points.

        Returns
        -------
        list[dict]
            Processed data points.
        """
        log.info("Processing raw data points, received")
        return await request_get_processed_data_points_from_raw_data(
            fiot_service=self,
            data=data,
        )

    async def _get_prediction(self, raw_datapoints: list[dict]) -> list[list[float]]:
        """
        Get predictions for raw data points.

        Parameters
        ----------
        raw_datapoints
            Raw data points.

        Returns
        -------
        list[list[float]]
            The model output converted with ``Tensor.tolist()``.

        Raises
        ------
        ValueError
            If the model has not been initialized yet.
        KeyError
            If the processed data has no ``aufbereiteter_wert`` column.
        """
        if self._regression_model is None:
            raise ValueError("Regression model not initialized. Please call _setup_model() first.")

        processed_data = await self._process_raw_data_points(data=raw_datapoints)
        features = pd.DataFrame(processed_data)
        # "aufbereiteter_wert" is removed from the model input; the remaining
        # columns are the features (presumably it is the target -- confirm).
        features.pop("aufbereiteter_wert")
        x_data = features.to_numpy()

        # No gradients are needed for inference; skipping them saves memory.
        with torch.no_grad():
            prediction = self._regression_model(torch.tensor(x_data, dtype=torch.float32))
        return prediction.tolist()

    @reply(ML_SERVING_SUBJECT)
    async def prediction(self, _: str, msg: Thing) -> Thing:
        """
        Serve predictions for raw data points.

        Every failure, including an invalid payload, is reported to the
        requester as an error response instead of being raised, so the
        requester always receives a reply.

        Parameters
        ----------
        _
            The topic of the request (unused).
        msg
            The message containing the raw data points in its ``value`` field.

        Returns
        -------
        Thing
            The response message containing the predictions, or an error
            response describing the failure.
        """
        if not isinstance(msg.value, list):
            log.error(f"Payload (the 'value' field of the msg Thing) must be of type list, "
                      f"but received: {type(msg.value)}")
            # Reply with an error, consistent with the except branch below.
            return error_response_thing(
                exception=ValueError("Payload must be a list of raw data points"),
                fiot_service=self,
            )

        raw_data_points: list[dict] = msg.value

        try:
            res: list[list[float]] = await self._get_prediction(raw_datapoints=raw_data_points)
        except Exception as e:
            # Top-level boundary of the request handler: report instead of crash.
            log.error(f"Error while processing raw data points: {e}")
            return error_response_thing(exception=e, fiot_service=self)

        return ok_response_thing(payload=res, fiot_service=self)
207
208
if __name__ == "__main__":
    # Script entry point: enable debug logging, then start the service via
    # ``main()``, which is not defined in this class and therefore comes from
    # the FastIoTService base class.
    logging.basicConfig(level=logging.DEBUG)
    MlServingService.main()