Data Processing Blueprints
The Data-Processing-Service is responsible for processing the data. It defines a data processing pipeline that preprocesses the data before it is used to train the model. The service regularly pulls the raw data from the database, processes it, and stores the processed data back in the database. The service is also used when the model performs a prediction: in that case, the Model-Serving-Service sends raw data points to the Data-Processing-Service, which processes them and returns the processed data to the Model-Serving-Service.
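Both paths described above can reuse the same processing function. The following is a minimal sketch in plain Python, not the FastIoT implementation: `InMemoryStore`, `batch_run`, `serve_request` and `drop_incomplete` are hypothetical names, and the in-memory store stands in for the database that the real service reaches via the broker.

```python
from typing import Callable, Optional

Record = dict[str, Optional[float]]
Processor = Callable[[list[Record]], list[Record]]


class InMemoryStore:
    """Hypothetical stand-in for the database; the real service talks to it via the broker."""

    def __init__(self, raw: list[Record]) -> None:
        self.raw = raw
        self.processed: list[Record] = []

    def get_all_raw(self) -> list[Record]:
        return list(self.raw)

    def store_processed(self, records: list[Record]) -> None:
        self.processed = records


def batch_run(store: InMemoryStore, process: Processor) -> None:
    """Training path: pull all raw data, process it and store the result back."""
    store.store_processed(process(store.get_all_raw()))


def serve_request(raw_points: list[Record], process: Processor) -> list[Record]:
    """Prediction path: process the raw points sent by the Model-Serving-Service and return them."""
    return process(raw_points)


def drop_incomplete(records: list[Record]) -> list[Record]:
    """Example processing step: keep only rows without missing values."""
    return [r for r in records if all(v is not None for v in r.values())]


store = InMemoryStore([{"wert": 1.0}, {"wert": None}, {"wert": 3.0}])
batch_run(store, drop_incomplete)
print(store.processed)                                  # [{'wert': 1.0}, {'wert': 3.0}]
print(serve_request([{"wert": 2.0}], drop_incomplete))  # [{'wert': 2.0}]
```

Sharing one processing function between both paths keeps the data used for training and the data used for prediction in the same shape.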
Note
Currently, the Data-Processing-Service pulls the whole database and processes it at once. The broker currently limits the maximum message size. FastIoT has a feature in development that allows splitting the data into multiple messages and merging the results, so in the future the maximum message size will no longer be a problem. However, a sufficiently large database can still cause problems, because the data will eventually no longer fit into the memory of the Data-Processing-Service. Therefore, the service will be changed to process the data in chunks in the future.
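Chunked processing could look roughly like the following sketch. It is an assumption, not the planned FastIoT feature: the function names and the column name `wert` are hypothetical, and the records come from an in-memory list instead of a database cursor. Because statistics such as a mean differ between chunks, the sketch first computes the mean over all chunks in a streaming pass and only then fills missing values chunk by chunk.

```python
import math
from typing import Iterator, Optional

Record = dict[str, Optional[float]]


def iter_chunks(records: list[Record], chunk_size: int) -> Iterator[list[Record]]:
    """Yield consecutive slices of at most `chunk_size` records."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, len(records), chunk_size):
        yield records[start:start + chunk_size]


def streaming_mean(records: list[Record], column: str, chunk_size: int) -> float:
    """First pass: accumulate sum and count chunk by chunk, ignoring missing values."""
    total, count = 0.0, 0
    for chunk in iter_chunks(records, chunk_size):
        for row in chunk:
            value = row.get(column)
            if value is not None:
                total += value
                count += 1
    return total / count if count else math.nan


def fill_missing_in_chunks(records: list[Record], column: str, chunk_size: int) -> list[Record]:
    """Second pass: fill missing values in each chunk with the mean over *all* chunks."""
    mean = streaming_mean(records, column, chunk_size)
    result: list[Record] = []
    for chunk in iter_chunks(records, chunk_size):
        result.extend(
            {**row, column: mean if row.get(column) is None else row[column]} for row in chunk
        )
    return result


rows = [{"wert": 1.0}, {"wert": None}, {"wert": 3.0}, {"wert": None}, {"wert": 8.0}]
filled = fill_missing_in_chunks(rows, "wert", chunk_size=2)
print([r["wert"] for r in filled])  # [1.0, 4.0, 3.0, 4.0, 8.0]
```

Filling each chunk with its own mean would instead use 1.0 for the first gap and 3.0 for the second, which is the subset effect that the NOTE comments in the blueprint code warn about.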
What are Data Processing Blueprints?
Data Processing Blueprints are templates for a Data-Processing-Service. Like all services in the FastIoT framework, the Data-Processing-Service is a microservice; it is responsible for processing the data. The processing pipeline is just an example and should be adapted to the specific use case. The pipeline is a scikit-learn Pipeline. scikit-learn's transformers are primarily designed to process data as NumPy arrays. In the scope of this project, we provide processing steps that work on pandas DataFrames instead, because we believe pandas is a more expressive and convenient way to work with data.
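A DataFrame-based pipeline step only has to follow scikit-learn's `fit`/`transform` convention. The sketch below is a hypothetical `DropColumns` transformer, comparable in spirit to the blueprint's `ColumnDropper` but not its actual implementation. It inherits from `BaseEstimator` and `TransformerMixin` (which supplies `fit_transform`) and returns a pandas DataFrame, so later steps can keep addressing columns by name.

```python
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline


class DropColumns(BaseEstimator, TransformerMixin):
    """Hypothetical DataFrame-in, DataFrame-out transformer that drops columns by name."""

    def __init__(self, target: list[str]):
        self.target = target

    def fit(self, X: pd.DataFrame, y=None) -> "DropColumns":
        # Dropping columns is stateless, so there is nothing to learn
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        # DataFrame.drop returns a new DataFrame; the input is left unchanged
        return X.drop(columns=self.target)


pipeline = Pipeline(steps=[("drop_datum", DropColumns(target=["datum"]))])
raw = pd.DataFrame({"datum": ["2024-01-01", "2024-01-02"], "wert": [1.0, 2.0]})
processed = pipeline.fit_transform(raw)
print(list(processed.columns))  # ['wert']
```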
Data Processing Blueprint
Data-Processing
"""
Data-Processing-Service blueprint.

Regularly pulls all raw data points from the database, runs them through a scikit-learn
preprocessing pipeline and stores the processed data points back in the database.
It also replies to requests from the Model-Serving-Service to process raw data points for predictions.

Adapt the pipeline in `setup_pipeline` to your own use case.
"""

import asyncio
import logging

import pandas as pd
from sklearn.pipeline import Pipeline

from fastiot.core import FastIoTService, loop, reply
from fastiot.msg.thing import Thing

from blueprint_dev_v2.data_processing_utils.feature_engineering import (
    Discretisation,
    OneHotEncodePd,
    NormalizeCols,
)
from blueprint_dev_v2.data_processing_utils.data_cleaning import (
    ColumnDropper,
    DropIncompleteRow,
    FillNaNWithMean,
    FillNaNWithMedian,
    FillNaNWithValue,
)
from blueprint_dev_v2.ml_lifecycle_utils.ml_lifecycle_broker_facade import (
    request_get_all_raw_data_points,
    request_upsert_many_processed_data_points,
    ok_response_thing,
    error_response_thing,
)
from blueprint_dev_v2.ml_lifecycle_utils.ml_lifecycle_subjects_name import DATA_PROCESSING_PROCESS_RAW_DATA_SUBJECT
from blueprint_dev_v2.logger.logger import log


class DataProcessingService(FastIoTService):
    _preprocessor_pipeline: Pipeline = None

    @loop
    async def process_data_loop(self):
        # The returned awaitable (asyncio.sleep) sets the pause before the next loop run
        if self._preprocessor_pipeline is None:
            log.info("Preprocessor pipeline not set up yet; waiting 5 seconds")
            return asyncio.sleep(5)

        log.info("Processing data loop started")
        # Fetch all raw data points from the database via the broker
        raw_entries: list[dict] = await request_get_all_raw_data_points(fiot_service=self)
        # Convert them to a pandas DataFrame
        df = pd.DataFrame(raw_entries)
        log.info(f"Received {len(df)} raw data entries from the database. First 5 entries:\n{df.head(n=5)}")
        # Run the preprocessing pipeline
        processed_data: pd.DataFrame = self.process_raw_db_data(df=df)
        log.info(f"Processed data (first 5 entries):\n{processed_data.head(n=5)}")

        # Convert the DataFrame to a list of dicts (one dict per row)
        data_points = processed_data.to_dict(orient="records")
        # Push the processed data to the database via the broker
        db_service_response: dict = await request_upsert_many_processed_data_points(fiot_service=self, data=data_points)
        log.info(f"Received response from db-service: {db_service_response}")

        # Run again in 24 hours
        return asyncio.sleep(24 * 60 * 60)

    async def _start(self):
        log.info("DataProcessingService started")
        await self.setup_pipeline()

    async def setup_pipeline(self):
        log.info("Setting up pipeline")

        # The pipeline is a list of (name, transformer) steps that are applied to the DataFrame in order.
        # It uses scikit-learn transformers.
        #
        # NOTE: scikit-learn transformers are primarily designed to work with NumPy arrays, not pandas DataFrames.
        # We rewrote the transformers used here to work with pandas DataFrames.
        # If you want to use a transformer that is not implemented yet, you can write your own;
        # see feature_engineering.py for how to write your own transformer.

        steps = []

        # The following steps are just examples;
        # feel free to mix and match them or add your own.

        ################################################################
        # DATA CLEANING
        ################################################################

        # Example: drop a column from the DataFrame;
        # some columns may be useless for an ML model, so you can drop them here
        data_clean_op_column_drop = (
            "DATA_CLEANING_Drop_datum_col",
            ColumnDropper(target=["datum"])
        )
        steps.append(data_clean_op_column_drop)

        ################################################################

        # Example: drop rows that have NaN values in a specific column
        data_clean_op_strat_drop = (
            "DATA_CLEANING_Drop_laborant_NaN_Rows",
            DropIncompleteRow(['laborant'])
        )
        steps.append(data_clean_op_strat_drop)

        ################################################################

        # Example: fill NaN values with the mean value
        #
        # NOTE: the mean value is calculated from the given DataFrame.
        # If the DataFrame is only a slice of the db, the mean is calculated from that subset
        # and will differ from the mean calculated from the whole db.
        #
        # So, if your db is large, you may want to calculate/query the mean value from the whole db
        # and pass it to FillNaNWithValue (see the example below that fills NaN values with a specific value).
        data_clean_op_strat_mean = (
            "DATA_CLEANING_fill_rohwert_1_labormessung_NaN_with_mean",
            FillNaNWithMean("rohwert_1_labormessung")
        )
        steps.append(data_clean_op_strat_mean)

        ################################################################

        # Example: fill NaN values with the median value
        #
        # NOTE: the median value is calculated from the given DataFrame.
        # If the DataFrame is only a slice of the db, the median is calculated from that subset
        # and will differ from the median calculated from the whole db.
        #
        # So, if your db is large, you may want to calculate/query the median value from the whole db
        # and pass it to FillNaNWithValue (see the example below that fills NaN values with a specific value).
        data_clean_op_strat_median_1 = (
            "DATA_CLEANING_fill_rohwert_2_labormessung_NaN_with_median",
            FillNaNWithMedian("rohwert_2_labormessung")
        )
        steps.append(data_clean_op_strat_median_1)

        data_clean_op_strat_median_2 = (
            "DATA_CLEANING_fill_aufbereiteter_wert_NaN_with_median",
            FillNaNWithMedian("aufbereiteter_wert")
        )
        steps.append(data_clean_op_strat_median_2)

        ################################################################

        # Example: fill NaN values with a specific value
        data_clean_op_strat_fill_val = (
            "DATA_CLEANING_fill_rohwert_3_labormessung_NaN_with_0.5",
            FillNaNWithValue(target="rohwert_3_labormessung", value=0.5)
        )
        steps.append(data_clean_op_strat_fill_val)

        ################################################################
        # DATA TRANSFORMATION
        ################################################################

        # Example: discretise a column into a specific number of bins
        data_transform_discretisation = (
            "DATA_TRANSFORM_Discretisation_rohwert_1_labormessung",
            Discretisation(target="rohwert_1_labormessung",
                           bins=5,
                           labels=["very_low", "low", "medium", "high", "very_high"])
        )
        steps.append(data_transform_discretisation)

        ################################################################

        # Example: one-hot encode a column
        data_transform_op_ohe_1 = (
            "DATA_TRANSFORM_Auto_One_Hot_Encode_rohwert_1_labormessung",
            OneHotEncodePd(target="rohwert_1_labormessung", prefix="rohwert_1", sep="_")
        )
        steps.append(data_transform_op_ohe_1)

        data_transform_op_ohe_2 = (
            "DATA_TRANSFORM_Auto_One_Hot_Encode_Material_Id",
            OneHotEncodePd(
                target="material_id",
                prefix="material_id",
                sep="_",
                required_columns=[
                    "material_id_00000000",
                    "material_id_11111111",
                    "material_id_22222222",
                    "material_id_33333333"
                ]
            )
        )
        steps.append(data_transform_op_ohe_2)

        data_transform_op_ohe_3 = (
            "DATA_TRANSFORM_Auto_One_Hot_Encode_Laborant",
            OneHotEncodePd(
                target="laborant",
                prefix="laborant",
                sep="_",
                required_columns=["laborant_AN", "laborant_HANS", "laborant_SO", "laborant_TK"]
            )
        )
        steps.append(data_transform_op_ohe_3)

        ################################################################

        # Example: normalize a column to the given feature range
        #
        # NOTE: the min and max values used for scaling are calculated from the given DataFrame
        # TODO: add a version that queries the DB for the min and max values
        data_transform_op_norm_query_1 = (
            "DATA_TRANSFORM_Normalize_aufbereiteter_wert",
            NormalizeCols(target="aufbereiteter_wert", feature_range=(0, 1))
        )
        steps.append(data_transform_op_norm_query_1)

        data_transform_op_norm_query_2 = (
            "DATA_TRANSFORM_Normalize_rohwert_2_labormessung",
            NormalizeCols(target="rohwert_2_labormessung", feature_range=(0, 1))
        )
        steps.append(data_transform_op_norm_query_2)

        data_transform_op_norm_query_3 = (
            "DATA_TRANSFORM_Normalize_rohwert_3_labormessung",
            NormalizeCols(target="rohwert_3_labormessung", feature_range=(-1, 1))
        )
        steps.append(data_transform_op_norm_query_3)

        ################################################################

        preprocessor = Pipeline(steps=steps, verbose=False)
        self._preprocessor_pipeline = preprocessor

    def process_raw_db_data(self, df: pd.DataFrame) -> pd.DataFrame:
        log.info("Processing raw data from the database")
        # NOTE: fit_transform re-fits every step on the given DataFrame
        processed_dataframe = self._preprocessor_pipeline.fit_transform(df)
        return processed_dataframe

    def process_raw_data_points(self, data: list[dict]) -> list[dict]:
        log.info(f"Processing {len(data)} raw data points")
        df = pd.DataFrame(data)
        # NOTE: fit_transform re-fits the pipeline on the received data points, so statistics such as
        # means, medians, bins and min/max values are computed from this batch, not from the training data
        processed_dataframe = self._preprocessor_pipeline.fit_transform(df)
        processed_records = processed_dataframe.to_dict(orient="records")
        log.info(f"Processed data points: {processed_records}")
        return processed_records

    @reply(DATA_PROCESSING_PROCESS_RAW_DATA_SUBJECT)
    async def process_many_raw_datapoints(self, topic: str, msg: Thing) -> Thing:
        log.info(f"Received processing request on topic '{topic}'")
        if not isinstance(msg.value, list):
            error = ValueError("Payload must be a list of raw data points")
            log.error(f"Payload (the 'value' field of the msg Thing) must be of type list, "
                      f"but received: {type(msg.value)}")
            # Reply with an error response instead of raising, so the requesting service gets an answer
            return error_response_thing(exception=error, fiot_service=self)

        data_points: list[dict] = msg.value

        log.info(f"Received {len(data_points)} raw data points to be processed")

        try:
            processed_points = self.process_raw_data_points(data=data_points)
            return ok_response_thing(payload=processed_points, fiot_service=self)

        except Exception as e:
            log.error(f"Error while processing raw data points: {e}")
            return error_response_thing(exception=e, fiot_service=self)


if __name__ == '__main__':
    # Change this to reduce verbosity, or remove it completely to configure logging
    # via the `FASTIOT_LOG_LEVEL` environment variable.
    logging.basicConfig(level=logging.DEBUG)
    DataProcessingService.main()
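How the blueprint's `Discretisation` step computes its bins is not shown here. If it behaves like pandas' `pd.cut` with an integer `bins` argument (an assumption), the five bins are equal-width intervals between the minimum and maximum of the DataFrame it receives, so the same value can land in a different bin when a different batch is processed. The sketch below contrasts that with fixed bin edges; the edges and the sample values are hypothetical.

```python
import math

import pandas as pd

LABELS = ["very_low", "low", "medium", "high", "very_high"]

# Equal-width bins derived from the min and max of *this* Series
batch = pd.Series([0.0, 2.5, 5.0, 7.5, 10.0])
data_driven = pd.cut(batch, bins=5, labels=LABELS)
print(data_driven.tolist())  # ['very_low', 'low', 'medium', 'high', 'very_high']

# Hypothetical fixed edges: the bins no longer depend on which rows are in the batch
EDGES = [-math.inf, 2.0, 4.0, 6.0, 8.0, math.inf]
single_point = pd.Series([5.0])
fixed = pd.cut(single_point, bins=EDGES, labels=LABELS)
print(fixed.tolist())  # ['medium']
```

Fixed edges trade flexibility for consistency: a single prediction point is always binned the same way, but the edges have to be chosen (or queried from the database) up front.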
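The `required_columns` argument of `OneHotEncodePd` matters most for small batches: a request from the Model-Serving-Service may not contain every category, and plain one-hot encoding would then produce fewer columns than the model was trained on. The following hypothetical helper shows one way to get a fixed column layout with pandas; it is not the blueprint's implementation. `pd.get_dummies` encodes the categories that are present, and `reindex` adds the missing required columns filled with 0.

```python
import pandas as pd


def one_hot_with_required_columns(
    df: pd.DataFrame, target: str, prefix: str, sep: str, required_columns: list[str]
) -> pd.DataFrame:
    """Hypothetical helper: one-hot encode `target` into exactly `required_columns`."""
    dummies = pd.get_dummies(df[target], prefix=prefix, prefix_sep=sep, dtype=int)
    # Add missing category columns (filled with 0) and enforce a fixed column order
    dummies = dummies.reindex(columns=required_columns, fill_value=0)
    return pd.concat([df.drop(columns=[target]), dummies], axis=1)


batch = pd.DataFrame({"laborant": ["SO", "AN"], "wert": [1.0, 2.0]})
encoded = one_hot_with_required_columns(
    batch,
    target="laborant",
    prefix="laborant",
    sep="_",
    required_columns=["laborant_AN", "laborant_HANS", "laborant_SO", "laborant_TK"],
)
print(list(encoded.columns))
# ['wert', 'laborant_AN', 'laborant_HANS', 'laborant_SO', 'laborant_TK']
```

Note that `reindex` also silently drops any category that is not in the list, such as a new laborant; whether that is acceptable depends on the use case.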
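The listing calls `fit_transform` both in the daily loop and when answering the Model-Serving-Service, so every request re-fits the pipeline on the received points. One possible alternative, sketched below, is to fit once on the training data and only call `transform` at prediction time. This assumes that each custom step learns its statistics in `fit`; the hypothetical `MinMaxNormalize` here does, but whether the blueprint's `NormalizeCols` does is not shown. `sklearn.base.clone` is used to show what re-fitting on a single point would produce without overwriting the fitted pipeline.

```python
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin, clone
from sklearn.pipeline import Pipeline


class MinMaxNormalize(BaseEstimator, TransformerMixin):
    """Hypothetical min-max scaler whose min and max are learned in fit()."""

    def __init__(self, target: str, feature_range: tuple[float, float] = (0.0, 1.0)):
        self.target = target
        self.feature_range = feature_range

    def fit(self, X: pd.DataFrame, y=None) -> "MinMaxNormalize":
        self.min_ = X[self.target].min()
        self.max_ = X[self.target].max()
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        low, high = self.feature_range
        X = X.copy()
        scaled = (X[self.target] - self.min_) / (self.max_ - self.min_)
        X[self.target] = scaled * (high - low) + low
        return X


pipeline = Pipeline(steps=[("normalize_wert", MinMaxNormalize(target="wert"))])

# Fit once on the training data (the daily loop in the blueprint)
pipeline.fit(pd.DataFrame({"wert": [0.0, 5.0, 10.0]}))

# At prediction time, only transform the incoming point
request = pd.DataFrame([{"wert": 5.0}])
print(pipeline.transform(request)["wert"].tolist())  # [0.5]

# Re-fitting on the single point gives min == max, so the scaled value is NaN
refit = clone(pipeline).fit_transform(request)
print(refit["wert"].tolist())  # [nan]
```

With this approach the service would have to keep the pipeline fitted in the daily loop and make sure it is fitted before the first prediction request arrives.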
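The reply handler validates the payload before processing it. The general pattern, answering every request with either an ok or an error response instead of letting an exception escape, can be sketched in plain Python. The dict-based responses and the names `handle_process_request` and `add_flag` are hypothetical stand-ins for `ok_response_thing` and `error_response_thing`, whose exact message format is not shown in this document.

```python
from typing import Any, Callable


def handle_process_request(value: Any, process: Callable[[list[dict]], list[dict]]) -> dict:
    """Hypothetical request handler: always returns a response dict, never raises."""
    if not isinstance(value, list):
        return {"ok": False, "error": f"Payload must be a list of raw data points, got {type(value).__name__}"}
    try:
        return {"ok": True, "payload": process(value)}
    except Exception as exc:  # report processing errors back to the caller
        return {"ok": False, "error": str(exc)}


def add_flag(points: list[dict]) -> list[dict]:
    """Example processing function: mark every point as processed."""
    return [{**p, "processed": True} for p in points]


print(handle_process_request([{"wert": 1.0}], add_flag))
# {'ok': True, 'payload': [{'wert': 1.0, 'processed': True}]}
print(handle_process_request({"wert": 1.0}, add_flag))
# {'ok': False, 'error': 'Payload must be a list of raw data points, got dict'}
```

Because the handler always returns a response, the requesting side can distinguish an invalid payload from a processing failure by inspecting the response instead of waiting for a reply.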