Database Blueprints¶
What are Database Blueprints?¶
This service is responsible for storing the data and providing it to the other services. It is an abstraction layer between the data and the services that need it. Putting all the DB interactions in one service makes it possible to implement drop-in replacements for different databases, so the end user can choose between different databases without changing the other services.
MongoDB Database Blueprint¶
Database MongoDB
Important
You need to set up a username and a password for your MongoDB instance. The service will not work without them, even if your MongoDB database does not require authentication.
Note
Provide the following environment variables when starting the service:
FASTIOT_MONGO_DB_USER, FASTIOT_MONGO_DB_PASSWORD, FASTIOT_MONGO_DB_HOST, FASTIOT_MONGO_DB_PORT
When using PyCharm, you can set the environment variables in the run configuration (accessible via the dropdown menu of the green play button). Here is a screenshot of an example configuration:

1import logging
2import uuid
3import os
4
5from fastiot.core import FastIoTService, reply
6from fastiot.db.mongodb_helper_fn import get_mongodb_client_from_env
7from fastiot.msg.thing import Thing
8from pymongo import UpdateOne, MongoClient
9from pymongo.results import InsertManyResult, BulkWriteResult
10
11from blueprint_dev_v2.ml_lifecycle_utils.ml_lifecycle_broker_facade import ok_response_thing, error_response_thing
12from blueprint_dev_v2.ml_lifecycle_utils.ml_lifecycle_subjects_name import (
13 DB_SAVE_MANY_RAW_DATAPOINTS_SUBJECT,
14 DB_GET_ALL_RAW_DATA_SUBJECT,
15 DB_UPSERT_MANY_PROCESSED_DATAPOINTS_SUBJECT,
16 DB_GET_PROCESSED_DATA_COUNT_SUBJECT,
17 DB_GET_PROCESSED_DATA_PAGE_SUBJECT
18)
19
20
class DatabaseMongoService(FastIoTService):
    """
    FastIoT service that stores and retrieves data points in MongoDB.

    The service exposes its operations to other services via the broker: each
    public coroutine below is registered with ``@reply`` on one subject and
    answers with a ``Thing`` built by ``ok_response_thing`` (success) or
    ``error_response_thing`` (failure).

    Attributes
    ----------
    _DB_NAME : str
        The name of the database backend. Used for logging.
    _MONGO_DB : str
        The name of the MongoDB database.
    _MONGO_RAW_DATA_COLLECTION : str
        The name of the MongoDB collection for raw data.
    _MONGO_PROCESSED_DATA_COLLECTION : str
        The name of the MongoDB collection for processed data.
    _DEFAULT_PAGE_PARAMS : dict
        Values used for ``page`` and ``page_size`` when a page request omits them.
    _mongodb_client : MongoClient
        The MongoDB client.
    _db_username : str
        User name read from the environment.
    _db_password : str
        Password of that user, read from the environment.
    _db_port : str
        Port read from the environment (kept as the raw string).
    _db_host : str
        Host read from the environment.
    _db : Database
        The MongoDB database.
    _raw_data_collection : Collection
        The MongoDB collection for raw data.
    _processed_data_collection : Collection
        The MongoDB collection for processed data.

    Methods
    -------
    db_save_many_raw_datapoints(topic: str, msg: Thing) -> Thing
        Saves many raw data points to the database.
    db_save_upsert_processed_datapoints(topic: str, msg: Thing) -> Thing
        Upserts many processed data points to the database.
    get_all_raw_data(topic: str, msg: Thing) -> Thing
        Gets all raw data from the database.
    get_processed_data_count(topic: str, msg: Thing) -> Thing
        Gets the count of processed data points in the database.
    get_processed_data_page(topic: str, msg: Thing) -> Thing
        Gets a page of processed data points from the database.
    """

    _DB_NAME = "mongodb"
    _MONGO_DB = "KIOptiPackDb"
    _MONGO_RAW_DATA_COLLECTION = "KIOptiPackRaw"
    _MONGO_PROCESSED_DATA_COLLECTION = "KIOptiPackProcessed"

    _DEFAULT_PAGE_PARAMS = {"page": 0, "page_size": 10}

    def __init__(self, **kwargs):
        """
        Read the connection settings from the environment and open the client.

        Parameters
        ----------
        kwargs : dict
            Keyword arguments that are passed to the FastIoTService constructor.

        Raises
        ------
        ValueError
            If a required environment variable is unset or empty, or if the
            port is not an integer.
        """
        super().__init__(**kwargs)
        # FASTIOT_MONGO_DB_USER is the documented variable name; the legacy
        # FASTIOT_MONGO_DB_USERNAME is still honoured so existing setups keep working.
        self._db_username = os.environ.get(
            "FASTIOT_MONGO_DB_USER",
            os.environ.get("FASTIOT_MONGO_DB_USERNAME"),
        )
        self._db_password = os.environ.get("FASTIOT_MONGO_DB_PASSWORD")
        self._db_port = os.environ.get("FASTIOT_MONGO_DB_PORT")
        self._db_host = os.environ.get("FASTIOT_MONGO_DB_HOST")

        required = {
            "FASTIOT_MONGO_DB_USER": self._db_username,
            "FASTIOT_MONGO_DB_PASSWORD": self._db_password,
            "FASTIOT_MONGO_DB_HOST": self._db_host,
            "FASTIOT_MONGO_DB_PORT": self._db_port,
        }
        missing = [name for name, value in required.items() if not value]
        if missing:
            # Fail early with a readable message instead of building a
            # connection string that contains the literal text "None".
            raise ValueError(f"Missing required environment variables: {', '.join(missing)}")

        # Credentials are passed as keyword options rather than interpolated
        # into a URI, so reserved characters (e.g. '@', ':', '/') in the user
        # name or password need no percent-encoding.
        self._mongodb_client = MongoClient(
            host=self._db_host,
            port=int(self._db_port),
            username=self._db_username,
            password=self._db_password,
            authMechanism="SCRAM-SHA-1",
        )
        self._db = self._mongodb_client[self._MONGO_DB]

        self._raw_data_collection = self._db[self._MONGO_RAW_DATA_COLLECTION]
        self._processed_data_collection = self._db[self._MONGO_PROCESSED_DATA_COLLECTION]

    def _resolve_page_params(self, requested) -> dict:
        """
        Merge the requested page parameters with the defaults and validate them.

        Parameters
        ----------
        requested
            The payload of a page request. ``None`` is treated as an empty request.

        Returns
        -------
        dict
            The merged parameters; always contains ``page`` and ``page_size``.

        Raises
        ------
        ValueError
            If the payload is not a dict or a value is negative.
        """
        if requested is None:
            requested = {}
        if not isinstance(requested, dict):
            raise ValueError("params must be a dict containing 'page' and 'page_size'")

        for key in requested:
            if key not in self._DEFAULT_PAGE_PARAMS:
                self._logger.warning(f"Unexpected parameter '{key}' in request. Ignoring it.")

        params = {**self._DEFAULT_PAGE_PARAMS, **requested}
        if params["page"] < 0:
            raise ValueError("page must be >= 0")
        if params["page_size"] < 0:
            raise ValueError("page_size must be >= 0")
        return params

    @reply(DB_SAVE_MANY_RAW_DATAPOINTS_SUBJECT)
    async def db_save_many_raw_datapoints(self, _: str, msg: Thing) -> Thing:
        """
        Saves many raw data points to the database.

        Every data point gets a fresh UUID string as ``_id`` (an existing
        ``_id`` is overwritten) before the batch is inserted.

        Parameters
        ----------
        _
            The topic of the message. This is not used in this method.
        msg
            The message that contains the raw data points to be saved to the database.

        Returns
        -------
        Thing
            An acknowledgement containing ``acknowledged``, ``db`` and
            ``no_rows`` (the number of inserted data points), or an error
            response if the insert failed.

        Raises
        ------
        ValueError
            If the payload is not a list.
        """
        if not isinstance(msg.value, list):
            self._logger.error(f"Payload (the 'value' field of the msg Thing) must be of type list, "
                               f"but received: {type(msg.value)}")
            raise ValueError("Payload must be a list of raw data points")

        data_points: list[dict] = msg.value
        self._logger.info(f"Received {len(data_points)} raw data points to be inserted into mongodb")

        try:
            # add uuids to data points
            for data_point in data_points:
                data_point["_id"] = str(uuid.uuid4())

            self._logger.info("Insering data points into mongodb")
            # NOTE(review): pymongo rejects an empty batch, so an empty list
            # ends up in the error branch below - confirm this is acceptable.
            res: InsertManyResult = self._raw_data_collection.insert_many(data_points)
        except Exception as e:
            self._logger.error(f"Error while inserting raw data points into mongodb: {e}")
            return error_response_thing(exception=e, fiot_service=self)

        self._logger.info(f"DB transaction result: {res.acknowledged}")
        self._logger.info(f"Inserted {len(res.inserted_ids)} raw data points into mongodb")

        # feel free to include whatever information you want to return here.
        db_specific_info = {
            "acknowledged": res.acknowledged,
            "db": "MongoDB",
            "no_rows": len(res.inserted_ids),
        }

        # in principle one does not need to return information here.
        # However, some infos are returned, so that the requesting service can log them.
        return ok_response_thing(payload=db_specific_info, fiot_service=self)

    @reply(DB_UPSERT_MANY_PROCESSED_DATAPOINTS_SUBJECT)
    async def db_save_upsert_processed_datapoints(self, _: str, msg: Thing) -> Thing:
        """
        Upserts many processed data points to the database.

        Each data point is matched on its ``_id`` and written with ``$set``;
        a data point without a matching document is inserted.

        Parameters
        ----------
        _
            The topic of the message. This is not used in this method.
        msg
            The message that contains the processed data points to be upserted to the database.

        Returns
        -------
        Thing
            An acknowledgement containing ``acknowledged`` and ``db``, or an
            error response if the bulk write failed or a data point has no ``_id``.

        Raises
        ------
        ValueError
            If the payload is not a list.
        """
        if not isinstance(msg.value, list):
            self._logger.error(f"Payload (the 'value' field of the msg Thing) must be of type list, "
                               f"but received: {type(msg.value)}")
            raise ValueError("Payload must be a list of processed data points")

        data_points: list[dict] = msg.value
        self._logger.info(f"Received {len(data_points)} processed data points to be inserted into mongodb")

        self._logger.info("Upserting data points into mongodb using bulk wirte")
        try:
            # Built inside the try block so that a data point without '_id'
            # yields an error response instead of an unhandled KeyError.
            operations = [
                UpdateOne(
                    {"_id": data_point["_id"]},  # filter
                    {"$set": data_point},  # update
                    upsert=True,
                )
                for data_point in data_points
            ]
            res: BulkWriteResult = self._processed_data_collection.bulk_write(operations)
        except Exception as e:
            self._logger.error(f"Error while upserting processed data points into mongodb: {e}")
            return error_response_thing(exception=e, fiot_service=self)
        self._logger.info(f"DB transaction result: {res.acknowledged}")

        # feel free to include whatever information you want to return here.
        db_specific_info = {
            "acknowledged": res.acknowledged,
            "db": "MongoDB",
        }

        # in principle one does not need to return information here.
        # However, some infos are returned, so that the requesting service can log them.
        return ok_response_thing(payload=db_specific_info, fiot_service=self)

    @reply(DB_GET_ALL_RAW_DATA_SUBJECT)
    async def get_all_raw_data(self, _: str, __: Thing) -> Thing:
        """
        Gets all raw data from the database.

        Parameters
        ----------
        _
            The topic of the message. This is not used in this method.
        __
            The request message. Its content is not used in this method.

        Returns
        -------
        Thing
            A response containing all raw data entries as a list of dicts
            (with ``_id`` converted to a string), or an error response if the
            query failed.
        """
        self._logger.info("Received request to get all raw data from mongodb")

        try:
            raw_data_entries = [dict(entry) for entry in self._raw_data_collection.find()]
        except Exception as e:
            self._logger.error(f"Error while querying raw data from mongodb: {e}")
            return error_response_thing(exception=e, fiot_service=self)

        # the native mongo ID is not serializable to json
        # so we convert it to a string
        for entry in raw_data_entries:
            entry["_id"] = str(entry["_id"])

        return ok_response_thing(payload=raw_data_entries, fiot_service=self)

    @reply(DB_GET_PROCESSED_DATA_COUNT_SUBJECT)
    async def get_processed_data_count(self, _: str, __: Thing) -> Thing:
        """
        Gets the count of processed data points in the database.

        Parameters
        ----------
        _
            The topic of the message. This is not used in this method.
        __
            The request message. Its content is not used in this method.

        Returns
        -------
        Thing
            A response containing the number of processed data points, or an
            error response if counting failed.
        """
        self._logger.info("Received request to get the number of processed data points from mongodb")

        try:
            count = self._processed_data_collection.count_documents({})
        except Exception as e:
            self._logger.error(f"Error while counting processed data points in mongodb: {e}")
            return error_response_thing(exception=e, fiot_service=self)

        return ok_response_thing(payload=count, fiot_service=self)

    @reply(DB_GET_PROCESSED_DATA_PAGE_SUBJECT)
    async def get_processed_data_page(self, _: str, msg: Thing) -> Thing:
        """
        Gets a page of processed data points from the database.

        Parameters
        ----------
        _
            The topic of the message. This is not used in this method.
        msg
            The request message. Its payload may contain ``page`` (zero-based
            page number, default 0) and ``page_size`` (documents per page,
            default 10). Other keys are ignored with a warning.

        Returns
        -------
        Thing
            A response containing the documents of the requested page without
            their ``_id``, or an error response if the parameters are invalid
            or the query failed.
        """
        self._logger.debug(f"Received request to get a page of processed data points from {self._DB_NAME}")

        try:
            params = self._resolve_page_params(msg.value)
            page = params["page"]
            page_size = params["page_size"]

            if page_size == 0:
                # MongoDB treats limit(0) as "no limit", which would return the
                # whole collection; a page of size zero is empty instead.
                res = []
            else:
                page_documents = self._processed_data_collection.find() \
                    .skip(page * page_size) \
                    .limit(page_size)
                res = [dict(doc) for doc in page_documents]
                # drop the native mongo ID
                for doc in res:
                    doc.pop("_id", None)

        except Exception as e:
            self._logger.error(f"Error while fetching a page of processed data points from mongodb: {e}")
            return error_response_thing(exception=e, fiot_service=self)

        return ok_response_thing(payload=res, fiot_service=self)
