Database Blueprints

What are Database Blueprints?

This service is responsible for storing the data and providing it to the other services. It is an abstraction layer between the data and the services that need it. This service is responsible for storing the data and providing it to the other services. It is an abstraction layer between the data and the services that need it. Putting all the DB interactions in one service enables to implement drop in replacements for different databases. So the end user can choose between different databases without changing the other services.

MongoDB Database Blueprint

Database MongoDB

Important

You to setup a username and a password for your MongoDB Instance. The service will not work without it even when your MongoDB database does not require authentication.

Note

Provide the following enviorment variables when starting the service:

  • FASTIOT_MONGO_DB_USER

  • FASTIOT_MONGO_DB_PASSWORD

  • FASTIOT_MONGO_DB_HOST

  • FASTIOT_MONGO_DB_PORT

When using PyCharm, you can set the environment variables in the run configuration (accessible via the dropdown menu of the green play button). Here is a screenshot of an example configuration:

Mongo example env config

  1import logging
  2import uuid
  3import os
  4
  5from fastiot.core import FastIoTService, reply
  6from fastiot.db.mongodb_helper_fn import get_mongodb_client_from_env
  7from fastiot.msg.thing import Thing
  8from pymongo import UpdateOne, MongoClient
  9from pymongo.results import InsertManyResult, BulkWriteResult
 10
 11from blueprint_dev_v2.ml_lifecycle_utils.ml_lifecycle_broker_facade import ok_response_thing, error_response_thing
 12from blueprint_dev_v2.ml_lifecycle_utils.ml_lifecycle_subjects_name import (
 13    DB_SAVE_MANY_RAW_DATAPOINTS_SUBJECT,
 14    DB_GET_ALL_RAW_DATA_SUBJECT,
 15    DB_UPSERT_MANY_PROCESSED_DATAPOINTS_SUBJECT,
 16    DB_GET_PROCESSED_DATA_COUNT_SUBJECT,
 17    DB_GET_PROCESSED_DATA_PAGE_SUBJECT
 18)
 19
 20
 21class DatabaseMongoService(FastIoTService):
 22    """
 23        This is the DatabaseMongoService class.
 24
 25        This service is responsible for storing and retrieving data from the database.
 26        It implements the CRUD operations (create, read, update, delete) and provides these functionalities to other
 27        services via the broker.
 28
 29        Attributes
 30        ----------
 31        _DB_NAME : str
 32            The name of the database.
 33        _MONGO_DB : str
 34            The name of the MongoDB database.
 35        _MONGO_RAW_DATA_COLLECTION : str
 36            The name of the MongoDB collection for raw data.
 37        _MONGO_PROCESSED_DATA_COLLECTION : str
 38            The name of the MongoDB collection for processed data.
 39        _mongodb_client : MongoClient
 40            The MongoDB client.
 41        _db_username : str
 42            From environment variables inferred username, used to construct the connection string.
 43        _db_password : str
 44            From environment variables inferred password for the according user, used to
 45            construct the connection string.
 46        _db_port : str
 47            From environment variables inferred port, used to construct the connection string.
 48        _db_host : str
 49            From environment variables inferred host, used to construct the connection string.
 50        _db : Database
 51            The MongoDB database.
 52        _raw_data_collection : Collection
 53            The MongoDB collection for raw data.
 54        _processed_data_collection : Collection
 55            The MongoDB collection for processed data.
 56
 57        Methods
 58        -------
 59        __init__(**kwargs)
 60            Constructs all the necessary attributes for the DatabaseMongoService object.
 61        db_save_many_raw_datapoints(topic: str, msg: Thing) -> Thing
 62            Saves many raw data points to the database.
 63        db_save_upsert_processed_datapoints(topic: str, msg: Thing) -> Thing
 64            Upserts many processed data points to the database.
 65        get_all_raw_data(topic: str, msg: Thing) -> Thing
 66            Gets all raw data from the database.
 67        get_processed_data_count(topic: str, msg: Thing) -> Thing
 68            Gets the count of processed data points in the database.
 69        get_processed_data_page(topic: str, msg: Thing) -> Thing
 70            Gets a page of processed data points from the database.
 71        """
 72
 73    _DB_NAME = "mongodb"
 74    _MONGO_DB = "KIOptiPackDb"
 75    _MONGO_RAW_DATA_COLLECTION = "KIOptiPackRaw"
 76    _MONGO_PROCESSED_DATA_COLLECTION = "KIOptiPackProcessed"
 77
 78    def __init__(self, **kwargs):
 79        """
 80        Constructs all the necessary attributes for the DatabaseMongoService object.
 81
 82        Parameters
 83        ------------
 84        kwargs : dict
 85             keyword arguments that are passed to the FastIoTService constructor.
 86        """
 87        super().__init__(**kwargs)
 88        self._db_username = os.environ.get("FASTIOT_MONGO_DB_USERNAME")
 89        self._db_password = os.environ.get("FASTIOT_MONGO_DB_PASSWORD")
 90        self._db_port = os.environ.get("FASTIOT_MONGO_DB_PORT")
 91        self._db_host = os.environ.get("FASTIOT_MONGO_DB_HOST")
 92        connection_string = f"mongodb://{self._db_username}:{self._db_password}@{self._db_host}:{self._db_port}/?authMechanism=SCRAM-SHA-1"
 93        self._mongodb_client = MongoClient(connection_string)
 94        self._db = self._mongodb_client[self._MONGO_DB]
 95
 96        self._raw_data_collection = self._db[self._MONGO_RAW_DATA_COLLECTION]
 97        self._processed_data_collection = self._db[self._MONGO_PROCESSED_DATA_COLLECTION]
 98        # self._trained_model_collection = self._db[self._MONGO_TRAINED_MODEL_COLLECTION]
 99
