+import json
+
+import sys
+import time
+# from datetime import datetime
+import datetime
+
+import mysql.connector
+from mysql.connector import pooling
+import numpy as np
+import logging
+import pickle
+import pandas as pd
+import csv
+import random
+import timestamp as timestamp
+
+import encoding_module
+import engineering_module
+import ddl_build_module
+
+UNDER_SCORE = "_"
+ENCODED = "encoded"
+FE = "engineered" # feature engineered
+REDUCED = "reduced"
+FOR_EVAULATED = "for_evaulated"
+SIMULATED_REAL = "simulated_real"
+DOESNT_EXIST = "doesnt_exist"
+OTHER_APPLICABLE_ENCODING_TYPES = "other_applicable_encoding_types"
+# Train task status:
+STORED = "STORED"
+IN_PROGRESS = "IN_PROGRESS"
+
+
+class IllegalArgumentException(Exception):
+ pass
+
+
+class TrainTaskExistYetException(Exception):
+ pass
+
+
+class ForeignKeyConstraintViolationException(Exception):
+ pass
+
+
+class NoneException(Exception):
+ pass
+
+
+class NumpyMySQLConverter(mysql.connector.conversion.MySQLConverter):
+ """ A mysql.connector Converter that handles Numpy types """
+
+ def _float32_to_mysql(self, value):
+ return float(value)
+
+ def _float64_to_mysql(self, value):
+ return float(value)
+
+ def _int32_to_mysql(self, value):
+ return int(value)
+
+ def _int64_to_mysql(self, value):
+ return int(value)
+
+
+class Handler:
+ def __init__(self, database_url, database_user, database_password):
+ self.logger = logging.getLogger("train.server.database")
+ self.database_url = database_url
+ self.database_user = database_user
+ self.database_password = database_password
+ # self.connection = self.get_connection(self.database_url, self.database_user, self.database_password)
+ self.connection_pool = self.get_connection_pool(self.database_url, self.database_user, self.database_password)
+
+ def get_connection_pool(self, database_url, database_user_name, database_password):
+ try:
+ connection_pool = pooling.MySQLConnectionPool(pool_name="train_pool",
+ pool_size=8,
+ pool_reset_session=True,
+ host=database_url,
+ user=database_user_name,
+ password=database_password)
+ self.logger.debug("database connection pool created in train modul")
+ return connection_pool
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL connection error: {err.msg}")
+
+ # def get_connection(self, database_url, database_user_name, database_password):
+ # try:
+ # connection = mysql.connector.connect(
+ # # pool_name="local",
+ # # pool_size=16,
+ # host=database_url,
+ # user=database_user_name,
+ # password=database_password)
+ # self.logger.debug("database connection created in train modul")
+ # connection.set_converter_class(NumpyMySQLConverter)
+ # return connection
+ # except mysql.connector.Error as err:
+ # self.logger.error(f"MySQL connection error: {err.msg}")
+
+ def create_common_fraud_schemas(self):
+
+ commands = ["SQL CREATE SCHEMA common_fraud.txt",
+ "SQL CREATE TABLE planned_encoding_and_feature_engineering.txt",
+ "SQL CREATE TABLE raw_dataset.txt",
+ "SQL CREATE TABLE encoding.txt",
+ "SQL CREATE TABLE feature_engineering.txt",
+ "SQL CREATE TABLE encoded_table_registry.txt",
+ "SQL CREATE TABLE feature_engineered_table_registry.txt",
+ "SQL CREATE TABLE train_task.txt",
+ "SQL CREATE TABLE label_encoder.txt",
+ "SQL CREATE TABLE estimator.txt",
+ "SQL CREATE TABLE metrics.txt"
+ ]
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ for command in commands:
+ try:
+ file = open(command, "r")
+ sql_create_script = file.read()
+ cursor.execute(sql_create_script)
+ connection_object.commit()
+ except FileNotFoundError:
+ self.logger.error(f"SQL script file not found {command}")
+ except OSError:
+ self.logger.error("OS Error")
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}, file: {file}")
+ cursor.close()
+ connection_object.close()
+
+ def upload_train_database(self, request_parameter):
+ schema_name = request_parameter.get("schema_name")
+ if schema_name is None:
+ raise IllegalArgumentException({"message": "Parameter is missing in request body",
+ "parameter_name": "schema_name"})
+ table_name = request_parameter.get("table_name")
+ if table_name is None:
+ raise IllegalArgumentException({"message": "Parameter is missing in request body",
+ "parameter_name": "table_name"})
+ field_type_by_name = request_parameter.get("fields")
+ if field_type_by_name is None:
+ raise IllegalArgumentException({"message": "Parameter is missing in request body",
+ "parameter_name": "fields"})
+ if type(field_type_by_name) is not dict:
+ raise IllegalArgumentException({"message": "The parameter isn't dictionary",
+ "parameter_name": "fields"})
+ path = request_parameter.get("path")
+ if path is None:
+ raise IllegalArgumentException({"message": "Parameter is missing in request body",
+ "parameter_name": "path"})
+ csv_file_name = request_parameter.get("csv_file_name")
+ if csv_file_name is None:
+ raise IllegalArgumentException({"message": "Parameter is missing in request body",
+ "parameter_name": "csv_file_name"})
+ csv_filename_with_path = path + "/" + csv_file_name
+ try:
+ data = pd.read_csv(csv_filename_with_path, delimiter=';', na_filter=None)
+ df = pd.DataFrame(data)
+ train_as_array = df.to_numpy()
+ train_to_persist = train_as_array[1:, ]
+ except FileNotFoundError:
+ raise IllegalArgumentException({"message": "CSV file not found"})
+ except OSError:
+ raise IllegalArgumentException({"message": "OS error occured"})
+ self.create_train_schema(schema_name)
+ self.create_train_table(schema_name, table_name, field_type_by_name, True)
+ ddl_command_builder = ddl_build_module.DdlCommandBuilder()
+ insert_script = ddl_command_builder.create_insert_into_generic_train_database_script(schema_name, table_name,
+ field_type_by_name)
+ self.persist_encoded_or_engineered_dataset(insert_script, train_to_persist, "train")
+
+ def create_train_schema(self, schema_name):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_insert_command = f"CREATE SCHEMA IF NOT EXISTS {schema_name}"
+ try:
+ cursor.execute(sql_insert_command)
+ connection_object.commit()
+ cursor.close()
+ connection_object.close()
+ self.logger.debug(f"Train schema created, schema name: {schema_name}")
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def create_train_table(self, schema_name, table_name, field_type_by_name, is_id_auto_incremented):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ ddl_command_builder = ddl_build_module.DdlCommandBuilder()
+ sql_insert_command = ddl_command_builder.build_create_generic_train_database_script(schema_name,
+ table_name,
+ field_type_by_name,
+ is_id_auto_incremented)
+ try:
+ cursor.execute(sql_insert_command)
+ connection_object.commit()
+ cursor.close()
+ connection_object.close()
+ self.logger.debug(f"Train table created, schema name: {schema_name}, table name: {table_name}")
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def get_schema_names(self):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = "SHOW SCHEMAS"
+ try:
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ schema_names = list()
+ for item in query_result:
+ schema_names.append(item[0])
+ self.logger.debug(f"schema names: {schema_names}")
+ return schema_names
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def get_table_names_of_given_database(self, schema_name):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = f"SELECT distinct TABLE_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '{schema_name}'"
+ try:
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ table_names = list()
+ for item in query_result:
+ table_names.append(item[0])
+ self.logger.debug(f"tables names of {schema_name} schema: {table_names}")
+ return table_names
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def get_records(self, database_name, table_name):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ result = dict()
+ sql_select_query = f"SELECT * FROM {database_name}.{table_name}"
+ try:
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ for record in query_result:
+ fields = list()
+ length = len(record)
+ id = 0
+ for index in range(length):
+ if index == 0:
+ id = record[index]
+ else:
+ fields.append(record[index])
+ result[id] = fields
+ return result
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def get_records_with_limit_and_offset(self, database_name, table_name, limit, offset):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = f"SELECT * FROM {database_name}.{table_name}" + (
+ f" limit {limit}" if limit is not None else "") + (f" offset {offset}" if offset is not None else "")
+ try:
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ records = list()
+ for item in query_result:
+ fields = list()
+ length = len(item)
+ for index in range(length):
+ field = item[index]
+ type_of_field = type(field)
+ if type_of_field == datetime:
+ field = datetime.strftime(field, '%Y-%m-%d %H:%M:%S:%f')
+ fields.append(field)
+ records.append(fields)
+ return records
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def get_detailed_information_about_table(self, schema_name, table_name):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ result = dict()
+ column_properties = list()
+ if table_name is None:
+ self.logger.debug("No table name set, query the transaction table")
+ table_name = "transaction"
+ sql_select_query = f"SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '{schema_name}' AND TABLE_NAME = '{table_name}' ORDER BY ordinal_position"
+ try:
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchall()
+ if query_result:
+ for column_name_and_type in query_result:
+ column_property = dict()
+ column_name = column_name_and_type[0]
+ column_type_as_byte = column_name_and_type[1]
+ column_type = column_type_as_byte.decode("utf-8")
+ column_property["name"] = column_name
+ column_property["type"] = column_type
+ column_properties.append(column_property)
+ result["fields"] = column_properties
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ sql_select_query = f"SELECT COUNT(*) FROM {schema_name}.{table_name}"
+ try:
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchone()
+ result["record_number"] = query_result[0] if query_result else 0
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ sql_select_query = f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '{schema_name}' AND TABLE_NAME = '{table_name}' AND COLUMN_KEY='PRI'"
+ try:
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchone()
+ result["primary_key"] = query_result[0] if query_result else ""
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+ cursor.close()
+ connection_object.close()
+ fraud_candidates = self.get_fraud_candidate_column_and_fraud_number(schema_name, table_name,
+ result)
+ result["fraud_candidates"] = fraud_candidates
+ self.get_other_proper_column_data_type_to_encoding(schema_name, table_name, result)
+ return result
+
+ def get_fraud_candidate_column_and_fraud_number(self, schema_name, table_name, database_property_holder):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ candidate_fraud_columns = list()
+ column_names_and_types = database_property_holder.get("fields")
+ for column_name_and_type in column_names_and_types:
+ column_name = column_name_and_type.get("name")
+ sql_select_query = f"SELECT {column_name} FROM {schema_name}.{table_name}"
+ try:
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchall()
+ candidate_fraud_column = dict()
+ record_number = len(query_result)
+ number_of_ones = 0
+ number_of_nulls = 0
+ for field in query_result:
+ if field[0] == 1 or field[0] == '1':
+ number_of_ones = number_of_ones + 1
+ elif field[0] == 0 or field[0] == '0':
+ number_of_nulls = number_of_nulls + 1
+ if number_of_ones + number_of_nulls == record_number and number_of_ones != 0 and number_of_nulls != 0:
+ candidate_fraud_column["name"] = column_name
+ candidate_fraud_column["fraud_number"] = number_of_ones
+ candidate_fraud_column["no_fraud_number"] = number_of_nulls
+ candidate_fraud_columns.append(candidate_fraud_column)
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+ cursor.close()
+ connection_object.close()
+ return candidate_fraud_columns
+
+ def get_other_proper_column_data_type_to_encoding(self, schema_name, table_name, database_property_holder):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ column_names_and_types = database_property_holder.get("fields")
+ for column_name_and_type in column_names_and_types:
+ column_name = column_name_and_type.get("name")
+ column_type = column_name_and_type.get("type")
+ other_applicable_encoding_types = list()
+ if column_type.startswith("varchar"):
+ other_applicable_encoding_types.clear()
+ sql_select_query = f"SELECT {column_name} FROM {schema_name}.{table_name}"
+ try:
+ cursor.execute(sql_select_query)
+ records = cursor.fetchall()
+ is_records_number_type = True
+ is_records_suited_int_type = True
+ for record_as_tuple in records:
+ record = record_as_tuple[0]
+ if not record.isnumeric():
+ is_records_number_type = False
+ is_records_suited_int_type = False
+ break
+ elif self.is_int(record):
+ value = int(record)
+ if value > 2147483647:
+ is_records_suited_int_type = False
+ else:
+ continue
+ elif self.is_float(record):
+ is_records_suited_int_type = False
+
+ if is_records_number_type:
+ other_applicable_encoding_types.append("float")
+ if is_records_suited_int_type:
+ other_applicable_encoding_types.append("int")
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+ column_name_and_type[OTHER_APPLICABLE_ENCODING_TYPES] = other_applicable_encoding_types
+ cursor.close()
+ connection_object.close()
+
+ def is_int(self, element):
+ try:
+ int(element)
+ return True
+ except ValueError:
+ return False
+
+ def is_float(self, element):
+ try:
+ float(element)
+ return True
+ except ValueError:
+ return False
+
+ def persist_encoding_and_engineering_plan(self, schema_name, table_name, detailed_information_about_table,
+ used_time_base_field_name,
+ encoding_parameters,
+ feature_engineering_parameters):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_insert_query = "INSERT INTO common_fraud.planned_encoding_and_feature_engineering (schema_name, table_name, detailed_information_about_table, time_base_field_name ,encoding_parameters, feature_engineering_parameters) VALUES (%s,%s,%s,%s,%s,%s)"
+ parameter = (
+ schema_name, table_name, json.dumps(detailed_information_about_table), used_time_base_field_name,
+ json.dumps(encoding_parameters),
+ json.dumps(feature_engineering_parameters))
+ try:
+ cursor.execute(sql_insert_query, parameter)
+ last_inserted_row_id = cursor.lastrowid
+ connection_object.commit()
+ cursor.close()
+ connection_object.close()
+ self.logger.debug(
+ f"encoding and feature enginnering plan persisted, schema name: {schema_name}, table name: {table_name}, encoding parameters: {encoding_parameters}, feature engineering parameters {feature_engineering_parameters}")
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+ return last_inserted_row_id
+
+ def get_all_encoding_and_feature_engineering_plan(self):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ response = dict()
+ sql_select_query = "SELECT * FROM common_fraud.planned_encoding_and_feature_engineering"
+ try:
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+ response = None
+ if query_result is not None:
+ response = self.build_encoding_and_feature_engineering_plan_response(query_result)
+ return response
+
+ # 2.1
+ def get_encoding_and_feature_engineering_plan_by_id(self, id):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = "SELECT * FROM common_fraud.planned_encoding_and_feature_engineering WHERE id = %s"
+ parameter = (id,)
+ try:
+ cursor.execute(sql_select_query, parameter)
+ query_result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+ if query_result is None:
+ raise NoneException({"message": "The given id doesn't exist", "parameter": id})
+ response = self.build_encoding_and_feature_engineering_plan_response(query_result)
+ return response
+
+ def delete_encoding_and_feature_engineering_plan(self, id):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_delete_command = "DELETE FROM common_fraud.planned_encoding_and_feature_engineering WHERE id= %s"
+ parameter = (id,)
+ try:
+ cursor.execute(sql_delete_command, parameter)
+ connection_object.commit()
+ cursor.close()
+ connection_object.close()
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def build_encoding_and_feature_engineering_plan_response(self, query_result):
+ response = list()
+ for record in query_result:
+ single_response = dict()
+ single_response["id"] = record[0]
+ single_response["schema_name"] = record[1]
+ single_response["table_name"] = record[2]
+ single_response["detailed_information_about_table"] = json.loads(record[3])
+ single_response["time_base_field_name"] = record[4]
+ single_response["encoding_parameters"] = json.loads(record[5])
+ single_response["feature_engineering_parameters"] = json.loads(record[6])
+ response.append(single_response)
+ return response
+
+ def persist_train_task(self, request_parameter, feature_selector_keys, samplers_keys, scalers_keys, models_keys):
+ unique_field_name = request_parameter.get("unique_field_name")
+ if unique_field_name is None:
+ raise IllegalArgumentException(
+ {"message": "Parameter is missing in request body",
+ "parameter_name": "transaction_identifier_field_name"})
+ name = request_parameter.get("train_task_name")
+ if name is None:
+ raise IllegalArgumentException(
+ {"message": "Parameter is missing in request body",
+ "parameter_name": {name}})
+ planned_encoding_and_feature_engineering_id = request_parameter.get(
+ "planned_encoding_and_feature_engineering_id")
+ if planned_encoding_and_feature_engineering_id is None:
+ raise IllegalArgumentException(
+ {"message": "Parameter is missing in request body",
+ "parameter_name": "planned_encoding_and_feature_engineering_id"})
+ existing_planned_encoding_id_and_feature_engineering_id = self.get_existing_planned_encoding_and_feature_engineering_id()
+ if planned_encoding_and_feature_engineering_id not in existing_planned_encoding_id_and_feature_engineering_id:
+ raise IllegalArgumentException(
+ {"message": "Parameter is invalid", "parameter_name": "planned_encoding_and_feature_engineering_id",
+ "value": planned_encoding_and_feature_engineering_id})
+
+ feature_selector_code = request_parameter.get("feature_selector_code")
+ if feature_selector_code is None:
+ raise IllegalArgumentException(
+ {"message": "Parameter is missing in request body", "parameter_name": "feature selector code"})
+ if feature_selector_code not in feature_selector_keys:
+ raise IllegalArgumentException(
+ {"message": "Parameter is invalid", "parameter_name": "feature_selector_code",
+ "value": feature_selector_code})
+
+ sampler_code = request_parameter.get("sampler_code")
+ if sampler_code is None:
+ raise IllegalArgumentException(
+ {"message": "Parameter is missing in request body", "parameter_name": "sampler_code"})
+ if sampler_code not in samplers_keys:
+ raise IllegalArgumentException(
+ {"message": "Parameter is invalid", "parameter_name": "sampler_code",
+ "value": sampler_code})
+
+ scaler_code = request_parameter.get("scaler_code")
+ if scaler_code is None:
+ raise IllegalArgumentException(
+ {"message": "Parameter is missing in request body", "parameter_name": "scaler_code"})
+ if scaler_code not in scalers_keys:
+ raise IllegalArgumentException(
+ {"message": "Parameter is invalid", "parameter_name": "scaler_code",
+ "value": scaler_code})
+
+ model_code = request_parameter.get("model_code")
+ if model_code is None:
+ raise IllegalArgumentException(
+ {"message": "Parameter is missing in request body", "parameter_name": "model_code"})
+ if model_code not in models_keys:
+ raise IllegalArgumentException(
+ {"message": "Parameter is invalid", "parameter_name": "model_code",
+ "value": model_code})
+
+ test_size = request_parameter.get("test_size")
+ if test_size is None:
+ raise IllegalArgumentException(
+ {"message": "Parameter is missing in request body", "parameter_name": "test_size"})
+ if test_size >= 1 or test_size <= 0:
+ raise IllegalArgumentException(
+ {"message": "Parameter must be between 0 and 1", "parameter_name": "test_size", "value": test_size})
+
+ progress_status = STORED
+
+ self.check_if_train_task_exist(name, planned_encoding_and_feature_engineering_id, feature_selector_code,
+ sampler_code, scaler_code,
+ model_code, test_size)
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_insert_query = "INSERT INTO common_fraud.train_task (unique_field_name, train_task_name, planned_encoding_and_feature_engineering_id, feature_selector_code, sampler_code, scaler_code, model_code, test_size, progress_status) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)"
+ parameter = (
+ unique_field_name, name, planned_encoding_and_feature_engineering_id, feature_selector_code,
+ sampler_code, scaler_code, model_code, test_size, progress_status)
+ try:
+ cursor.execute(sql_insert_query, parameter)
+ connection_object.commit()
+ last_inserted_row_id = cursor.lastrowid
+ cursor.close()
+ connection_object.close()
+ self.logger.debug(
+ f"Train task persisted, train_task_name: {name},planned_encoding_id_and_feature_engineering_id: {planned_encoding_and_feature_engineering_id}, feature_selector_code: {feature_selector_code}, sampler_code: {sampler_code}, scaler_code: {scaler_code}, model_code: {model_code}, test_size: {test_size}")
+ return last_inserted_row_id
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def check_if_train_task_exist(self, name, planned_encoding_and_feature_engineering_id, feature_selector_code,
+ sampler_code, scaler_code, model_code, test_size):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = "SELECT * FROM common_fraud.train_task WHERE train_task_name = %s and planned_encoding_and_feature_engineering_id = %s and feature_selector_code = %s and sampler_code = %s and scaler_code = %s and model_code = %s and test_size = %s"
+ parameter = (
+ name, planned_encoding_and_feature_engineering_id, feature_selector_code, sampler_code, scaler_code,
+ model_code, test_size)
+ try:
+ cursor.execute(sql_select_query, parameter)
+ query_result = cursor.fetchone()
+ cursor.close()
+ connection_object.close()
+ if query_result is not None:
+ id = query_result[0]
+ raise TrainTaskExistYetException({"message": "Train task exists yet", "id": id})
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def get_all_train_task(self):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = "SELECT * FROM common_fraud.train_task"
+ try:
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ result = self.build_train_task_response(query_result)
+ return result
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def get_train_task_by_id(self, id):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = "SELECT * FROM common_fraud.train_task WHERE id = %s"
+ parameter = (id,)
+ try:
+ cursor.execute(sql_select_query, parameter)
+ query_result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ result = self.build_train_task_response(query_result)
+ return result
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def build_train_task_response(self, query_result):
+ result = dict()
+ for item in query_result:
+ id = item[0]
+ unique_field_name = item[1]
+ name = item[2]
+ planned_encoding_and_feature_engineering_id = item[3]
+ feature_selector_code = item[4]
+ sampler_code = item[5]
+ scaler_code = item[6]
+ model_code = item[7]
+ test_size = item[8]
+ process_status = item[9]
+ result[id] = {
+ "unique_field_name": unique_field_name,
+ "train_task_name": name,
+ "planned_encoding_and_feature_engineering_id": planned_encoding_and_feature_engineering_id,
+ "feature_selector_code": feature_selector_code,
+ "sampler_code": sampler_code,
+ "scaler_code": scaler_code,
+ "model_code": model_code,
+ "test_size": test_size,
+ "process_status": process_status}
+ return result
+
+ def set_train_task_status(self, id, progress_status):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_update_command = "UPDATE common_fraud.train_task SET progress_status = %s WHERE id = %s"
+ parameter = (progress_status, id)
+ try:
+ cursor.execute(sql_update_command, parameter)
+ connection_object.commit()
+ cursor.close()
+ connection_object.close()
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def get_train_task_status(self, id):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = "SELECT progress_status FROM common_fraud.train_task WHERE id = %s"
+ parameter = (id,)
+ try:
+ cursor.execute(sql_select_query, parameter)
+ query_result = cursor.fetchone()
+ cursor.close()
+ connection_object.close()
+ if query_result is not None:
+ status = query_result[0]
+ else:
+ status = DOESNT_EXIST
+ return status
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def create_train_task_csv_file(self, path, file_name, date_time_to_append, feature_selectors, samplers, scalers,
+ models):
+ header_elements, lines = self.create_train_task_csv_content(feature_selectors, samplers, scalers, models)
+ self.write_csv_file(path, file_name, date_time_to_append, header_elements, lines)
+
+ def create_train_task_csv_content(self, feature_selectors, samplers, scalers, models):
+ header_elements = (
+ "id", "transaction_identifier_field_name", "train_task_name", "planned_encoding_and_feature_engineering",
+ "feature_selector", "sampler", "scaler", "model", "test_size", "process_status")
+ lines = list()
+ train_task_properties_by_id = self.get_all_train_task()
+ for key, train_task_properties in train_task_properties_by_id.items():
+ line = list()
+ id = key
+ transaction_identifier_field_name = train_task_properties.get("transaction_identifier_field_name")
+ train_task_name = train_task_properties.get("train_task_name")
+ planned_encoding_and_feature_engineering_id = train_task_properties.get(
+ "planned_encoding_and_feature_engineering_id")
+ feature_selector_code = train_task_properties.get("feature_selector_code")
+ sampler_code = train_task_properties.get("sampler_code")
+ scaler_code = train_task_properties.get("scaler_code")
+ model_code = train_task_properties.get("model_code")
+ test_size = train_task_properties.get("test_size")
+ progress_status = train_task_properties.get("process_status")
+ planned_encoding_and_feature_engineering_dict = self.get_encoding_and_feature_engineering_plan_by_id(
+ planned_encoding_and_feature_engineering_id)
+ planned_encoding_and_feature_engineering_string = self.convert_encoding_and_feature_enginnering_dict_to_string(
+ planned_encoding_and_feature_engineering_dict[0])
+ line.append(id)
+ line.append(transaction_identifier_field_name)
+ line.append(train_task_name)
+ line.append(planned_encoding_and_feature_engineering_string)
+ line.append(feature_selectors.get(feature_selector_code)[0])
+ line.append(samplers.get(sampler_code)[0])
+ line.append(scalers.get(scaler_code)[0])
+ line.append(models.get(model_code)[0])
+ line.append(test_size)
+ line.append(progress_status)
+ lines.append(line)
+ return header_elements, lines
+
+ def create_metrics_csv_file(self, path, file_name, date_time_to_append):
+ header_elements, lines = self.create_metrics_csv_content()
+ self.write_csv_file(path, file_name, date_time_to_append, header_elements, lines)
+
+ def create_metrics_csv_content(self):
+ header_elements = (
+ "id", "train_task_name", "TP", "FP", "TN", "FN", "accuracy", "balanced_accuracy", "precision", "recall",
+ "sensitivity", "specificity", "PPV", "NPV", "FNR", "FPR", "FDR", "FOR", "f1", "f0.5", "f2", "MCC", "ROCAUC",
+ "Youdens_statistic")
+ lines = list()
+ list_of_metrics_dict = self.get_all_metrics()
+ for single_metrics_dict in list_of_metrics_dict:
+ line = list()
+ id = single_metrics_dict.get("id")
+ estimator_id = single_metrics_dict.get("estimator_id")
+ TP = single_metrics_dict.get("TP")
+ FP = single_metrics_dict.get("FP")
+ TN = single_metrics_dict.get("TN")
+ FN = single_metrics_dict.get("FN")
+ accuracy = single_metrics_dict.get("accuracy")
+ balanced_accuracy = single_metrics_dict.get("balanced_accuracy")
+ precision = single_metrics_dict.get("precision")
+ recall = single_metrics_dict.get("recall")
+ sensitivity = single_metrics_dict.get("sensitivity")
+ specificity = single_metrics_dict.get("specificity")
+ PPV = single_metrics_dict.get("PPV")
+ NPV = single_metrics_dict.get("NPV")
+ FNR = single_metrics_dict.get("FNR")
+ FPR = single_metrics_dict.get("FPR")
+ FDR = single_metrics_dict.get("FDR")
+ FOR = single_metrics_dict.get("FOR")
+ f1 = single_metrics_dict.get("f1")
+ f0_5 = single_metrics_dict.get("f0.5")
+ f2 = single_metrics_dict.get("f2")
+ MCC = single_metrics_dict.get("MCC")
+ ROCAUC = single_metrics_dict.get("ROCAUC")
+ Youdens_statistic = single_metrics_dict.get("Youdens_statistic")
+ train_task_name = self.get_train_task_name_by_estimator_id(estimator_id)
+ line.append(id)
+ line.append(train_task_name)
+ line.append(TP)
+ line.append(FP)
+ line.append(TN)
+ line.append(FN)
+ line.append(accuracy)
+ line.append(balanced_accuracy)
+ line.append(precision)
+ line.append(recall)
+ line.append(sensitivity)
+ line.append(specificity)
+ line.append(PPV)
+ line.append(NPV)
+ line.append(FNR)
+ line.append(FPR)
+ line.append(FDR)
+ line.append(FOR)
+ line.append(f1)
+ line.append(f0_5)
+ line.append(f2)
+ line.append(MCC)
+ line.append(ROCAUC)
+ line.append(Youdens_statistic)
+ lines.append(line)
+ return header_elements, lines
+
+ def convert_encoding_and_feature_enginnering_dict_to_string(self, input_dict):
+ keys = (
+ "schema_name", "table_name", "time_base_field_name", "encoding_parameters",
+ "feature_engineering_parameters")
+ line = str()
+ for key in keys:
+ line += key + " = "
+ value = input_dict.get(key)
+ if isinstance(value, list) or isinstance(value, dict):
+ flatted_value = str(value)
+ converted_flatted_value = flatted_value.replace(",", ";")
+ line += converted_flatted_value
+ else:
+ line += value if value is not None else "null"
+ if keys.index(key) != len(keys) - 1:
+ line += "; "
+ return line
+
+ def write_csv_file(self, path, file_name, date_time_to_append, header_elements, lines):
+ full_file_name = path + "/" + file_name + ".csv"
+ if date_time_to_append == True:
+ datetime_now = datetime.datetime.now()
+ year_month_day = datetime_now.strftime("%Y%m%d")
+ hour_min_sec = datetime_now.strftime("%H%M%S")
+ full_file_name = full_file_name + "_" + str(year_month_day) + "_" + str(hour_min_sec)
+ with open(full_file_name, 'w') as csv_file:
+ csv_writer = csv.writer(csv_file)
+ csv_writer.writerow(header_elements)
+ csv_writer.writerows(lines)
+
+ def get_train_task_name_by_estimator_id(self, estimator_id):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = "SELECT train_task_name FROM common_fraud.train_task t join common_fraud.estimator e on t.id = e.train_task_id WHERE e.id = %s"
+ parameter = (estimator_id,)
+ try:
+ cursor.execute(sql_select_query, parameter)
+ query_result = cursor.fetchone()
+ cursor.close()
+ connection_object.close()
+ return query_result[0] if query_result else None
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def check_if_any_train_in_progress(self):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = "SELECT * FROM common_fraud.train_task WHERE progress_status = %s"
+ parameter = (IN_PROGRESS,)
+ try:
+ cursor.execute(sql_select_query, parameter)
+ query_result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ if query_result:
+ result = True
+ else:
+ result = False
+ return result
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def get_existing_planned_encoding_and_feature_engineering_id(self):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ result = list()
+ sql_select_query = "select id from common_fraud.planned_encoding_and_feature_engineering"
+ try:
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+ for item in query_result:
+ result.append(item[0])
+ return result
+
+ def get_parameters_from_plan_by_id(self, id):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = "SELECT * FROM common_fraud.planned_encoding_and_feature_engineering WHERE id = %s"
+ parameter = (id,)
+ try:
+ cursor.execute(sql_select_query, parameter)
+ query_result = cursor.fetchone()
+ cursor.close()
+ connection_object.close()
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+ schema_name = query_result[1]
+ table_name = query_result[2]
+ detailed_information_about_table = json.loads(query_result[3])
+ time_base_field_name = query_result[4]
+ encoding_parameters = json.loads(query_result[5])
+ feature_engineering_parameters = json.loads(query_result[6])
+ return schema_name, table_name, detailed_information_about_table, time_base_field_name, encoding_parameters, feature_engineering_parameters
+
+ def get_raw_dataset_id(self, schema_name, table_name):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = "SELECT * FROM common_fraud.raw_dataset WHERE schema_name = %s and table_name = %s"
+ parameter = (schema_name, table_name)
+ try:
+ cursor.execute(sql_select_query, parameter)
+ query_result = cursor.fetchone()
+ cursor.close()
+ connection_object.close()
+ if query_result is not None:
+ id = query_result[0]
+ else:
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_insert_command = "INSERT INTO common_fraud.raw_dataset (schema_name, table_name) VALUES(%s, %s)"
+ parameter = (schema_name, table_name)
+ cursor.execute(sql_insert_command, parameter)
+ connection_object.commit()
+ id = cursor.lastrowid
+ cursor.close()
+ connection_object.close()
+ return id
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def get_encoding_id_by_json_content(self, encoding_parameters):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = "SELECT * FROM common_fraud.encoding"
+ try:
+ id = None
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ for item in query_result:
+ if json.loads(item[1]) == encoding_parameters:
+ id = item[0]
+ if id is None:
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_insert_command = "INSERT INTO common_fraud.encoding (encoding_parameters) VALUES(%s)"
+ parameter = (json.dumps(encoding_parameters),)
+ cursor.execute(sql_insert_command, parameter)
+ connection_object.commit()
+ id = cursor.lastrowid
+ cursor.close()
+ connection_object.close()
+ return id
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def get_encoded_table_properties(self, raw_dataset_id, encoding_id, time_base_field_name):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = "SELECT * FROM common_fraud.encoded_table_registry where raw_dataset_id = %s and encoding_id = %s and time_base_field_name = %s"
+ try:
+ parameter = (raw_dataset_id, encoding_id, time_base_field_name)
+ cursor.execute(sql_select_query, parameter)
+ query_result = cursor.fetchone()
+ cursor.close()
+ connection_object.close()
+ id = None
+ encoded_table_name = None
+ encoded_field_names = None
+ label_encoder_registry = None
+ if query_result is not None:
+ id = query_result[0]
+ encoded_table_name = query_result[4]
+ encoded_field_names = json.loads(query_result[5])
+ label_encoder_registry = json.loads(query_result[6])
+ return id, encoded_table_name, encoded_field_names, label_encoder_registry
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def get_feature_engineering_id_by_json_content(self, feature_engineering_parameters):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = "SELECT * FROM common_fraud.feature_engineering"
+ try:
+ id = None
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ for item in query_result:
+ if json.loads(item[1]) == feature_engineering_parameters:
+ id = item[0]
+ if id is None:
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_insert_command = "INSERT INTO common_fraud.feature_engineering (feature_engineering_parameters) VALUES(%s)"
+ parameter = (json.dumps(feature_engineering_parameters),)
+ cursor.execute(sql_insert_command, parameter)
+ connection_object.commit()
+ id = cursor.lastrowid
+ cursor.close()
+ connection_object.close()
+ return id
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def encode(self, raw_dataset_id, schema_name, table_name, encoding_id, encoding_parameters, time_base_field_name):
+
+ detailed_information_about_table = self.get_detailed_information_about_table(schema_name, table_name)
+ field_types_by_name = self.get_field_types_by_name(schema_name, table_name)
+ original_field_names = list(field_types_by_name.keys())
+ original_data_set = self.get_record_for_processing(schema_name, table_name, time_base_field_name)
+ encoder = encoding_module.DataBaseEncoder(original_data_set, original_field_names)
+ encoded_array, encoder_by_field_name, encoded_field_names = encoder.encode(encoding_parameters,
+ detailed_information_about_table)
+ encoded_table_name = table_name + UNDER_SCORE + ENCODED + UNDER_SCORE + str(encoding_id)
+ ddl_command_builder = ddl_build_module.DdlCommandBuilder()
+ encoded_table_create_script = ddl_command_builder.build_create_encoded_or_engineered_table_script(schema_name,
+ encoded_table_name,
+ encoded_field_names)
+ self.create_encoded_or_engineered_table(encoded_table_create_script)
+ insert_script = ddl_command_builder.create_insert_into_encoded_or_feature_engineered_script(schema_name,
+ encoded_table_name,
+ encoded_field_names)
+ self.persist_encoded_or_engineered_dataset(insert_script, encoded_array, type="Encoded")
+ label_encoder_registry = dict()
+ for field_name, label_encoder in encoder_by_field_name.items():
+ id_in_label_encoder_table = self.persist_label_encoder(field_name, label_encoder)
+ label_encoder_registry[field_name] = id_in_label_encoder_table
+ encoded_table_registry_id = self.persist_encoded_table_registry(raw_dataset_id, encoding_id,
+ time_base_field_name, encoded_table_name,
+ encoded_field_names, label_encoder_registry)
+ return encoded_array, encoded_table_registry_id, encoded_table_name, encoded_field_names, label_encoder_registry
+
+ def get_field_types_by_name(self, schema_name, table_name):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = f"SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '{schema_name}' AND TABLE_NAME = '{table_name}' ORDER BY ordinal_position"
+ try:
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ field_types_by_name = dict()
+ for column_name_and_type in query_result:
+ field_name = column_name_and_type[0]
+ field_type = column_name_and_type[1].decode("utf-8")
+ field_types_by_name[field_name] = field_type
+ self.logger.debug(f"field types by name in {schema_name} schema, {table_name} table: {field_types_by_name}")
+ return field_types_by_name
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def get_field_name_in_ordinal_position(self, schema_name, table_name):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '{schema_name}' AND TABLE_NAME = '{table_name}' ORDER BY ordinal_position"
+ try:
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ field_names = list()
+ for item in query_result:
+ field_names.append(item[0])
+ self.logger.debug(f"field names in {schema_name} schema, {table_name} table: {field_names}")
+ return field_names
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def get_field_type_in_ordinal_position(self, schema_name, table_name):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = f"SELECT DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '{schema_name}' AND TABLE_NAME = '{table_name}' ORDER BY ordinal_position"
+ try:
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ field_names = list()
+ for item in query_result:
+ field_names.append(item[0].decode("utf-8"))
+ self.logger.debug(f"field types in {schema_name} schema, {table_name} table: {field_names}")
+ return field_names
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def get_record_for_processing(self, schema_name, table_name, used_time_base_field):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = f"SELECT * FROM {schema_name}.{table_name} ORDER BY {used_time_base_field} DESC"
+ try:
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ result = np.array(query_result)
+ return result
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def persist_encoded_or_engineered_dataset(self, script, dataset, type):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ length_of_dataset = len(dataset)
+ bound = 100
+ try:
+ if length_of_dataset > bound:
+ number_of_part_array = int(length_of_dataset / bound)
+ number_of_rest_datas = length_of_dataset - number_of_part_array * bound
+ for i in range(0, number_of_part_array, 1):
+ temp_array = dataset[i * bound:(i + 1) * bound, :]
+ value_list = list()
+ for record in temp_array:
+ value_list.append(tuple(record))
+ cursor.executemany(script, value_list)
+ connection_object.commit()
+ temp_array = dataset[
+ (number_of_part_array) * bound:(number_of_part_array) * bound + number_of_rest_datas, :]
+ value_list = list()
+ for record in temp_array:
+ value_list.append(tuple(record))
+ cursor.executemany(script, value_list)
+ connection_object.commit()
+ else:
+ value_list = list()
+ for record in dataset:
+ value_list.append(tuple(record))
+ cursor.executemany(script, value_list)
+ connection_object.commit()
+ cursor.close()
+ connection_object.close()
+ self.logger.debug(f"{type} database persisted")
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def get_table_name_from_feature_engineering_registry(self, encoded_table_registry_id, feature_engineering_id):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = "SELECT * FROM common_fraud.feature_engineered_table_registry WHERE encoded_table_registry_id = %s and feature_engineering_id = %s"
+ try:
+ parameter = (encoded_table_registry_id, feature_engineering_id)
+ cursor.execute(sql_select_query, parameter)
+ query_result = cursor.fetchone()
+ cursor.close()
+ connection_object.close()
+ feature_engineered_table_name = None
+ feature_engineered_insert_script = None
+ if query_result is not None:
+ feature_engineered_table_name = query_result[3]
+ feature_engineered_insert_script = query_result[4]
+ return feature_engineered_table_name, feature_engineered_insert_script
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def create_encoded_or_engineered_table(self, script):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ try:
+ cursor.execute(script)
+ connection_object.commit()
+ cursor.close()
+ connection_object.close()
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def get_records_for_processing(self, schema_name, table_name, time_stamp_field_name):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = f"SELECT * FROM {schema_name}.{table_name} ORDER BY {time_stamp_field_name} DESC"
+ try:
+ cursor.execute(sql_select_query)
+ result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ records_with_id = np.array(result)
+ records_without_id = records_with_id[:, 1:]
+ return records_without_id
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def persist_label_encoder(self, field_name, encoder):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_insert_command = "INSERT INTO common_fraud.label_encoder (encoder_object) values (%s)"
+ pickled_encoder = pickle.dumps(encoder)
+ parameter = (pickled_encoder,)
+ try:
+ cursor.execute(sql_insert_command, parameter)
+ connection_object.commit()
+ last_inserted_row_id = cursor.lastrowid
+ cursor.close()
+ connection_object.close()
+ self.logger.debug(f"label encoder persisted, field name: {field_name}, id: {id}")
+ return last_inserted_row_id
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def persist_encoded_table_registry(self, raw_dataset_id, encoding_id, time_base_field_name, encoded_table_name,
+ encoded_field_names,
+ label_encoder_registry):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_insert_command = "INSERT INTO common_fraud.encoded_table_registry (raw_dataset_id, encoding_id, time_base_field_name, encoded_table_name, encoded_field_names, label_encoder_registry) values (%s,%s,%s,%s,%s,%s)"
+ parameter = (
+ raw_dataset_id, encoding_id, time_base_field_name, encoded_table_name, json.dumps(encoded_field_names),
+ json.dumps(label_encoder_registry))
+ try:
+ cursor.execute(sql_insert_command, parameter)
+ connection_object.commit()
+ last_inserted_row_id = cursor.lastrowid
+ cursor.close()
+ connection_object.close()
+ self.logger.debug(
+ f"record persisted in the encoded_table_registry table, encoded_table_name: {encoded_table_name}")
+ return last_inserted_row_id
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def feature_engineer(self, schema_name, encoded_table_name, fraud_type_field_name, feature_engineering_id, dataset,
+ time_base_field_name,
+ encoded_field_names, feature_engineering_parameters, encoded_table_registry_id, used_cpu_core):
+ engineer = engineering_module.Engineer(time_base_field_name, encoded_field_names, fraud_type_field_name,
+ feature_engineering_parameters,
+ used_cpu_core)
+ feature_engineered_dataset, feature_engineered_field_names = engineer.create_new_features(dataset)
+ feature_engineered_table_name = encoded_table_name + UNDER_SCORE + FE + UNDER_SCORE + str(
+ feature_engineering_id)
+ ddl_command_builder = ddl_build_module.DdlCommandBuilder()
+ feature_engineered_table_create_script = ddl_command_builder.build_create_encoded_or_engineered_table_script(
+ schema_name, feature_engineered_table_name, feature_engineered_field_names)
+ self.create_encoded_or_engineered_table(feature_engineered_table_create_script)
+ feature_engineered_insert_script = ddl_command_builder.create_insert_into_encoded_or_feature_engineered_script(
+ schema_name, feature_engineered_table_name, feature_engineered_field_names)
+ feature_engineered_insert_script_using_in_prediction = ddl_command_builder.create_insert_into_encoded_or_feature_engineered_script_for_prediction(
+ schema_name, feature_engineered_table_name, feature_engineered_field_names, fraud_type_field_name)
+ self.persist_encoded_or_engineered_dataset(feature_engineered_insert_script, feature_engineered_dataset,
+ type="Feature engineered")
+ self.persist_feature_engineered_table_registry_item(encoded_table_registry_id, feature_engineering_id,
+ feature_engineered_table_name,
+ feature_engineered_insert_script)
+ return feature_engineered_dataset, feature_engineered_table_name, feature_engineered_insert_script_using_in_prediction
+
+ def persist_feature_engineered_table_registry_item(self, encoded_table_registry_id, feature_engineering_id,
+ feature_engineered_table_name, feature_engineered_insert_script):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_insert_command = "INSERT INTO common_fraud.feature_engineered_table_registry (encoded_table_registry_id, feature_engineering_id, feature_engineered_table_name, feature_engineered_insert_script) values (%s,%s,%s,%s)"
+ parameter = (encoded_table_registry_id, feature_engineering_id, feature_engineered_table_name,
+ feature_engineered_insert_script)
+ try:
+ cursor.execute(sql_insert_command, parameter)
+ connection_object.commit()
+ cursor.close()
+ connection_object.close()
+ self.logger.debug(
+ f"record persisted in feature_engineered_table_registry table, feature_engineered_table_name: {feature_engineered_table_name}")
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ #
+ def get_train_parameters(self, train_task_id):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = "select * from common_fraud.train_task where id = %s"
+ parameter = (train_task_id,)
+ try:
+ cursor.execute(sql_select_query, parameter)
+ result = cursor.fetchone()
+ cursor.close()
+ connection_object.close()
+ return result
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def persist_estimator(self, train_task_id, transaction_identifier_field_name, estimator):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_insert_query = "INSERT INTO common_fraud.estimator (train_task_id, transaction_identifier_field_name, estimator_object) VALUES (%s,%s,%s)"
+ pickled_estimator_container = pickle.dumps(estimator)
+ parameter = (train_task_id, transaction_identifier_field_name, pickled_estimator_container)
+ try:
+ cursor.execute(sql_insert_query, parameter)
+ connection_object.commit()
+ last_inserted_row_id = cursor.lastrowid
+ cursor.close()
+ connection_object.close()
+ self.logger.debug(f"estimator persisted, train_task_id: {train_task_id}, id: {last_inserted_row_id}")
+ return last_inserted_row_id
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def persist_metrics(self, estimator_id, TP, FP, TN, FN, sensitivity, specificity, accuracy, balanced_accuracy,
+ precision, recall, PPV, NPV, FNR, FPR, FDR, FOR, f1, f_05, f2, MCC, ROCAUC, Youdens_statistic):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_insert_query = "INSERt INTO common_fraud.metrics (estimator_id,TP,FP,TN,FN,sensitivity,specificity,accuracy,balanced_accuracy,prec,recall,PPV,NPV,FNR,FPR,FDR,F_OR,f1,f_05,f2,MCC,ROCAUC,Youdens_statistic) VALUES" \
+ "(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
+ values = (estimator_id, TP, FP, TN, FN, sensitivity, specificity, accuracy, balanced_accuracy,
+ precision, recall, PPV, NPV, FNR, FPR, FDR, FOR, f1, f_05, f2, MCC, ROCAUC, Youdens_statistic)
+ try:
+ cursor.execute(sql_insert_query, values)
+ connection_object.commit()
+ last_inserted_row_id = cursor.lastrowid
+ cursor.close()
+ connection_object.close()
+ self.logger.debug(f"metrics persisted, estimator_id: {estimator_id}, id: {last_inserted_row_id}")
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def get_all_metrics(self):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = "SELECT * FROM common_fraud.metrics"
+ try:
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ single_response = self.build_complex_metrics_response(query_result)
+ return single_response
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ # 6
+ def get_metrics_by_id(self, id):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = "SELECT * FROM common_fraud.metrics WHERE id = %s"
+ parameter = (id,)
+ try:
+ cursor.execute(sql_select_query, parameter)
+ query_result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ complex_response = self.build_complex_metrics_response(query_result)
+ return complex_response
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def build_complex_metrics_response(self, query_result):
+ response = list()
+ for record in query_result:
+ response.append(self.build_simple_metrics_response(record))
+ return response
+
+ def build_simple_metrics_response(self, record):
+ single_result = dict()
+ single_result["id"] = record[0]
+ single_result["estimator_id"] = record[1]
+ single_result["TP"] = record[2]
+ single_result["FP"] = record[3]
+ single_result["TN"] = record[4]
+ single_result["FN"] = record[5]
+ single_result["sensitivity"] = record[6]
+ single_result["specificity"] = record[7]
+ single_result["accuracy"] = record[8]
+ single_result["balanced_accuracy"] = record[9]
+ single_result["precision"] = record[10]
+ single_result["recall"] = record[11]
+ single_result["PPV"] = record[12]
+ single_result["NPV"] = record[13]
+ single_result["FNR"] = record[14]
+ single_result["FPR"] = record[15]
+ single_result["FDR"] = record[16]
+ single_result["FOR"] = record[17]
+ single_result["f1"] = record[18]
+ single_result["f0.5"] = record[19]
+ single_result["f2"] = record[20]
+ single_result["MCC"] = record[21]
+ single_result["ROCAUC"] = record[22]
+ single_result["Youdens_statistic"] = record[23]
+ return single_result
+
+ def get_all_estimator(self):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = "SELECT * FROM common_fraud.estimator"
+ try:
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ response = list()
+ if query_result:
+ response = self.build_estimator_response(query_result)
+ return response
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+ single_response = self.build_complex_metrics_response(query_result)
+ return single_response
+
+ def get_estimator_by_train_task_id(self, train_task_id):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = "SELECT * FROM common_fraud.estimator WHERE train_task_id = %s"
+ parameter = (train_task_id,)
+ try:
+ cursor.execute(sql_select_query, parameter)
+ query_result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ response = list()
+ if query_result:
+ response = self.build_estimator_response(query_result)
+ return response
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def build_estimator_response(self, query_result):
+ summarized_result = list()
+ for item in query_result:
+ single_result = dict()
+ single_result["id"] = item[0]
+ single_result["train_task_id"] = item[1]
+ single_result["transaction_identifier_field_name"] = item[2]
+ summarized_result.append(single_result)
+ return summarized_result
+
+ def get_transaction_identifier_field_name_by_estimator_id(self, id):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = "SELECT transaction_identifier_field_name FROM common_fraud.estimator WHERE id = %s"
+ parameter = (id,)
+ try:
+ cursor.execute(sql_select_query, parameter)
+ query_result = cursor.fetchone()
+ cursor.close()
+ connection_object.close()
+ return query_result[0] if query_result else None
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def get_confusion_matrix_elements_by_train_task_id(self, train_task_id):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = "SELECT TP, TN, FP, FN FROM common_fraud.metrics inner join common_fraud.estimator e on metrics.estimator_id = e.id inner join common_fraud.train_task tt on e.train_task_id = tt.id where train_task_id = %s"
+ parameter = (train_task_id,)
+ try:
+ cursor.execute(sql_select_query, parameter)
+ query_result = cursor.fetchone()
+ cursor.close()
+ connection_object.close()
+ response = dict()
+ if query_result:
+ response["TP"] = query_result[0]
+ response["TN"] = query_result[1]
+ response["FP"] = query_result[2]
+ response["FN"] = query_result[3]
+ return response
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def get_columns_contain_only_distinct_values(self, schema_name, table_name):
+ result = list()
+ column_names = self.get_field_name_in_ordinal_position(schema_name, table_name)
+ count_by_column_name = dict()
+ for column_name in column_names:
+ count = self.get_distinct_count_of_column(schema_name, table_name, column_name)
+ count_by_column_name[column_name] = count
+ detailed_info = self.get_detailed_information_about_table(schema_name, table_name)
+ record_number = detailed_info.get("record_number")
+ for column_name, count in count_by_column_name.items():
+ if count == record_number:
+ result.append(column_name)
+ return result
+
+ def get_distinct_count_of_column(self, schema_name, table_name, column_name):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = f"SELECT COUNT(DISTINCT {column_name}) FROM {schema_name}.{table_name}"
+ try:
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchone()
+ cursor.close()
+ connection_object.close()
+ count = query_result[0]
+ self.logger.debug(
+ f"Distinct count of values: {count} in {schema_name} schema, {table_name} table: {column_name} column.")
+ return count
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def create_reduced_and_for_evaluation_tables(self, schema_name, table_name, fraud_type_field_name,
+ positive_fraud_case_number, negative_fraud_case_number,
+ date_suffix_to_append):
+ detailed_information_about_table = self.get_detailed_information_about_table(schema_name, table_name)
+ fraud_candidates = detailed_information_about_table.get("fraud_candidates")
+ fraud_candidate_names = list()
+ for fraud_candidate in fraud_candidates:
+ fraud_candidate_name = fraud_candidate.get("name")
+ fraud_candidate_names.append(fraud_candidate_name)
+ if fraud_type_field_name not in fraud_candidate_names:
+ raise IllegalArgumentException({"message": "Fraud type field name not in fraud candidates",
+ "parameter_name": fraud_type_field_name})
+ positive_fraud_case_ids = self.get_id_collection_by_column_name_and_value(schema_name, table_name,
+ fraud_type_field_name, 1)
+ negative_fraud_case_ids = self.get_id_collection_by_column_name_and_value(schema_name, table_name,
+ fraud_type_field_name, 0)
+ random_selected_positive_fraud_case_ids = random.sample(positive_fraud_case_ids, k=positive_fraud_case_number)
+ random_selected_negative_fraud_case_ids = random.sample(negative_fraud_case_ids, k=negative_fraud_case_number)
+ records_by_id = self.get_records(schema_name, table_name)
+ field_names_in_ordinal_position = self.get_field_name_in_ordinal_position(schema_name, table_name)
+ fraud_field_index = field_names_in_ordinal_position.index(fraud_type_field_name) - 1
+ reduced_records = list()
+ for_evaluation_records = list()
+ simulated_real_recods = list()
+ for id, record in records_by_id.items():
+ if id not in random_selected_positive_fraud_case_ids and id not in random_selected_negative_fraud_case_ids:
+ reduced_records.append(record)
+ for id, record in records_by_id.items():
+ if id in random_selected_negative_fraud_case_ids:
+ fraud_field_value = record.pop(fraud_field_index)
+ for_evaluation_record = [id]
+ for_evaluation_record.extend(record)
+ for_evaluation_records.append(for_evaluation_record)
+ simulated_real_recods.append([id, fraud_field_value])
+ for id, record in records_by_id.items():
+ if id in random_selected_positive_fraud_case_ids:
+ fraud_field_value = record.pop(fraud_field_index)
+ for_evaluation_record = [id]
+ for_evaluation_record.extend(record)
+ for_evaluation_records.append(for_evaluation_record)
+ simulated_real_recods.append([id, fraud_field_value])
+ field_name_and_type_collection = detailed_information_about_table.get("fields")
+ primary_key_field_name = detailed_information_about_table.get("primary_key")
+ field_type_by_name_for_evaluation = dict()
+ field_type_by_name_reduced = dict()
+ field_type_by_name_simulated_real = dict()
+ for item in field_name_and_type_collection:
+ field_name = item.get("name")
+ field_type = item.get("type")
+ if field_name == primary_key_field_name:
+ field_type_by_name_simulated_real["transaction_identifier"] = field_type
+ else:
+ if field_type == "varchar":
+ field_type += "(255)"
+ if field_name == fraud_type_field_name:
+ field_type_by_name_reduced[field_name] = field_type
+ field_type_by_name_simulated_real[field_name] = field_type
+ else:
+ field_type_by_name_reduced[field_name] = field_type
+ field_type_by_name_for_evaluation[field_name] = field_type
+ reduced_table_name = table_name + "_reduced"
+ for_evaluation_table_name = table_name + "_for_evaluation"
+ simulated_real_table_name = table_name + "_simulated_real"
+ if date_suffix_to_append:
+ now = datetime.datetime.now()
+ date_and_time = now.strftime("%Y%m%d_%H%M%S")
+ date_time_suffix = UNDER_SCORE + str(date_and_time)
+ reduced_table_name += date_time_suffix
+ for_evaluation_table_name += date_time_suffix
+ simulated_real_table_name += date_time_suffix
+ self.create_train_table(schema_name, reduced_table_name, field_type_by_name_reduced,
+ is_id_auto_incremented=True)
+ self.create_train_table(schema_name, for_evaluation_table_name, field_type_by_name_for_evaluation,
+ is_id_auto_incremented=False)
+ self.create_train_table(schema_name, simulated_real_table_name, field_type_by_name_simulated_real,
+ is_id_auto_incremented=True)
+
+ reduced_records_array = np.array(reduced_records)
+ for_evaluated_records_array = np.array(for_evaluation_records)
+ simulated_real_records_array = np.array(simulated_real_recods,dtype='O')
+ ddl_command_builder = ddl_build_module.DdlCommandBuilder()
+ reduced_records_insert_script = ddl_command_builder.create_insert_into_generic_train_database_script(
+ schema_name, reduced_table_name, field_type_by_name_reduced, is_id_auto_incremented=True)
+ for_evaluated_records_insert_script = ddl_command_builder.create_insert_into_generic_train_database_script(
+ schema_name, for_evaluation_table_name, field_type_by_name_for_evaluation, is_id_auto_incremented=False)
+ simulated_real_recods_insert_script = ddl_command_builder.create_insert_into_generic_train_database_script(
+ schema_name, simulated_real_table_name, field_type_by_name_simulated_real, is_id_auto_incremented=True)
+
+ self.persist_encoded_or_engineered_dataset(reduced_records_insert_script, reduced_records_array, REDUCED)
+ self.persist_encoded_or_engineered_dataset(for_evaluated_records_insert_script, for_evaluated_records_array,
+ FOR_EVAULATED)
+ self.persist_encoded_or_engineered_dataset(simulated_real_recods_insert_script, simulated_real_records_array,
+ SIMULATED_REAL)
+ planned_csv_content = dict()
+ for record in simulated_real_recods:
+ planned_csv_content[record[0]] = record[1]
+ csv_content_and_file_name = dict()
+ csv_content_and_file_name["planned_csv_content"] = planned_csv_content
+ csv_content_and_file_name["planned_csv_file_name"] = for_evaluation_table_name
+ return csv_content_and_file_name
+
+ def get_id_collection_by_column_name_and_value(self, schema_name, table_name, column_name, value):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = f"SELECT id FROM {schema_name}.{table_name} WHERE {column_name} = {value}"
+ try:
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchall()
+ cursor.close()
+ connection_object.close()
+ result = list()
+ for item in query_result:
+ result.append(item[0])
+ return result
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
+
+ def get_generic_records_for_evaluation(self, schema_name, table_name):
+ field_names = self.get_field_name_in_ordinal_position(schema_name, table_name)
+ field_types = self.get_field_type_in_ordinal_position(schema_name, table_name)
+ detailed_information = self.get_detailed_information_about_table(schema_name, table_name)
+ primary_key_field_name = detailed_information.get("primary_key")
+ record_by_id = self.get_records(schema_name, table_name)
+ records_in_string_format = list()
+ for id, record in record_by_id.items():
+ fields_as_str = list()
+ fields_as_str.append(id)
+ for field in record:
+ fields_as_str.append(str(field))
+ records_in_string_format.append(fields_as_str)
+ result = dict()
+ result["field_types"] = field_types
+ result["field_names"] = field_names
+ result["records"] = records_in_string_format
+ return result
+
+ def get_max_retrospective_day(self,schema_name,table_name,time_base_field_name):
+ connection_object = self.connection_pool.get_connection()
+ cursor = connection_object.cursor()
+ sql_select_query = f"SELECT DATEDIFF(max({time_base_field_name}),min({time_base_field_name}))id FROM {schema_name}.{table_name}"
+ try:
+ cursor.execute(sql_select_query)
+ query_result = cursor.fetchone()
+ cursor.close()
+ connection_object.close()
+ return query_result[0]
+ except mysql.connector.Error as err:
+ self.logger.error(f"MySQL error message: {err.msg}")
\ No newline at end of file