ML Serving Blueprints

What are ML Serving Blueprints?

Serving Blueprints are templates for serving a trained model. Each blueprint is designed to serve a specific model type and framework. Each blueprint corresponds to a microservice that serves predictions from a trained model.

Pytorch Regression Serving Blueprint (MLflow)

Pytorch Regression MLflow Serving
  1import logging
  2import mlflow
  3import mlflow.pytorch
  4from asyncio import Future
  5
  6from fastiot.core import FastIoTService, reply
  7from fastiot.msg.thing import Thing
  8
  9import torch
 10import numpy as np
 11import pandas as pd
 12
 13from blueprint_dev_v2.ml_lifecycle_utils.ml_lifecycle_broker_facade import \
 14    request_get_processed_data_points_from_raw_data, ok_response_thing, error_response_thing
 15from blueprint_dev_v2.ml_lifecycle_utils.ml_lifecycle_subjects_name import ML_SERVING_SUBJECT
 16from src.blueprint_dev_v2.logger.logger import log
 17
 18
 19class MlServingMlflowService(FastIoTService):
 20    """
 21    This service is responsible for serving predictions using a PyTorch model.
 22
 23    The model is a simple neural network that takes 15 input features and outputs a single value.
 24
 25    Attributes
 26    ----------
 27    _regression_model : DemonstratorNeuralNet
 28        The regression model to be used for predictions.
 29    _example_raw_payload : dict
 30        An example raw payload to be used for testing the model.
 31
 32    Methods
 33    -------
 34    _start()
 35        Start the service.
 36    _setup_model()
 37        Initialize the regression model.
 38    _load_model_weights_mlfow()
 39        Load the model weights from mlflow.
 40    _process_raw_data_points(data: list[dict]) -> Future[list[dict]]
 41        Process raw data points.
 42    _get_prediction(raw_datapoints: list[dict]) -> Future[list[list[float]]]
 43        Get predictions for raw data points.
 44    prediction(topic: str, msg: Thing) -> Thing
 45        Serve predictions for raw data points.
 46    """
 47
 48    MODEL_URI = "models:/MyModel/1"
 49    MLFLOW_TRACKING_URI = "http://127.0.0.1:8080"
 50
 51    _regression_model = None
 52
 53    _example_raw_payload = {
 54        'laborant': "TK",
 55        'material_id': "11111111",
 56        'datum': "15.03.2024, 16:14:48",
 57        'rohwert_1_labormessung': 22.64237723051251,
 58        'rohwert_2_labormessung': 0.55194,
 59        'rohwert_3_labormessung': -0.472279,
 60        'aufbereiteter_wert': 0.287696
 61    }
 62
 63    async def _start(self):
 64        """
 65        Runs when the service starts.
 66        """
 67        log.info("MlPytorchRegressionService started")
 68        mlflow.set_tracking_uri(self.MLFLOW_TRACKING_URI)
 69        await self._setup_model()
 70
 71    async def _setup_model(self):
 72        """
 73        Initialize the regression model.
 74
 75        Returns
 76        -------
 77        None
 78        """
 79        log.info("Setting up Demonstrator Regression model")
 80
 81        # Load model weights
 82        await self._load_model_weights_mlfow()
 83
 84        # test model with example payload
 85        _ = await self._get_prediction(
 86            raw_datapoints=[
 87                self._example_raw_payload,
 88                self._example_raw_payload,
 89                self._example_raw_payload,
 90            ]
 91        )
 92
 93    async def _load_model_weights_mlfow(self):
 94        """
 95        Load the model weights from mlflow.
 96
 97        Returns
 98        -------
 99        None
100        """
101        log.info("Loading model weights from mlfow")
102        model = mlflow.pytorch.load_model(model_uri=self.MODEL_URI)
103        log.info(f"Model loaded from mlflow: \n{model}")
104        self._regression_model = model
105
106    async def _process_raw_data_points(self, data: list[dict]) -> Future[list[dict]]:
107        """
108        Process raw data points.
109
110        Parameters
111        ----------
112        data
113            Raw data points to be processed.
114
115        Returns
116        -------
117        Future[list[dict]]
118            Processed data points.
119        """
120        log.info(f"Processing raw data points")
121        return await request_get_processed_data_points_from_raw_data(
122            fiot_service=self,
123            data=data,
124        )
125
126    async def _get_prediction(self, raw_datapoints: list[dict]) -> Future[list[list[float]]]:
127        """
128        Get predictions for raw data points.
129
130        Parameters
131        ----------
132        raw_datapoints
133            Raw data points to get predictions for.
134
135        Returns
136        -------
137        Future[list[list[float]]]
138            Predictions for raw data points.
139        """
140        if self._regression_model is None:
141            raise ValueError("Regression model not initialized. Please call _setup_model() first.")
142
143        log.info(f"Integration test: getting a processed data point from Data Processing Service and performing a prediction.")
144        processed_data = await self._process_raw_data_points(data=raw_datapoints)
145        temp = pd.DataFrame(processed_data)
146        _ = np.array([temp.pop("aufbereiteter_wert")])
147        x_data = temp.to_numpy()
148        prediction = self._regression_model(torch.tensor(x_data, dtype=torch.float32))
149        log.info(f"Integration test passed.")
150        return prediction.tolist()
151
152    @reply(ML_SERVING_SUBJECT)
153    async def prediction(self, _: str, msg: Thing) -> Thing:
154        """
155        Serve predictions for raw data points.
156
157        Parameters
158        ----------
159        _
160        msg
161            The message containing the raw data points.
162
163        Returns
164        -------
165        Thing
166            The response message containing the predictions.
167        """
168        if not isinstance(msg.value, list):
169            log.error(f"Payload (the 'value' field of the msg Thing) must be of type list, "
170                      f"but received: {type(msg.value)}")
171            raise ValueError("Payload must be a list of raw data points")
172
173        raw_data_points: list[dict] = msg.value
174
175        try:
176            res: list[list[float]] = await self._get_prediction(raw_datapoints=raw_data_points)
177
178            return ok_response_thing(payload=res, fiot_service=self)
179
180        except Exception as e:
181            log.error(f"Error while processing raw data points: {e}")
182            return error_response_thing(exception=e, fiot_service=self)
183
184
if __name__ == "__main__":
    # Script entry point: enable debug logging, then start the service via its main().
    logging.basicConfig(level=logging.DEBUG)
    MlServingMlflowService.main()

