Data Processing Blueprints

The Data-Processing-Service is responsible for processing the data. It defines a data processing pipeline that preprocesses the data before it is used to train the model. It regularly pulls the raw data from the database (once every 24 hours in the blueprint below), processes it, and stores the processed data back in the database. The service is also used when the model makes a prediction. In that case, the Model-Serving-Service sends raw data points to the Data-Processing-Service, which processes them and returns the processed data to the Model-Serving-Service.
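The blueprint calls fit_transform in both paths, so statistics such as means and min/max values are recomputed from whatever data each call receives. The following sketch illustrates why this matters for a single prediction request, and one alternative: fit once on the database and only call transform when serving. It uses scikit-learn's built-in SimpleImputer and MinMaxScaler as stand-ins for the blueprint's pandas-based FillNaNWithMean and NormalizeCols; the data is made up, and whether the blueprint's own transformers learn their statistics in fit() is an assumption.

```python
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler


def build_pipeline() -> Pipeline:
    # Stand-ins (assumption): scikit-learn's built-in steps instead of the blueprint's
    # pandas-based FillNaNWithMean and NormalizeCols
    return Pipeline(steps=[
        ("fill_nan_with_mean", SimpleImputer(strategy="mean")),
        ("normalize", MinMaxScaler(feature_range=(0, 1))),
    ])


# Made-up raw values of one numeric column in the database (one value is missing)
db_batch = np.array([[1.0], [np.nan], [3.0], [5.0]])
# Made-up raw data point sent by the Model-Serving-Service for a prediction
request_point = np.array([[4.0]])

# Batch path: learn mean, min and max from the whole database, then transform it
fitted = build_pipeline()
processed_db = fitted.fit_transform(db_batch)                # [[0.0], [0.5], [0.5], [1.0]]

# Serving path A: fit_transform on the request alone re-learns min == max == 4.0
refit_point = build_pipeline().fit_transform(request_point)  # [[0.0]]

# Serving path B: reuse the statistics learned from the database
reused_point = fitted.transform(request_point)               # [[0.75]]
```

With path A, every single data point is scaled to the lower end of the feature range, regardless of its value; path B keeps the request consistent with the processed training data.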

Note

Currently, the Data-Processing-Service pulls the whole database and processes it at once. The message broker currently limits the maximum message size. FastIoT has a feature in development that allows splitting the data into multiple messages and merging the results, so in the future the maximum message size will no longer be a problem. However, a sufficiently large database could still cause problems: the data will eventually no longer fit into the memory of the Data-Processing-Service. The service will therefore be changed to process the data in chunks in the future.
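Chunked processing has one catch: steps such as FillNaNWithMean compute their statistic from the DataFrame they receive, so a per-chunk mean differs from the mean of the whole database. A minimal sketch of a two-pass approach is shown below, assuming the raw data can be read page by page; a plain list stands in for the database, and chunked, global_mean and fill_missing_in_chunks are hypothetical helpers, not part of FastIoT or the blueprint.

```python
import math
from typing import Iterator, Optional

COLUMN = "rohwert_1_labormessung"


def chunked(rows: list[dict], size: int) -> Iterator[list[dict]]:
    # Yield consecutive slices of at most `size` rows (stand-in for paged database reads)
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def is_missing(value: Optional[float]) -> bool:
    # Treat both None and float NaN as missing
    return value is None or (isinstance(value, float) and math.isnan(value))


def global_mean(rows: list[dict], column: str, size: int) -> float:
    # Pass 1: only a running sum and count are kept, never the whole column
    total, count = 0.0, 0
    for chunk in chunked(rows, size):
        for row in chunk:
            if not is_missing(row[column]):
                total += row[column]
                count += 1
    return total / count


def fill_missing_in_chunks(rows: list[dict], column: str, fill_value: float,
                           size: int) -> Iterator[list[dict]]:
    # Pass 2: process one chunk at a time, filling gaps with the global statistic
    for chunk in chunked(rows, size):
        yield [{**row, column: fill_value if is_missing(row[column]) else row[column]}
               for row in chunk]


raw_rows = [{COLUMN: 1.0}, {COLUMN: 3.0}, {COLUMN: None},
            {COLUMN: 8.0}, {COLUMN: float("nan")}, {COLUMN: 12.0}]
mean = global_mean(raw_rows, COLUMN, size=2)  # (1 + 3 + 8 + 12) / 4 = 6.0
processed = [row for chunk in fill_missing_in_chunks(raw_rows, COLUMN, mean, size=2)
             for row in chunk]
# A per-chunk mean would instead fill the third row with 8.0, the mean of its chunk [None, 8.0]
```

In a real service, each processed chunk would be upserted before the next one is read, so memory use is bounded by the chunk size rather than the database size.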

What are Data Processing Blueprints?

Data Processing Blueprints are templates for a Data-Processing-Service. Like all services in the FastIoT framework, the Data-Processing-Service is a microservice. The processing pipeline below is just an example and should be adapted to the specific use case. It is a scikit-learn Pipeline. scikit-learn's transformers are designed to process data as NumPy arrays; in the scope of this project, we provide processing steps that work on pandas DataFrames instead. We believe that pandas is a more explicit and convenient way to work with data, since columns are addressed by name.
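The blueprint's own transformers live in feature_engineering.py and data_cleaning.py and are not shown here. As a sketch of the general pattern, assuming the usual scikit-learn convention of subclassing BaseEstimator and TransformerMixin, a pandas-aware step only needs fit() and transform() methods that accept and return DataFrames. DropColumns and FillNaNWithColumnMean are hypothetical stand-ins for ColumnDropper and FillNaNWithMean; the real implementations may differ (for example, in where the mean is computed).

```python
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline


class DropColumns(BaseEstimator, TransformerMixin):
    """Hypothetical stand-in for ColumnDropper: drops columns by name."""

    def __init__(self, target: list[str]):
        self.target = target

    def fit(self, X: pd.DataFrame, y=None):
        return self  # nothing to learn

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        return X.drop(columns=self.target)  # returns a new DataFrame


class FillNaNWithColumnMean(BaseEstimator, TransformerMixin):
    """Hypothetical: learns the column mean in fit() and reuses it in transform()."""

    def __init__(self, target: str):
        self.target = target

    def fit(self, X: pd.DataFrame, y=None):
        self.mean_ = X[self.target].mean()  # pandas skips NaN by default
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        X = X.copy()  # do not mutate the caller's DataFrame
        X[self.target] = X[self.target].fillna(self.mean_)
        return X


pipeline = Pipeline(steps=[
    ("DATA_CLEANING_Drop_datum_col", DropColumns(target=["datum"])),
    ("DATA_CLEANING_fill_rohwert_1_labormessung_NaN_with_mean",
     FillNaNWithColumnMean("rohwert_1_labormessung")),
])

df = pd.DataFrame({
    "datum": ["2023-01-01", "2023-01-02", "2023-01-03"],
    "rohwert_1_labormessung": [1.0, np.nan, 5.0],
})
processed = pipeline.fit_transform(df)  # one column, values [1.0, 3.0, 5.0]
```

Because the mean is learned in fit(), a later pipeline.transform() call on new data points reuses it instead of recomputing it.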

Data Processing Blueprint

Data-Processing
```python
"""
This file was originally generated automatically as a service template.
It implements the Data-Processing-Service: a loop that periodically processes the whole raw database
and a reply handler that processes raw data points sent by the Model-Serving-Service.
Adapt the pipeline in setup_pipeline() to your own use case.
"""

import asyncio
import logging
from typing import Optional

import pandas as pd
from fastiot.core import FastIoTService, loop, reply
from fastiot.msg.thing import Thing
from sklearn.pipeline import Pipeline

from blueprint_dev_v2.data_processing_utils.feature_engineering import (
    Discretisation,
    OneHotEncodePd,
    NormalizeCols
)
from blueprint_dev_v2.data_processing_utils.data_cleaning import (
    ColumnDropper,
    DropIncompleteRow,
    FillNaNWithMean,
    FillNaNWithMedian,
    FillNaNWithValue
)
from blueprint_dev_v2.ml_lifecycle_utils.ml_lifecycle_broker_facade import (
    request_get_all_raw_data_points,
    request_upsert_many_processed_data_points,
    ok_response_thing,
    error_response_thing
)
from blueprint_dev_v2.ml_lifecycle_utils.ml_lifecycle_subjects_name import DATA_PROCESSING_PROCESS_RAW_DATA_SUBJECT
# Imported via the same package root as the other modules (not "src.")
from blueprint_dev_v2.logger.logger import log


class DataProcessingService(FastIoTService):
    _preprocessor_pipeline: Optional[Pipeline] = None

    @loop
    async def process_data_loop(self):
        # As in the FastIoT examples, a @loop method returns an awaitable (here asyncio.sleep)
        # that delays the next iteration.
        if self._preprocessor_pipeline is None:
            log.info("Preprocessor pipeline not yet set up. Waiting for 5 seconds.")
            return asyncio.sleep(5)

        log.info("Processing data loop started")
        # Fetch all raw data points from the database via the broker
        raw_entries: list[dict] = await request_get_all_raw_data_points(fiot_service=self)
        # Convert to a pandas DataFrame
        df = pd.DataFrame(raw_entries)
        log.info(f"Received {len(df)} raw data entries from database. First 5 entries:\n{df.head(n=5)}")
        # Process data
        processed_data: pd.DataFrame = self.process_raw_db_data(df=df)
        log.info(f"Processed data (first 5 entries):\n{processed_data.head(n=5)}")

        # Convert the DataFrame to a list of dicts (one dict per row)
        data_points = processed_data.to_dict(orient="records")
        # Push processed data to the database via the broker
        db_service_response: dict = await request_upsert_many_processed_data_points(fiot_service=self, data=data_points)
        log.info(f"Received response from db-service: {db_service_response}")

        return asyncio.sleep(24 * 60 * 60)  # run again in 24 h

    async def _start(self):
        log.info("DataProcessingService started")
        await self.setup_pipeline()

    async def setup_pipeline(self):
        log.info("Setting up pipeline")

        # This is a list of (name, transformer) steps that will be applied to the dataframe in order.
        # It uses sklearn transformers.
        #
        # NOTE: sklearn transformers are designed to work with numpy arrays, not with pandas dataframes.
        #       We rewrote the transformers to work with pandas dataframes.
        #       If you want to use a transformer that is not implemented yet, you can write your own transformer;
        #       look into the feature_engineering.py file to see how to write your own transformer.

        steps = []

        # The following steps are just examples.
        # Feel free to mix and match or add your own.

        ################################################################
        #   ___       _           ___ _               _
        #  |   \ __ _| |_ __ _   / __| |___ __ _ _ _ (_)_ _  __ _
        #  | |) / _` |  _/ _` | | (__| / -_) _` | ' \| | ' \/ _` |
        #  |___/\__,_|\__\__,_|  \___|_\___\__,_|_||_|_|_||_\__, |
        #                                                   |___/
        ################################################################

        # Example: drop a column from the dataframe.
        # Some columns may be useless for an ML model, so you can drop them here.
        data_clean_op_column_drop = (
            "DATA_CLEANING_Drop_datum_col",
            ColumnDropper(target=["datum"])
        )
        steps.append(data_clean_op_column_drop)

        ################################################################

        # Example: drop rows that have NaN values in a specific column
        data_clean_op_strat_drop = (
            "DATA_CLEANING_Drop_laborant_NaN_Rows",
            DropIncompleteRow(['laborant'])
        )
        steps.append(data_clean_op_strat_drop)

        ################################################################

        # Example: fill NaN values with the column mean
        #
        # NOTE: the mean value is calculated from the given dataframe.
        #       If the dataframe is only a slice of the db, the mean is calculated from a subset of the db
        #       and will differ from the mean of the whole db.
        #
        #       So, if your db is large, you may want to calculate/query the mean value from the whole db
        #       and pass it to a FillNaNWithValue step, like the example below that fills NaN values with a specific value.

        data_clean_op_start_mean = (
            "DATA_CLEANING_fill_rohwert_1_labormessung_NaN_with_mean",
            FillNaNWithMean("rohwert_1_labormessung")
        )
        steps.append(data_clean_op_start_mean)

        ################################################################

        # Example: fill NaN values with the column median
        #
        # NOTE: the median value is calculated from the given dataframe.
        #       If the dataframe is only a slice of the db, the median is calculated from a subset of the db
        #       and will differ from the median of the whole db.
        #
        #       So, if your db is large, you may want to calculate/query the median value from the whole db
        #       and pass it to a FillNaNWithValue step, like the example below that fills NaN values with a specific value.

        data_clean_op_start_median = (
            "DATA_CLEANING_fill_rohwert_2_labormessung_NaN_with_median",
            FillNaNWithMedian("rohwert_2_labormessung")
        )
        steps.append(data_clean_op_start_median)

        data_clean_op_start_median_2 = (
            "DATA_CLEANING_fill_aufbereiteter_wert_NaN_with_median",
            FillNaNWithMedian("aufbereiteter_wert")
        )
        steps.append(data_clean_op_start_median_2)

        ################################################################

        # Example: fill NaN values with a specific value
        data_clean_op_start_fill_val = (
            "DATA_CLEANING_fill_rohwert_3_labormessung_NaN_with_0.5",
            FillNaNWithValue(target="rohwert_3_labormessung", value=0.5)
        )
        steps.append(data_clean_op_start_fill_val)

        ################################################################
        #   ___       _          _____                  __                    _   _
        #  |   \ __ _| |_ __ _  |_   _| _ __ _ _ _  ___/ _|___ _ _ _ __  __ _| |_(_)___ _ _
        #  | |) / _` |  _/ _` |   | || '_/ _` | ' \(_-<  _/ _ \ '_| '  \/ _` |  _| / _ \ ' \
        #  |___/\__,_|\__\__,_|   |_||_| \__,_|_||_/__/_| \___/_| |_|_|_\__,_|\__|_\___/_||_|
        #
        ################################################################

        # Example: discretise a column into a given number of bins, one label per bin

        data_transform_discretisation = (
            "DATA_TRANSFORM_Discretisation_rohwert_1_labormessung",
            Discretisation(target="rohwert_1_labormessung",
                           bins=5,
                           labels=["very_low", "low", "medium", "high", "very_high"])
        )
        steps.append(data_transform_discretisation)

        ################################################################

        # Example: one-hot encode a column
        data_transform_op_ohe_1 = (
            "DATA_TRANSFORM_Auto_One_Hot_Encode_rohwert_1_labormessung",
            OneHotEncodePd(target="rohwert_1_labormessung", prefix="rohwert_1", sep="_")
        )
        steps.append(data_transform_op_ohe_1)
        data_transform_op_ohe_2 = (
            "DATA_TRANSFORM_Auto_One_Hot_Encode_Material_Id",
            OneHotEncodePd(
                target="material_id",
                prefix="material_id",
                sep="_",
                required_columns=[
                    "material_id_00000000",
                    "material_id_11111111",
                    "material_id_22222222",
                    "material_id_33333333"
                ]
            )
        )
        steps.append(data_transform_op_ohe_2)
        data_transform_op_ohe_3 = (
            "DATA_TRANSFORM_Auto_One_Hot_Encode_Laborant",
            OneHotEncodePd(
                target="laborant",
                prefix="laborant",
                sep="_",
                required_columns=["laborant_AN", "laborant_HANS", "laborant_SO", "laborant_TK"]
            )
        )
        steps.append(data_transform_op_ohe_3)

        ################################################################

        # Example: normalize a column to the given feature range
        #
        # NOTE: the min and max values for scaling are calculated from the given dataframe
        # TODO: add version that queries DB for min and max values
        data_transform_op_norm_query_1 = (
            "DATA_TRANSFORM_Normalize_aufbereiteter_wert",
            NormalizeCols(target="aufbereiteter_wert", feature_range=(0, 1))
        )
        steps.append(data_transform_op_norm_query_1)

        data_transform_op_norm_query_2 = (
            "DATA_TRANSFORM_Normalize_rohwert_2_labormessung",
            NormalizeCols(target="rohwert_2_labormessung", feature_range=(0, 1))
        )
        steps.append(data_transform_op_norm_query_2)

        data_transform_op_norm_query_3 = (
            "DATA_TRANSFORM_Normalize_rohwert_3_labormessung",
            NormalizeCols(target="rohwert_3_labormessung", feature_range=(-1, 1))
        )
        steps.append(data_transform_op_norm_query_3)

        ################################################################

        preprocessor = Pipeline(steps=steps, verbose=False)
        self._preprocessor_pipeline = preprocessor

    def process_raw_db_data(self, df: pd.DataFrame) -> pd.DataFrame:
        log.info("Processing raw data from database")
        # NOTE: fit_transform fits every step on the given dataframe, so statistics
        #       (e.g. means, medians, min/max values) are computed from this dataframe only
        processed_dataframe = self._preprocessor_pipeline.fit_transform(df)
        return processed_dataframe

    def process_raw_data_points(self, data: list[dict]) -> list[dict]:
        log.info("Processing raw data points")
        df = pd.DataFrame(data)
        processed_dataframe = self._preprocessor_pipeline.fit_transform(df)
        return processed_dataframe.to_dict(orient="records")

    @reply(DATA_PROCESSING_PROCESS_RAW_DATA_SUBJECT)
    async def process_many_raw_datapoints(self, topic: str, msg: Thing) -> Thing:
        if not isinstance(msg.value, list):
            log.error(f"Payload (the 'value' field of the msg Thing) must be of type list, "
                      f"but received: {type(msg.value)}")
            # Reply with an error response, consistent with the error handling below
            return error_response_thing(exception=ValueError("Payload must be a list of raw data points"),
                                        fiot_service=self)

        data_points: list[dict] = msg.value

        log.info(f"Received {len(data_points)} raw data points to be processed")

        try:
            df = pd.DataFrame(data_points)
            # NOTE: process_raw_db_data() fits the pipeline on these data points only,
            #       so statistics (e.g. mean, median, min/max) come from this request, not from the whole db
            processed_data: pd.DataFrame = self.process_raw_db_data(df=df)
            data_points = processed_data.to_dict(orient="records")

            return ok_response_thing(payload=data_points, fiot_service=self)

        except Exception as e:
            log.error(f"Error while processing raw data points: {e}")
            return error_response_thing(exception=e, fiot_service=self)


if __name__ == '__main__':
    # Change this to reduce verbosity or remove completely to use `FASTIOT_LOG_LEVEL` environment variable to configure
    # logging.
    logging.basicConfig(level=logging.DEBUG)
    DataProcessingService.main()
```
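The required_columns argument of OneHotEncodePd matters most on the serving path: a single raw data point from the Model-Serving-Service contains only one category per column, so a plain one-hot encoding would produce just one column, and the model would receive a different set of features than during training. The sketch below shows the idea with pandas.get_dummies and DataFrame.reindex. one_hot_with_required_columns is a hypothetical helper, and whether OneHotEncodePd works this way internally is an assumption.

```python
import pandas as pd

# Expected columns, taken from the laborant example in setup_pipeline()
REQUIRED_LABORANT_COLUMNS = ["laborant_AN", "laborant_HANS", "laborant_SO", "laborant_TK"]


def one_hot_with_required_columns(df: pd.DataFrame, target: str, prefix: str,
                                  required_columns: list[str], sep: str = "_") -> pd.DataFrame:
    # Hypothetical helper for illustration; not the blueprint's OneHotEncodePd
    dummies = pd.get_dummies(df[target], prefix=prefix, prefix_sep=sep, dtype=int)
    # Add missing category columns filled with 0 and fix their order;
    # categories that are not listed in required_columns are dropped
    dummies = dummies.reindex(columns=required_columns, fill_value=0)
    return pd.concat([df.drop(columns=[target]), dummies], axis=1)


# A single made-up raw data point, as the Model-Serving-Service might send it
request = pd.DataFrame({"laborant": ["SO"], "aufbereiteter_wert": [0.42]})

plain = pd.get_dummies(request["laborant"], prefix="laborant", prefix_sep="_", dtype=int)
encoded = one_hot_with_required_columns(request, "laborant", "laborant", REQUIRED_LABORANT_COLUMNS)
print(list(plain.columns))    # ['laborant_SO']
print(list(encoded.columns))  # ['aufbereiteter_wert', 'laborant_AN', 'laborant_HANS', 'laborant_SO', 'laborant_TK']
```

Fixing the column set and order up front keeps the processed request aligned with the columns produced from the whole database.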