100    @reply(DB_SAVE_MANY_RAW_DATAPOINTS_SUBJECT)
101    async def db_save_many_raw_datapoints(self, _: str, msg: Thing) -> Thing:
102        """
103        Saves many raw data points to the database.
104
105        Parameters
106        ----------
107        _
108            The topic of the message. This is not used in this method.
109        msg
110            The message that contains the raw data points to be saved to the database.
111        Returns
112        -------
113        Thing
114            A Thing object that contains the result of the operation. This is either an acknowledgement or an error.
115            Acknowledgements contain the number of raw data points that were saved to the database.
116        """
117        if not isinstance(msg.value, list):
118            self._logger.error(f"Payload (the 'value' field of the msg Thing) must be of type list, "
119                               f"but received: {type(msg.value)}")
120            raise ValueError("Payload must be a list of raw data points")
121
122        data_points: list[dict] = msg.value
123        self._logger.info(f"Received {len(data_points)} raw data points to be inserted into mongodb")
124
125        # add uuids to data points
126        for data_point in data_points:
127            data_point["_id"] = str(uuid.uuid4())
128
129        self._logger.info(f"Insering data points into mongodb")
130        res: InsertManyResult = self._raw_data_collection.insert_many(data_points)
131        self._logger.info(f"DB transaction result: {res.acknowledged}")
132
133        self._logger.info(f"Inserted {len(res.inserted_ids)} raw data points into mongodb")
134
135        # feel free to include whatever information you want to return here.
136        db_specific_info = {
137            "acknowledged": True,
138            "db": "MongoDB",
139        }
140
141        # in principle one does not need to return information here.
142        # However, some infos are return here, so that the requesting service can log the information.
143        return ok_response_thing(payload=db_specific_info, fiot_service=self)
144
145    @reply(DB_UPSERT_MANY_PROCESSED_DATAPOINTS_SUBJECT)
146    async def db_save_upsert_processed_datapoints(self, _: str, msg: Thing) -> Thing:
147        """
148        Upserts many processed data points to the database.
149
150        Parameters
151        ----------
152        _
153            The topic of the message. This is not used in this method.
154        msg
155            The message that contains the processed data points to be upserted to the database.
156
157        Returns
158        -------
159        Thing
160            A Thing object that contains the result of the operation. This is either an acknowledgement or an error.
161            Acknowledgements contain the number of processed data points that were upserted to the database.
162        """
163
164        if not isinstance(msg.value, list):
165            self._logger.error(f"Payload (the 'value' field of the msg Thing) must be of type list, "
166                               f"but received: {type(msg.value)}")
167            raise ValueError("Payload must be a list of processed data points")
168
169        data_points: list[dict] = msg.value
170        self._logger.info(f"Received {len(data_points)} processed data points to be inserted into mongodb")
171
172        self._logger.info(f"Upserting data points into mongodb using bulk wirte")
173        # Prepare a list of UpdateOne operations
174        operations = [
175            UpdateOne(
176                {"_id": data_point["_id"]},  # filter
177                {"$set": data_point},  # update
178                upsert=True  # upsert
179            )
180            for data_point in data_points
181        ]
182        try:
183            res: BulkWriteResult = self._processed_data_collection.bulk_write(operations)
184        except Exception as e:
185            self._logger.error(f"Error while upserting processed data points into mongodb: {e}")
186            return error_response_thing(exception=e, fiot_service=self)
187        self._logger.info(f"DB transaction result: {res.acknowledged}")
188
189        # feel free to include whatever information you want to return here.
190        db_specific_info = {
191            "acknowledged": True,
192            "db": "MongoDB",
193        }
194
195        # in principle one does not need to return information here.
196        # However, some infos are return here, so that the requesting service can log the information.
197        return ok_response_thing(payload=db_specific_info, fiot_service=self)
198
199    @reply(DB_GET_ALL_RAW_DATA_SUBJECT)
200    async def get_all_raw_data(self, _: str, __: Thing) -> Thing:
201        """
202        Gets all raw data from the database.
203        Parameters
204        ----------
205        _
206            The topic of the message. This is not used in this method.
207        __
208            The message that contains the request to get all raw data from the database.
209
210        Returns
211        -------
212        Thing
213            A Thing object that contains the result of the operation. This is either an acknowledgement or an error.
214            Acknowledgements contain all the raw data from the database.
215
216        """
217
218        self._logger.info("Received request to get all raw data from mongodb")
219
220        filter_kwargs = {}  # empty dict means no filter
221        raw_data_entries = self._raw_data_collection.find()
222        raw_data_entries = [dict(data) for data in raw_data_entries]
223
224        # the native mongo ID is not serializable to json
225        # so we convert it to a string
226        for data in raw_data_entries:
227            data["_id"] = str(data["_id"])
228
229        return ok_response_thing(payload=raw_data_entries, fiot_service=self)
230
231    @reply(DB_GET_PROCESSED_DATA_COUNT_SUBJECT)
232    async def get_processed_data_count(self, _: str, __: Thing) -> Thing:
233        """
234        Gets the count of processed data points in the database.
235        Parameters
236        ----------
237        _
238            The topic of the message. This is not used in this method.
239        __
240            The message that contains the request to get the count of processed data points in the database.
241
242        Returns
243        -------
244        Thing
245            A Thing object that contains the result of the operation. This is either an acknowledgement or an error.
246            Acknowledgements contain the count of processed data points in the database.
247        """
248        self._logger.info("Received request to get the number of processed data points from mongodb")
249
250        try:
251            count = self._processed_data_collection.count_documents({})
252        except Exception as e:
253            self._logger.error(f"Error while counting processed data points in mongodb: {e}")
254            return error_response_thing(exception=e, fiot_service=self)
255
256        return ok_response_thing(payload=count, fiot_service=self)
257
258    @reply(DB_GET_PROCESSED_DATA_PAGE_SUBJECT)
259    async def get_processed_data_page(self, _: str, msg: Thing) -> Thing:
260        """
261        Gets a page of processed data points from the database.
262
263        Parameters
264        ----------
265        _
266        msg
267            The message that contains the request to get a page of processed data points from the database.
268
269        Returns
270        -------
271        Thing
272            A Thing object that contains the result of the operation. This is either an acknowledgement or an error.
273            Acknowledgements contain a page of processed data points from the database.
274        """
275        self._logger.debug(f"Received request to get a page of processed data points from {self._DB_NAME}")
276        default_params = {
277            "page": 0,
278            "page_size": 10,
279        }
280
281        params: dict = msg.value
282
283        # warning if unexpected parameters are present
284        for k in params.keys():
285            if k not in default_params.keys():
286                self._logger.warning(f"Unexpected parameter '{k}' in request. Ignoring it.")
287
288        # merge default and user parameters
289        params = {**default_params, **params}
290
291        # check 'page' and 'page_size' are in the params dict
292        try:
293            if "page" not in params or "page_size" not in params:
294                raise ValueError("params must contain 'page' and 'page_size'")
295            if params["page"] < 0:
296                raise ValueError("page must be >= 0")
297            if params["page_size"] < 0:
298                raise ValueError("page_size must be >= 0")
299
300            page_documents = self._processed_data_collection.find() \
301                .skip(params["page"] * params["page_size"]) \
302                .limit(params["page_size"])
303            res = [dict(doc) for doc in page_documents]
304            # drop the native mongo ID
305            for doc in res:
306                doc.pop("_id", None)
307
308        except Exception as e:
309            self._logger.error(f"Error while counting processed data points in mongodb: {e}")
310            return error_response_thing(exception=e, fiot_service=self)
311
312        return ok_response_thing(payload=res, fiot_service=self)
313
314
315if __name__ == '__main__':
316    logging.basicConfig(level=logging.DEBUG)
317    DatabaseMongoService.main()