Pytorch Regression Serving Blueprint (WandB)

Note

WandB support might be removed in the future in favor of MLflow.

Pytorch Regression WandB Serving
  1import logging
  2from asyncio import Future
  3
  4from fastiot.core import FastIoTService, reply
  5from fastiot.msg.thing import Thing
  6
  7import torch
  8import numpy as np
  9import pandas as pd
 10
 11from blueprint_dev_v2.ml_lifecycle_utils.ml_lifecycle_broker_facade import \
 12    request_get_processed_data_points_from_raw_data, ok_response_thing, error_response_thing
 13from blueprint_dev_v2.ml_lifecycle_utils.ml_lifecycle_subjects_name import ML_SERVING_SUBJECT
 14from blueprint_dev_v2_services.ml_pytorch_regression.ml_pytorch_regression_service import DemonstratorNeuralNet
 15from src.blueprint_dev_v2.logger.logger import log
 16
 17
 18class MlServingService(FastIoTService):
 19    """
 20    This service is responsible for serving predictions using a PyTorch model.
 21
 22    The model is a simple neural network that takes 15 input features and outputs a single value.
 23
 24    Attributes
 25    ----------
 26    _regression_model : DemonstratorNeuralNet
 27        The regression model to be used for predictions.
 28    _example_raw_payload : dict
 29        An example raw payload to be used for testing the model.
 30
 31    Methods
 32    -------
 33    _start()
 34        Start the service.
 35    _setup_model()
 36        Initialize the regression model.
 37    _load_model_weights_wandb()
 38        Load the model weights from wandb.
 39    _process_raw_data_points(data: list[dict]) -> Future[list[dict]]
 40        Process raw data points.
 41    _get_prediction(raw_datapoints: list[dict]) -> Future[list[list[float]]]
 42        Get predictions for raw data points.
 43    prediction(topic: str, msg: Thing) -> Thing
 44        Serve predictions for raw data points.
 45    """
 46    _regression_model = None
 47
 48    _example_raw_payload = {
 49        'laborant': "TK",
 50        'material_id': "11111111",
 51        'datum': "15.03.2024, 16:14:48",
 52        'rohwert_1_labormessung': 22.64237723051251,
 53        'rohwert_2_labormessung': 0.55194,
 54        'rohwert_3_labormessung': -0.472279,
 55        'aufbereiteter_wert': 0.287696
 56    }
 57
 58    async def _start(self):
 59        """
 60        Runs when the service starts.
 61        """
 62        log.info("MlPytorchRegressionService started")
 63        await self._setup_model()
 64
 65    async def _setup_model(self):
 66        """
 67        Initialize the regression model.
 68
 69        Returns
 70        -------
 71        None
 72        """
 73        log.info("Setting up Demonstrator Regression model")
 74
 75        self._regression_model = DemonstratorNeuralNet(
 76            input_dim=15,
 77            hidden_dim=10,
 78            output_dim=1
 79        )
 80
 81        # Load model weights
 82        await self._load_model_weights_wandb()
 83
 84        # test model with example payload
 85        _ = await self._get_prediction(
 86            raw_datapoints=[
 87                self._example_raw_payload,
 88                self._example_raw_payload,
 89                self._example_raw_payload,
 90            ]
 91        )
 92
 93    async def _load_model_weights_wandb(self):
 94        """
 95        Load the model weights from wandb.
 96
 97        Returns
 98        -------
 99        None
100        """
101        log.info("Loading model weights from wandb")
102
103        import wandb
104
105        # Create a new API object
106        api = wandb.Api()
107
108        # Get all the artifacts of a specific project
109        wandb_config = {
110            "entity": "querry",  # Replace with your username
111            "project": "KIOptipack-dev",  # Replace with your project name
112            "name": "model_4c7eb0ae-2dc6-49f5-a179-605a89",  # Replace with your artifact name
113            "version": "v6",
114            "group": "MVDP-pytorch-regression",
115            "model_type": "pytorch-regression-model",
116        }
117        artifact = api.artifact("querry/KIOptipack-dev/DemonstratorNeuralNet:latest")
118        log.info(f"Downloading model weights for model '{artifact.metadata['model_name']}'")
119        artifact.download()
120
121        model_name = artifact.metadata["model_name"]
122        model_version = artifact.version
123        path = f"./artifacts/DemonstratorNeuralNet:{model_version}/{model_name}"
124        if self._regression_model is None:
125            raise ValueError("Regression model not initialized. Please call _setup_model() first.")
126        self._regression_model.load_state_dict(
127            torch.load(f"./artifacts/DemonstratorNeuralNet:{model_version}/{model_name}"))
128
129    async def _process_raw_data_points(self, data: list[dict]) -> Future[list[dict]]:
130        """
131        Process raw data points.
132
133        Parameters
134        ----------
135        data
136            Raw data points.
137
138        Returns
139        -------
140        Future[list[dict]]
141            Processed data points.
142        """
143        log.info(f"Processing raw data points, received")
144        return await request_get_processed_data_points_from_raw_data(
145            fiot_service=self,
146            data=data,
147        )
148
149    async def _get_prediction(self, raw_datapoints: list[dict]) -> Future[list[list[float]]]:
150        """
151        Get predictions for raw data points.
152
153        Parameters
154        ----------
155        raw_datapoints
156            Raw data points.
157
158        Returns
159        -------
160        Future[list[list[float]]]
161            Predictions for raw data points.
162        """
163        if self._regression_model is None:
164            raise ValueError("Regression model not initialized. Please call _setup_model() first.")
165
166        processed_data = await self._process_raw_data_points(data=raw_datapoints)
167        temp = pd.DataFrame(processed_data)
168        _ = np.array([temp.pop("aufbereiteter_wert")])
169        x_data = temp.to_numpy()
170
171        prediction = self._regression_model(torch.tensor(x_data, dtype=torch.float32))
172        return prediction.tolist()
173
174    @reply(ML_SERVING_SUBJECT)
175    async def prediction(self,  _: str, msg: Thing) -> Thing:
176        """
177        Serve predictions for raw data points.
178
179        Parameters
180        ----------
181        _
182        msg
183            The message containing the raw data points.
184        msg
185            The message containing the raw data points.
186
187        Returns
188        -------
189        Thing
190            The response message.
191        """
192        if not isinstance(msg.value, list):
193            log.error(f"Payload (the 'value' field of the msg Thing) must be of type list, "
194                      f"but received: {type(msg.value)}")
195            raise ValueError("Payload must be a list of raw data points")
196
197        raw_data_points: list[dict] = msg.value
198
199        try:
200            res:list[list[float]] = await self._get_prediction(raw_datapoints=raw_data_points)
201
202            return ok_response_thing(payload=res, fiot_service=self)
203
204        except Exception as e:
205            log.error(f"Error while processing raw data points: {e}")
206            return error_response_thing(exception=e, fiot_service=self)
207
208
if __name__ == "__main__":
    # Script entry point: enable debug logging, then start the service via its main().
    logging.basicConfig(level=logging.DEBUG)
    MlServingService.main()