313
314
# Script entry point: enable verbose logging, then hand control to the service.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    DatabaseMongoService.main()
MariaDB Database Blueprint¶
Note
Provide the following environment variables when starting the service:
FASTIOT_MARIA_DB_USER, FASTIOT_MARIA_DB_PASSWORD, FASTIOT_MARIA_DB_HOST, FASTIOT_MARIA_DB_PORT
When using PyCharm, you can set the environment variables in the run configuration (accessible via the dropdown menu of the green play button). Here is a screenshot of an example configuration:

1import logging
2import uuid
3
4from blueprint_dev_v2.ml_lifecycle_utils.ml_lifecycle_broker_facade import ok_response_thing, error_response_thing
5from blueprint_dev_v2.ml_lifecycle_utils.ml_lifecycle_subjects_name import (
6 DB_SAVE_MANY_RAW_DATAPOINTS_SUBJECT,
7 DB_GET_ALL_RAW_DATA_SUBJECT,
8 DB_UPSERT_MANY_PROCESSED_DATAPOINTS_SUBJECT,
9 DB_GET_PROCESSED_DATA_COUNT_SUBJECT, DB_GET_PROCESSED_DATA_PAGE_SUBJECT
10)
11
12from fastiot.core import FastIoTService, reply
13from fastiot.db.mariadb_helper_fn import get_mariadb_client_from_env
14from fastiot.msg.thing import Thing
15
class DatabaseMariaService(FastIoTService):
    """
    FastIoT service that stores and retrieves data points in MariaDB.

    The service listens to the following subjects:
    - `DB_SAVE_MANY_RAW_DATAPOINTS_SUBJECT`: to save many raw data points
    - `DB_GET_ALL_RAW_DATA_SUBJECT`: to get all raw data
    - `DB_UPSERT_MANY_PROCESSED_DATAPOINTS_SUBJECT`: to save or update many processed data points
    - `DB_GET_PROCESSED_DATA_COUNT_SUBJECT`: to get the number of processed data points
    - `DB_GET_PROCESSED_DATA_PAGE_SUBJECT`: to get a page of processed data points

    The service uses the `mariadb_helper_fn.get_mariadb_client_from_env` function to get a MariaDB connection.

    Attributes
    ----------
    _DB_NAME : str
        The name of the database. This is used for logging purposes.
    _DEFAULT_PAGE_PARAMS : dict
        Values used for ``page`` and ``page_size`` when a page request omits them.
    _mariadb_connection
        The connection to the MariaDB database.

    Methods
    -------
    _stop()
        Stops the service.
    setup_schemas_if_not_exists()
        Creates the tables if they do not exist.
    _start()
        Starts the service.
    db_save_many_raw_datapoints(topic: str, msg: Thing) -> Thing
        Saves many raw data points to the database.
    get_all_raw_data(topic: str, msg: Thing) -> Thing
        Gets all raw data from the database.
    db_save_upsert_processed_datapoints(topic: str, msg: Thing) -> Thing
        Saves or updates many processed data points in the database.
    get_processed_data_count(topic: str, msg: Thing) -> Thing
        Gets the number of processed data points from the database.
    get_processed_data_page(topic: str, msg: Thing) -> Thing
        Gets a page of processed data points from the database.
    """

    _DB_NAME = "MariaDB"

    _DEFAULT_PAGE_PARAMS = {"page": 0, "page_size": 10}

    def __init__(self, **kwargs):
        """
        Open the MariaDB connection used by all handlers.

        Parameters
        ----------
        kwargs
            Additional keyword arguments to pass to the FastIoTService constructor.
        """
        super().__init__(**kwargs)
        # NOTE: create the database/schema in the mariadb database before running this service
        # you can use the following SQL statement to create the database:
        #
        #   CREATE DATABASE <database_name>;
        #
        # replace <database_name> with the name of the database you want to create.
        self._mariadb_connection = get_mariadb_client_from_env(schema="TestDB")

    async def _stop(self):
        """
        Stops the service and closes the database connection.
        """
        self._logger.info(f"{self._DB_NAME}-Service stopped")
        self._mariadb_connection.close()

    def _rollback_quietly(self):
        """
        Roll back the open transaction after a failed write.

        A failing rollback is only logged, so that it does not mask the error
        that triggered it.
        """
        try:
            self._mariadb_connection.rollback()
        except Exception as rollback_error:
            self._logger.warning(f"Rollback on {self._DB_NAME} failed: {rollback_error}")

    def _resolve_page_params(self, requested) -> dict:
        """
        Merge the requested page parameters with the defaults and validate them.

        Parameters
        ----------
        requested
            The payload of a page request. ``None`` is treated as an empty request.

        Returns
        -------
        dict
            The merged parameters; always contains ``page`` and ``page_size``.

        Raises
        ------
        ValueError
            If the payload is not a dict or a value is negative.
        """
        if requested is None:
            requested = {}
        if not isinstance(requested, dict):
            raise ValueError("params must be a dict containing 'page' and 'page_size'")

        for key in requested:
            if key not in self._DEFAULT_PAGE_PARAMS:
                self._logger.warning(f"Unexpected parameter '{key}' in request. Ignoring it.")

        params = {**self._DEFAULT_PAGE_PARAMS, **requested}
        if params["page"] < 0:
            raise ValueError("page must be >= 0")
        if params["page_size"] < 0:
            raise ValueError("page_size must be >= 0")
        return params

    def setup_schemas_if_not_exists(self):
        """
        Creates the tables `KIOptiPackRaw` and `KIOptiPackProcessed` if they do not exist.

        Returns
        -------
        None

        Raises
        ------
        Exception
            Any error raised while executing the statements is logged and re-raised.
        """
        cursor = self._mariadb_connection.cursor()
        try:
            # NOTE: replace the table names and columns with your own schema
            cursor.execute("CREATE TABLE IF NOT EXISTS KIOptiPackRaw("
                           "id UUID PRIMARY KEY,"
                           "material_id VARCHAR(36),"
                           "datum VARCHAR(36),"
                           "laborant VARCHAR(16),"
                           "rohwert_1_labormessung FLOAT(12),"
                           "rohwert_2_labormessung FLOAT(12),"
                           "rohwert_3_labormessung FLOAT(12),"
                           "aufbereiteter_wert FLOAT(12));"
                           )
            self._logger.info(f"{self._DB_NAME}-Table 'KIOptiPackRaw' created successfully")

            cursor.execute("CREATE TABLE IF NOT EXISTS KIOptiPackProcessed("
                           "id UUID PRIMARY KEY,"
                           "aufbereiteter_wert FLOAT(12),"
                           "laborant_AN FLOAT(12),"
                           "laborant_HANS FLOAT(12),"
                           "laborant_SO FLOAT(12),"
                           "laborant_TK FLOAT(12),"
                           "material_id_00000000 FLOAT(12),"
                           "material_id_11111111 FLOAT(12),"
                           "material_id_22222222 FLOAT(12),"
                           "material_id_33333333 FLOAT(12),"
                           "rohwert_1_high FLOAT(12),"
                           "rohwert_1_low FLOAT(12),"
                           "rohwert_1_medium FLOAT(12),"
                           "rohwert_1_very_high FLOAT(12),"
                           "rohwert_1_very_low FLOAT(12),"
                           "rohwert_2_labormessung FLOAT(12),"
                           "rohwert_3_labormessung FLOAT(12));"
                           )
            self._logger.info(f"{self._DB_NAME}-Table 'KIOptiPackProcessed' created successfully")
        except Exception as e:
            self._logger.error(f"Error while creating {self._DB_NAME}-Table: {e}")
            raise
        finally:
            cursor.close()

    async def _start(self):
        """
        Starts the service and makes sure the tables exist.
        """
        self._logger.info(f"{self._DB_NAME}-Service started")
        self.setup_schemas_if_not_exists()

    @reply(DB_SAVE_MANY_RAW_DATAPOINTS_SUBJECT)
    async def db_save_many_raw_datapoints(self, _: str, msg: Thing) -> Thing:
        """
        Saves many raw data points to the database.

        Every data point gets a fresh UUID string as ``id`` before the batch
        is inserted into `KIOptiPackRaw`.

        Parameters
        ----------
        _
            The topic of the message. This is not used in the method, but is required by the `@reply` decorator.
        msg
            The message containing the raw data points to save.

        Returns
        -------
        Thing
            An acknowledgement containing ``acknowledged``, ``db`` and
            ``no_rows`` (the value returned by ``executemany``), or an error
            response if the insert failed.

        Raises
        ------
        ValueError
            If the payload is not a list.
        """
        if not isinstance(msg.value, list):
            self._logger.error(f"Payload (the 'value' field of the msg Thing) must be of type list, "
                               f"but received: {type(msg.value)}")
            raise ValueError("Payload must be a list of raw data points")

        data_points: list[dict] = msg.value
        self._logger.info(f"Received {len(data_points)} raw data points to be inserted into {self._DB_NAME}")

        cursor = self._mariadb_connection.cursor()
        try:
            # create uuids for each data point
            for data_point in data_points:
                data_point["id"] = str(uuid.uuid4())

            self._logger.info(f"Insering data points into {self._DB_NAME}")
            no_rows = cursor.executemany(
                "INSERT INTO KIOptiPackRaw (id, material_id, datum, laborant, rohwert_1_labormessung, "
                "rohwert_2_labormessung, rohwert_3_labormessung, aufbereiteter_wert) "
                "VALUES (%(id)s,%(material_id)s, %(datum)s, %(laborant)s, %(rohwert_1_labormessung)s, "
                "%(rohwert_2_labormessung)s, %(rohwert_3_labormessung)s, %(aufbereiteter_wert)s)",
                data_points
            )
            self._mariadb_connection.commit()
            self._logger.info(f"DB transaction result: {no_rows}")
            self._logger.info(f"Inserted {no_rows} data points into {self._DB_NAME}")
        except Exception as e:
            self._logger.error(f"Error while inserting data points into {self._DB_NAME}: {e}")
            # do not leave a half-written batch pending on the shared connection
            self._rollback_quietly()
            return error_response_thing(exception=e, fiot_service=self)
        finally:
            cursor.close()

        # feel free to include whatever information you want to return here.
        res = {
            "acknowledged": True,
            "db": self._DB_NAME,
            "no_rows": no_rows,
        }

        # in principle one does not need to return information here.
        # However, some infos are returned, so that the requesting service can log them.
        return ok_response_thing(payload=res, fiot_service=self)

    @reply(DB_GET_ALL_RAW_DATA_SUBJECT)
    async def get_all_raw_data(self, _: str, __: Thing) -> Thing:
        """
        Gets all raw data from the database.

        Parameters
        ----------
        _
            The topic of the message. This is not used in the method, but is required by the `@reply` decorator.
        __
            The message. This is not used in the method, but is required by the `@reply` decorator.

        Returns
        -------
        Thing
            A response containing all rows of `KIOptiPackRaw`, or an error
            response if the query failed.
        """
        self._logger.info(f"Received request to get all raw data from {self._DB_NAME}")

        # query all entries from the table KIOptiPackRaw
        cursor = self._mariadb_connection.cursor()
        try:
            cursor.execute("SELECT * FROM KIOptiPackRaw")
            raw_data_entries = cursor.fetchall()
        except Exception as e:
            self._logger.error(f"Error while querying data from {self._DB_NAME}: {e}")
            return error_response_thing(exception=e, fiot_service=self)
        finally:
            cursor.close()

        return ok_response_thing(payload=raw_data_entries, fiot_service=self)

    @reply(DB_UPSERT_MANY_PROCESSED_DATAPOINTS_SUBJECT)
    async def db_save_upsert_processed_datapoints(self, _: str, msg: Thing) -> Thing:
        """
        Saves or updates many processed data points in the database.

        Rows are inserted into `KIOptiPackProcessed`; when a row with the same
        ``id`` already exists, its other columns are overwritten.

        Parameters
        ----------
        _
            The topic of the message. This is not used in the method, but is required by the `@reply` decorator.
        msg
            The message containing the processed data points to save or update.

        Returns
        -------
        Thing
            An acknowledgement containing ``acknowledged`` and ``db``, or an
            error response if the statement failed.

        Raises
        ------
        ValueError
            If the payload is not a list.
        """
        if not isinstance(msg.value, list):
            self._logger.error(f"Payload (the 'value' field of the msg Thing) must be of type list, "
                               f"but received: {type(msg.value)}")
            raise ValueError("Payload must be a list of processed data points")

        data_points: list[dict] = msg.value
        self._logger.info(f"Received {len(data_points)} processed data points to be inserted into {self._DB_NAME}")

        cursor = self._mariadb_connection.cursor()
        try:
            res = cursor.executemany(
                "INSERT INTO KIOptiPackProcessed ("
                "id, aufbereiteter_wert, laborant_AN, laborant_HANS, laborant_SO, laborant_TK, "
                "material_id_00000000, material_id_11111111, "
                "material_id_22222222, material_id_33333333, "
                "rohwert_1_high, rohwert_1_low, rohwert_1_medium, rohwert_1_very_high, rohwert_1_very_low, "
                "rohwert_2_labormessung, rohwert_3_labormessung"
                ") VALUES ("
                "%(id)s, %(aufbereiteter_wert)s, %(laborant_AN)s, %(laborant_HANS)s, %(laborant_SO)s, %(laborant_TK)s, "
                "%(material_id_00000000)s, %(material_id_11111111)s, "
                "%(material_id_22222222)s, %(material_id_33333333)s, "
                "%(rohwert_1_high)s, %(rohwert_1_low)s, %(rohwert_1_medium)s, %(rohwert_1_very_high)s, %(rohwert_1_very_low)s, "
                "%(rohwert_2_labormessung)s, %(rohwert_3_labormessung)s"
                ") ON DUPLICATE KEY UPDATE "
                "aufbereiteter_wert = VALUES(aufbereiteter_wert),"
                "laborant_AN = VALUES(laborant_AN),"
                "laborant_HANS = VALUES(laborant_HANS),"
                "laborant_SO = VALUES(laborant_SO),"
                "laborant_TK = VALUES(laborant_TK),"
                "material_id_00000000 = VALUES(material_id_00000000),"
                "material_id_11111111 = VALUES(material_id_11111111),"
                "material_id_22222222 = VALUES(material_id_22222222),"
                "material_id_33333333 = VALUES(material_id_33333333),"
                "rohwert_1_high = VALUES(rohwert_1_high),"
                "rohwert_1_low = VALUES(rohwert_1_low),"
                "rohwert_1_medium = VALUES(rohwert_1_medium),"
                "rohwert_1_very_high = VALUES(rohwert_1_very_high),"
                "rohwert_1_very_low = VALUES(rohwert_1_very_low),"
                "rohwert_2_labormessung = VALUES(rohwert_2_labormessung),"
                "rohwert_3_labormessung = VALUES(rohwert_3_labormessung)",
                data_points
            )
            self._mariadb_connection.commit()
            self._logger.info(f"DB transaction result: {res}")
            self._logger.info(f"Inserted {res} data points into {self._DB_NAME}")
        except Exception as e:
            self._logger.error(f"Error while inserting data points into {self._DB_NAME}: {e}")
            # do not leave a half-written batch pending on the shared connection
            self._rollback_quietly()
            return error_response_thing(exception=e, fiot_service=self)
        finally:
            # the cursor is released on success and on failure
            cursor.close()

        # feel free to include whatever information you want to return here.
        db_specific_info = {
            "acknowledged": True,
            "db": "MariaDB"
        }

        # in principle one does not need to return information here.
        # However, some infos are returned, so that the requesting service can log them.
        return ok_response_thing(payload=db_specific_info, fiot_service=self)

    @reply(DB_GET_PROCESSED_DATA_COUNT_SUBJECT)
    async def get_processed_data_count(self, _: str, __: Thing) -> Thing:
        """
        Gets the number of processed data points from the database.

        Parameters
        ----------
        _
            The topic of the message. This is not used in the method, but is required by the `@reply` decorator.
        __
            The message. This is not used in the method, but is required by the `@reply` decorator.

        Returns
        -------
        Thing
            A response containing the number of rows in `KIOptiPackProcessed`,
            or an error response if counting failed.
        """
        self._logger.info(f"Received request to get the number of processed data points from {self._DB_NAME}")

        cursor = None
        try:
            cursor = self._mariadb_connection.cursor()
            cursor.execute("SELECT COUNT(*) as count FROM KIOptiPackProcessed")
            result = cursor.fetchone()
            # NOTE(review): assumes the cursor returns rows as mappings keyed by
            # column name - confirm against get_mariadb_client_from_env.
            count = result['count']
        except Exception as e:
            self._logger.error(f"Error while counting processed data points in mariadb: {e}")
            return error_response_thing(exception=e, fiot_service=self)
        finally:
            # close the cursor even when the query or the row access fails
            if cursor is not None:
                cursor.close()

        return ok_response_thing(payload=count, fiot_service=self)

    @reply(DB_GET_PROCESSED_DATA_PAGE_SUBJECT)
    async def get_processed_data_page(self, _: str, msg: Thing) -> Thing:
        """
        Gets a page of processed data points from the database.

        Parameters
        ----------
        _
            The topic of the message. This is not used in the method, but is required by the `@reply` decorator.
        msg
            The message containing the parameters for the page. The payload may contain the following fields:
            - `page`: The zero-based page number to get (default 0).
            - `page_size`: The number of items per page (default 10).
            Other keys are ignored with a warning.

        Returns
        -------
        Thing
            A response containing the rows of the requested page without their
            ``id`` column, or an error response if the parameters are invalid
            or the query failed.
        """
        self._logger.info(f"Received request to get a page of processed data points from {self._DB_NAME}")

        cursor = None
        try:
            params = self._resolve_page_params(msg.value)
            page_size = params["page_size"]
            offset = params["page"] * page_size

            cursor = self._mariadb_connection.cursor()
            cursor.execute("SELECT * FROM KIOptiPackProcessed LIMIT %s OFFSET %s", (page_size, offset))

            # drop id column
            page_documents = [dict(row) for row in cursor.fetchall()]
            for row in page_documents:
                del row["id"]

        except Exception as e:
            self._logger.error(f"Error while fetching a page of processed data points from {self._DB_NAME}: {e}")
            return error_response_thing(exception=e, fiot_service=self)
        finally:
            # close the cursor even when validation, the query or the row handling fails
            if cursor is not None:
                cursor.close()

        return ok_response_thing(payload=page_documents, fiot_service=self)
414
415
# Script entry point: enable verbose logging, then hand control to the service.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    DatabaseMariaService.main()