MariaDB Database Blueprint

Note

Provide the following enviorment variables when starting the service:

  • FASTIOT_MARIA_DB_USER

  • FASTIOT_MARIA_DB_PASSWORD

  • FASTIOT_MARIA_DB_HOST

  • FASTIOT_MARIA_DB_PORT

When using PyCharm, you can set the environment variables in the run configuration (accessible via the dropdown menu of the green play button). Here is a screenshot of an example configuration:

Mongo example env config

Database MariaDB
  1import logging
  2import uuid
  3
  4from blueprint_dev_v2.ml_lifecycle_utils.ml_lifecycle_broker_facade import ok_response_thing, error_response_thing
  5from blueprint_dev_v2.ml_lifecycle_utils.ml_lifecycle_subjects_name import (
  6    DB_SAVE_MANY_RAW_DATAPOINTS_SUBJECT,
  7    DB_GET_ALL_RAW_DATA_SUBJECT,
  8    DB_UPSERT_MANY_PROCESSED_DATAPOINTS_SUBJECT,
  9    DB_GET_PROCESSED_DATA_COUNT_SUBJECT, DB_GET_PROCESSED_DATA_PAGE_SUBJECT
 10)
 11
 12from fastiot.core import FastIoTService, reply
 13from fastiot.db.mariadb_helper_fn import get_mariadb_client_from_env
 14from fastiot.msg.thing import Thing
 15
 16class DatabaseMariaService(FastIoTService):
 17    """
 18    This service is responsible for handling the database operations for the MariaDB database.
 19
 20    The service listens to the following subjects:
 21    - `DB_SAVE_MANY_RAW_DATAPOINTS_SUBJECT`: to save many raw data points
 22    - `DB_GET_ALL_RAW_DATA_SUBJECT`: to get all raw data
 23    - `DB_UPSERT_MANY_PROCESSED_DATAPOINTS_SUBJECT`: to save or update many processed data points
 24    - `DB_GET_PROCESSED_DATA_COUNT_SUBJECT`: to get the number of processed data points
 25    - `DB_GET_PROCESSED_DATA_PAGE_SUBJECT`: to get a page of processed data points
 26
 27    The service uses the `mariadb_helper_fn.get_mariadb_client_from_env` function to get a MariaDB connection.
 28
 29    Attributes
 30    ----------
 31    _DB_NAME : str
 32        The name of the database. This is used for logging purposes.
 33    _mariadb_connection : mariadb.connection
 34        The connection to the MariaDB database.
 35
 36    Methods
 37    -------
 38    _stop()
 39        Stops the service.
 40    setup_schemas_if_not_exists()
 41        Creates the database schema and tables if they do not exist.
 42    _start()
 43        Starts the service.
 44    db_save_many_raw_datapoints(topic: str, msg: Thing) -> Thing
 45        Saves many raw data points to the database.
 46    get_all_raw_data(topic: str, msg: Thing) -> Thing
 47        Gets all raw data from the database.
 48    db_save_upsert_processed_datapoints(topic: str, msg: Thing) -> Thing
 49        Saves or updates many processed data points in the database.
 50    get_processed_data_count(topic: str, msg: Thing) -> Thing
 51        Gets the number of processed data points from the database.
 52    get_processed_data_page(topic: str, msg: Thing) -> Thing
 53        Gets a page of processed data points from the database.
 54    """
 55
 56    _DB_NAME = "MariaDB"
 57    def __init__(self, **kwargs):
 58        """
 59        Parameters
 60        ----------
 61        kwargs
 62            Additional keyword arguments to pass to the FastIoTService constructor.
 63        """
 64        super().__init__(**kwargs)
 65        # NOTE: create the database/schema and table in the mariadb database before running this service
 66        #       you can use the following SQL statement to create the database and table:
 67        #
 68        #       CREATE DATABASE <database_name>;
 69        #
 70        #       replace <database_name> with the name of the database you want to create.
 71        self._mariadb_connection = get_mariadb_client_from_env(schema="TestDB")
 72
 73    async def _stop(self):
 74        """
 75        Stops the service.
 76        """
 77        self._logger.info(f"{self._DB_NAME}-Service stopped")
 78        self._mariadb_connection.close()
 79
 80    def setup_schemas_if_not_exists(self):
 81        """
 82        Creates the database schema and tables if they do not exist.
 83
 84        Returns
 85        -------
 86        None
 87        """
 88        # create table if not exists. table-name: KIOptiPackRaw
 89        cursor = self._mariadb_connection.cursor()
 90        try:
 91            # create table if not exists. table-name: KIOptiPackRaw
 92            # NOTE: replace the table name with your own table name
 93            _ = cursor.execute("CREATE TABLE IF NOT EXISTS KIOptiPackRaw("
 94                                              "id UUID PRIMARY KEY,"
 95                                              "material_id VARCHAR(36),"
 96                                              "datum VARCHAR(36),"
 97                                              "laborant VARCHAR(16),"
 98                                              "rohwert_1_labormessung FLOAT(12),"
 99                                              "rohwert_2_labormessung FLOAT(12),"
100                                              "rohwert_3_labormessung FLOAT(12),"
101                                              "aufbereiteter_wert FLOAT(12));"
102                                              )
103            self._logger.info(f"{self._DB_NAME}-Table 'KIOptiPackRaw' created successfully")
104
105
106            _ = cursor.execute("CREATE TABLE IF NOT EXISTS KIOptiPackProcessed("
107                                    "id UUID PRIMARY KEY,"
108                                    "aufbereiteter_wert FLOAT(12),"
109                                    "laborant_AN FLOAT(12),"
110                                    "laborant_HANS FLOAT(12),"
111                                    "laborant_SO FLOAT(12),"
112                                    "laborant_TK FLOAT(12),"
113                                    "material_id_00000000 FLOAT(12),"
114                                    "material_id_11111111 FLOAT(12),"
115                                    "material_id_22222222 FLOAT(12),"
116                                    "material_id_33333333 FLOAT(12),"
117                                    "rohwert_1_high FLOAT(12),"
118                                    "rohwert_1_low FLOAT(12),"
119                                    "rohwert_1_medium FLOAT(12),"
120                                    "rohwert_1_very_high FLOAT(12),"
121                                    "rohwert_1_very_low FLOAT(12),"
122                                    "rohwert_2_labormessung FLOAT(12),"
123                                    "rohwert_3_labormessung FLOAT(12));"
124                                 )
125
126            # material_id_00000000
127            # material_id_00000000-0000-0000-0000-000000000000
128            self._logger.info(f"{self._DB_NAME}-Table 'KIOptiPackProcessed' created successfully")
129        except Exception as e:
130            self._logger.error(f"Error while creating {self._DB_NAME}-Table: {e}")
131            raise e
132        finally:
133            cursor.close()
134
135    async def _start(self):
136        """
137        Starts the service.
138        """
139        self._logger.info(f"{self._DB_NAME}-Service started")
140        self.setup_schemas_if_not_exists()
141
142    @reply(DB_SAVE_MANY_RAW_DATAPOINTS_SUBJECT)
143    async def db_save_many_raw_datapoints(self, _: str, msg: Thing) -> Thing:
144        """
145        Saves many raw data points to the database.
146
147        Parameters
148        ----------
149        _
150            The topic of the message. This is not used in the method, but is required by the `@reply` decorator.
151        msg
152            The message containing the raw data points to save.
153
154        Returns
155        -------
156        Thing
157            A Thing containing the result of the operation. If the operation was successful, the Thing will contain
158            the number of rows inserted into the database. If the operation was not successful, the Thing will contain
159            an error message.
160        """
161        if not isinstance(msg.value, list):
162            self._logger.error(f"Payload (the 'value' field of the msg Thing) must be of type list, "
163                      f"but received: {type(msg.value)}")
164            raise ValueError("Payload must be a list of raw data points")
165
166        data_points: list[dict] = msg.value
167        self._logger.info(f"Received {len(data_points)} raw data points to be inserted into {self._DB_NAME}")
168
169        # create uuids for each data point
170        for data_point in data_points:
171            data_point["id"] = str(uuid.uuid4())
172
173        self._logger.info(f"Insering data points into {self._DB_NAME}")
174
175        cursor = self._mariadb_connection.cursor()
176
177        try:
178            no_rows = cursor.executemany(
179                "INSERT INTO KIOptiPackRaw (id, material_id, datum, laborant, rohwert_1_labormessung, "
180                "rohwert_2_labormessung, rohwert_3_labormessung, aufbereiteter_wert) "
181                "VALUES (%(id)s,%(material_id)s, %(datum)s, %(laborant)s, %(rohwert_1_labormessung)s, "
182                "%(rohwert_2_labormessung)s, %(rohwert_3_labormessung)s, %(aufbereiteter_wert)s)",
183                data_points
184            )
185            self._mariadb_connection.commit()
186            self._logger.info(f"DB transaction result: {no_rows}")
187            self._logger.info(f"Inserted {no_rows} data points into {self._DB_NAME}")
188
189            # feel free to include whatever information you want to return here.
190            res = {
191                "acknowledged": True,
192                "db": self._DB_NAME,
193                "no_rows": no_rows,
194            }
195
196            # in principle one does not need to return information here.
197            # However, some infos are return here, so that the requesting service can log the information.
198            return ok_response_thing(payload=res, fiot_service=self)
199        except Exception as e:
200            self._logger.error(f"Error while inserting data points into {self._DB_NAME}: {e}")
201            return error_response_thing(exception=e, fiot_service=self)
202        finally:
203            cursor.close()
204
205    @reply(DB_GET_ALL_RAW_DATA_SUBJECT)
206    async def get_all_raw_data(self, _: str, __: Thing) -> Thing:
207        """
208        Gets all raw data from the database.
209
210        Parameters
211        ----------
212        _
213            The topic of the message. This is not used in the method, but is required by the `@reply` decorator.
214        __
215            The message. This is not used in the method, but is required by the `@reply` decorator.
216
217        Returns
218        -------
219        Thing
220            A Thing containing the raw data from the database. If the operation was successful, the Thing will contain
221            the raw data. If the operation was not successful, the Thing will contain an error message.
222        """
223        self._logger.info(f"Received request to get all raw data from {self._DB_NAME}")
224
225        # query all entries from the table KIOptiPackRaw
226        cursor = self._mariadb_connection.cursor()
227        try:
228            cursor.execute("SELECT * FROM KIOptiPackRaw")
229            raw_data_entries = cursor.fetchall()
230        except Exception as e:
231            self._logger.error(f"Error while querying data from {self._DB_NAME}: {e}")
232            return error_response_thing(exception=e, fiot_service=self)
233        finally:
234            cursor.close()
235
236        return ok_response_thing(payload=raw_data_entries, fiot_service=self)
237
238    @reply(DB_UPSERT_MANY_PROCESSED_DATAPOINTS_SUBJECT)
239    async def db_save_upsert_processed_datapoints(self, _: str, msg: Thing) -> Thing:
240        """
241        Saves or updates many processed data points in the database.
242
243        Parameters
244        ----------
245        _
246            The topic of the message. This is not used in the method, but is required by the `@reply` decorator.
247        msg
248            The message containing the processed data points to save or update.
249
250        Returns
251        -------
252        Thing
253            A Thing containing the result of the operation. If the operation was successful, the Thing will contain
254            the number of rows inserted into the database. If the operation was not successful, the Thing will contain
255            an error message.
256        """
257        if not isinstance(msg.value, list):
258            self._logger.error(f"Payload (the 'value' field of the msg Thing) must be of type list, "
259                      f"but received: {type(msg.value)}")
260            raise ValueError("Payload must be a list of processed data points")
261
262        data_points: list[dict] = msg.value
263        self._logger.info(f"Received {len(data_points)} processed data points to be inserted into {self._DB_NAME}")
264
265        cursor = self._mariadb_connection.cursor()
266        try:
267            res = cursor.executemany(
268                "INSERT INTO KIOptiPackProcessed ("
269                "id, aufbereiteter_wert, laborant_AN, laborant_HANS, laborant_SO, laborant_TK, "
270                "material_id_00000000, material_id_11111111, "
271                "material_id_22222222, material_id_33333333, "
272                "rohwert_1_high, rohwert_1_low, rohwert_1_medium, rohwert_1_very_high, rohwert_1_very_low, "
273                "rohwert_2_labormessung, rohwert_3_labormessung"
274                ") VALUES ("
275                "%(id)s, %(aufbereiteter_wert)s, %(laborant_AN)s, %(laborant_HANS)s, %(laborant_SO)s, %(laborant_TK)s, "
276                "%(material_id_00000000)s, %(material_id_11111111)s, "
277                "%(material_id_22222222)s, %(material_id_33333333)s, "
278                "%(rohwert_1_high)s, %(rohwert_1_low)s, %(rohwert_1_medium)s, %(rohwert_1_very_high)s, %(rohwert_1_very_low)s, "
279                "%(rohwert_2_labormessung)s, %(rohwert_3_labormessung)s"
280                ") ON DUPLICATE KEY UPDATE "
281                "aufbereiteter_wert = VALUES(aufbereiteter_wert),"
282                "laborant_AN = VALUES(laborant_AN),"
283                "laborant_HANS = VALUES(laborant_HANS),"
284                "laborant_SO = VALUES(laborant_SO),"
285                "laborant_TK = VALUES(laborant_TK),"
286                "material_id_00000000 = VALUES(material_id_00000000),"
287                "material_id_11111111 = VALUES(material_id_11111111),"
288                "material_id_22222222 = VALUES(material_id_22222222),"
289                "material_id_33333333 = VALUES(material_id_33333333),"
290                "rohwert_1_high = VALUES(rohwert_1_high),"
291                "rohwert_1_low = VALUES(rohwert_1_low),"
292                "rohwert_1_medium = VALUES(rohwert_1_medium),"
293                "rohwert_1_very_high = VALUES(rohwert_1_very_high),"
294                "rohwert_1_very_low = VALUES(rohwert_1_very_low),"
295                "rohwert_2_labormessung = VALUES(rohwert_2_labormessung),"
296                "rohwert_3_labormessung = VALUES(rohwert_3_labormessung)",
297                data_points
298            )
299            self._mariadb_connection.commit()
300            self._logger.info(f"DB transaction result: {res}")
301            self._logger.info(f"Inserted {res} data points into {self._DB_NAME}")
302        except Exception as e:
303            self._logger.error(f"Error while inserting data points into {self._DB_NAME}: {e}")
304            return error_response_thing(exception=e, fiot_service=self)
305
306        # feel free to include whatever information you want to return here.
307        db_specific_info = {
308                "acknowledged": True,
309                "db": "MariaDB"
310        }
311
312        # in principle one does not need to return information here.
313        # However, some infos are return here, so that the requesting service can log the information.
314        return ok_response_thing(payload=db_specific_info, fiot_service=self)
315
316    @reply(DB_GET_PROCESSED_DATA_COUNT_SUBJECT)
317    async def get_processed_data_count(self, _: str, __: Thing) -> Thing:
318        """
319        Gets the number of processed data points from the database.
320
321        Parameters
322        ----------
323        _
324            The topic of the message. This is not used in the method, but is required by the `@reply` decorator.
325        __
326            The message. This is not used in the method, but is required by the `@reply` decorator.
327
328        Returns
329        -------
330        Thing
331            A Thing containing the number of processed data points from the database. If the operation was successful,
332            the Thing will contain the number of processed data points. If the operation was not successful, the Thing
333            will contain an error message.
334        """
335        self._logger.info(f"Received request to get the number of processed data points from {self._DB_NAME}")
336
337        try:
338            cursor = self._mariadb_connection.cursor()
339            cursor.execute("SELECT COUNT(*) as count FROM KIOptiPackProcessed")
340            result = cursor.fetchone()
341            count = result['count']
342            cursor.close()
343        except Exception as e:
344            self._logger.error(f"Error while counting processed data points in mariadb: {e}")
345            return error_response_thing(exception=e, fiot_service=self)
346
347        return ok_response_thing(payload=count, fiot_service=self)
348
349    @reply(DB_GET_PROCESSED_DATA_PAGE_SUBJECT)
350    async def get_processed_data_page(self, _: str, msg: Thing) -> Thing:
351        """
352        Gets a page of processed data points from the database.
353
354        Parameters
355        ----------
356        _
357            The topic of the message. This is not used in the method, but is required by the `@reply` decorator.
358        msg
359            The message containing the parameters for the page. The message should contain the following fields:
360            - `page`: The page number to get.
361            - `page_size`: The number of items per page.
362
363        Returns
364        -------
365        Thing
366            A Thing containing the page of processed data points from the database. If the operation was successful,
367            the Thing will contain the page of processed data points. If the operation was not successful, the Thing
368            will contain an error message.
369        """
370        self._logger.info(f"Received request to get a page of processed data points from {self._DB_NAME}")
371        default_params = {
372            "page": 0,
373            "page_size": 10,
374        }
375
376        params: dict = msg.value
377
378        # warning if unexpected parameters are present
379        for k in params.keys():
380            if k not in default_params.keys():
381                self._logger.warning(f"Unexpected parameter '{k}' in request. Ignoring it.")
382
383        # merge default and user parameters
384        params = {**default_params, **params}
385
386        # check 'page' and 'page_size' are in the params dict
387        try:
388            if "page" not in params or "page_size" not in params:
389                raise ValueError("params must contain 'page' and 'page_size'")
390            if params["page"] < 0:
391                raise ValueError("page must be >= 0")
392            if params["page_size"] < 0:
393                raise ValueError("page_size must be >= 0")
394
395            page = params["page"]
396            page_size = params["page_size"]
397            offset = page * page_size
398
399            cursor = self._mariadb_connection.cursor()
400            cursor.execute("SELECT * FROM KIOptiPackProcessed LIMIT %s OFFSET %s", (page_size, offset))
401            page_documents = cursor.fetchall()
402            cursor.close()
403
404            # drop id column
405            page_documents = [dict(row) for row in page_documents]
406            for row in page_documents:
407                del row["id"]
408
409        except Exception as e:
410            self._logger.error(f"Error while counting processed data points in {self._DB_NAME}: {e}")
411            return error_response_thing(exception=e, fiot_service=self)
412
413        return ok_response_thing(payload=page_documents, fiot_service=self)
414
415
416if __name__ == '__main__':
417    logging.basicConfig(level=logging.DEBUG)
418    DatabaseMariaService.main()