AI-Powered Data Migration and Quality Framework

By Shaily Sharma

Introduction

Efficient and accurate data migration is crucial for organizations undergoing system upgrades, mergers, or moving to new data platforms. This project showcases a framework that automates data migration, detects and corrects data inconsistencies, and ensures high data integrity using machine learning techniques.

Project Architecture

The framework consists of the following components: an ETL pipeline (etl/) that simulates, extracts, transforms, cleans, and loads the data; a plotting script (generate_plots.py) that produces the figures under static/images; and an app (app.py) that serves the project documentation.

Folder Structure


.
├── app.py
├── generate_plots.py
├── requirements.txt
├── logs.log
├── data
│   ├── source_data.csv
│   └── target_data.csv
├── static
│   ├── images
│   │   ├── amount_distribution.png
│   │   └── anomalies_detected.png
│   └── styles.css
├── templates
│   └── documentation.html
└── etl
    ├── data_simulation.py
    ├── extract.py
    ├── transform.py
    ├── anomaly_detection.py
    ├── load.py
    └── run_pipeline.py

ETL Pipeline

The ETL pipeline orchestrates the entire process from data simulation to data loading, ensuring that the data is clean and ready for use in downstream applications.
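
The orchestration script itself, etl/run_pipeline.py, is not reproduced in this document; the sketch below shows how it might wire together the functions defined in the following sections (file paths are taken from the folder structure above):


# etl/run_pipeline.py -- illustrative sketch; the actual script may differ.
# Run from the project root, e.g. `python -m etl.run_pipeline`.
from etl.data_simulation import simulate_data
from etl.extract import extract_data
from etl.transform import transform_data
from etl.anomaly_detection import detect_and_remove_anomalies
from etl.load import load_data

def run_pipeline():
    """Runs the full pipeline: simulate -> extract -> transform -> detect -> load."""
    # 1. Simulate source data and persist it
    simulate_data().to_csv('data/source_data.csv', index=False)
    # 2. Extract the raw source data
    raw_data = extract_data('data/source_data.csv')
    # 3. Clean, normalize, and encode the data
    transformed_data = transform_data(raw_data)
    # 4. Detect and drop anomalous records
    clean_data = detect_and_remove_anomalies(transformed_data)
    # 5. Load the cleaned data into the target file
    load_data(clean_data, 'data/target_data.csv')

if __name__ == '__main__':
    run_pipeline()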

Data Simulation

The pipeline begins by simulating financial transaction data and deliberately injecting anomalies (5% of records with amounts inflated tenfold) to exercise the anomaly detection step.


# etl/data_simulation.py
import pandas as pd
import numpy as np

def simulate_data(num_records=10000):
    """
    Simulates financial transaction data with anomalies.

    Args:
        num_records (int): Number of records to generate.

    Returns:
        pd.DataFrame: Simulated data.
    """
    np.random.seed(42)
    data = pd.DataFrame({
        'transaction_id': range(1, num_records + 1),
        'amount': np.random.normal(loc=1000, scale=200, size=num_records),
        'currency': np.random.choice(['USD', 'EUR', 'GBP'], size=num_records),
        'timestamp': pd.date_range(start='2023-01-01', periods=num_records, freq='min'),  # one transaction per minute
        'account_number': np.random.randint(100000, 999999, size=num_records),
        'transaction_type': np.random.choice(['debit', 'credit'], size=num_records)
    })
    # Introduce anomalies: inflate the amount of a random 5% of records tenfold
    anomalies = data.sample(frac=0.05, random_state=42)
    data.loc[anomalies.index, 'amount'] *= 10
    return data

if __name__ == '__main__':
    simulated_data = simulate_data()
    simulated_data.to_csv('data/source_data.csv', index=False)
        

Data Extraction

The extraction step reads data from the source CSV file.


# etl/extract.py
import pandas as pd

def extract_data(filepath):
    """
    Extracts data from the source file.

    Args:
        filepath (str): Path to the source data file.

    Returns:
        pd.DataFrame: Extracted data.
    """
    data = pd.read_csv(filepath)
    return data

if __name__ == '__main__':
    data = extract_data('data/source_data.csv')
        

Data Transformation

The transformation step handles missing values, normalizes the transaction amount, and one-hot encodes the categorical columns so that every feature is numeric and ready for anomaly detection.


# etl/transform.py
import pandas as pd

def transform_data(data):
    """
    Transforms the extracted data.

    Args:
        data (pd.DataFrame): Extracted data.

    Returns:
        pd.DataFrame: Transformed data.
    """
    # Handle missing values
    data = data.dropna()
    # Normalize amount
    data['amount_normalized'] = (data['amount'] - data['amount'].mean()) / data['amount'].std()
    # Encode categorical variables
    data = pd.get_dummies(data, columns=['currency', 'transaction_type'])
    return data

if __name__ == '__main__':
    data = pd.read_csv('data/source_data.csv')
    transformed_data = transform_data(data)
    transformed_data.to_csv('data/processed_data.csv', index=False)
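
A quick sanity check of the z-score normalization on the processed file (illustrative):


# Verify that the normalized amount has (approximately) zero mean and unit variance
import pandas as pd

df = pd.read_csv('data/processed_data.csv')
print(df['amount_normalized'].mean(), df['amount_normalized'].std())  # expect ~0 and ~1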
        

Anomaly Detection and Correction

PyCaret's anomaly detection module (an Isolation Forest model) is used to identify anomalous records, which are then removed from the dataset.


# etl/anomaly_detection.py
from pycaret.anomaly import setup, create_model, assign_model
import pandas as pd

def detect_and_remove_anomalies(data):
    """
    Detects and removes anomalies from the data.

    Args:
        data (pd.DataFrame): Transformed data.

    Returns:
        pd.DataFrame: Cleaned data.
    """
    # Note: `silent=True` applies to PyCaret 2.x; the argument was removed in PyCaret 3.x
    s = setup(data, silent=True, session_id=123)
    model = create_model('iforest', fraction=0.05)
    results = assign_model(model)
    clean_data = results[results['Anomaly'] == 0].reset_index(drop=True)
    return clean_data

if __name__ == '__main__':
    data = pd.read_csv('data/processed_data.csv')
    clean_data = detect_and_remove_anomalies(data)
    clean_data.to_csv('data/target_data.csv', index=False)
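
For reference, an equivalent isolation-forest step can be sketched directly with scikit-learn (an assumed substitute shown only for illustration; the project itself uses PyCaret as above):


# Illustrative alternative using scikit-learn's IsolationForest (not part of this project).
import pandas as pd
from sklearn.ensemble import IsolationForest

def detect_and_remove_anomalies_sklearn(data, numeric_columns, contamination=0.05):
    """Flags roughly `contamination` of rows as anomalous and drops them."""
    model = IsolationForest(contamination=contamination, random_state=123)
    # fit_predict returns -1 for anomalies and 1 for inliers
    labels = model.fit_predict(data[numeric_columns])
    return data[labels == 1].reset_index(drop=True)

if __name__ == '__main__':
    df = pd.read_csv('data/processed_data.csv')
    clean = detect_and_remove_anomalies_sklearn(df, ['amount_normalized'])
    print(f"Removed {len(df) - len(clean)} anomalous records")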
        

Data Loading

The cleaned data is then loaded into the target CSV file.


# etl/load.py
import pandas as pd

def load_data(data, filepath):
    """
    Loads data into the target file.

    Args:
        data (pd.DataFrame): Cleaned data.
        filepath (str): Path to the target data file.
    """
    data.to_csv(filepath, index=False)

if __name__ == '__main__':
    # Standalone demo: in the full pipeline the cleaned DataFrame from the
    # anomaly detection step is passed to load_data directly.
    clean_data = pd.read_csv('data/target_data.csv')
    load_data(clean_data, 'data/target_data.csv')
        

Libraries Used

The following libraries are utilized in this project: pandas and NumPy for data simulation and manipulation, PyCaret for machine-learning-based anomaly detection, and the supporting web and plotting dependencies used by app.py and generate_plots.py (see requirements.txt).

Ensure all dependencies are installed by running:


pip install -r requirements.txt
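
Based on the imports shown in this document, the requirements file contains at least the following entries (an illustrative, unpinned subset; the actual requirements.txt may pin versions and include the packages needed by app.py and generate_plots.py):


# Illustrative subset of requirements.txt
pandas
numpy
pycaret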
        

Results

Data Statistics

The following table shows the data statistics after cleaning:

       transaction_id  account_number        amount  transaction_type_debit
count     9500.000000     9500.000000   9500.000000             9500.000000
mean      4978.520947   537107.900526   1011.919437                0.506842
std       2875.336410   249905.160373    387.492646                0.499979
min          1.000000   100126.000000    187.079380                0.000000
25%       2486.750000   321894.750000    864.081450                0.000000
50%       4976.500000   537962.500000   1000.369000                1.000000
75%       7457.250000   755385.000000   1135.121900                1.000000
max      10000.000000   970678.000000  12514.170000                1.000000
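
These statistics match pandas' describe() output and can be regenerated from the cleaned target file:


# Reproduce the summary statistics of the cleaned data
import pandas as pd

clean = pd.read_csv('data/target_data.csv')
print(clean[['transaction_id', 'account_number', 'amount', 'transaction_type_debit']]
      .astype(float).describe())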

Graphs

Here are some graphs illustrating the effectiveness of the anomaly detection:

Transaction Amount Distribution Before and After Cleaning
(static/images/amount_distribution.png)

Anomalies Detected
(static/images/anomalies_detected.png)
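
The plotting script (generate_plots.py) is not reproduced here; a minimal sketch of how the amount-distribution figure could be generated with matplotlib (an assumed plotting dependency) is:


# generate_plots.py -- illustrative sketch; the actual script may differ.
import pandas as pd
import matplotlib
matplotlib.use('Agg')  # render to files without a display
import matplotlib.pyplot as plt

source = pd.read_csv('data/source_data.csv')
target = pd.read_csv('data/target_data.csv')

# Transaction amount distribution before and after cleaning
plt.figure(figsize=(10, 5))
plt.hist(source['amount'], bins=100, alpha=0.6, label='Before cleaning')
plt.hist(target['amount'], bins=100, alpha=0.6, label='After cleaning')
plt.xlabel('Transaction amount')
plt.ylabel('Frequency')
plt.title('Transaction Amount Distribution Before and After Cleaning')
plt.legend()
plt.savefig('static/images/amount_distribution.png')
plt.close()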

Conclusion

This project demonstrates the effective use of machine learning in automating data migration and improving data quality. By combining standard ETL practices with anomaly detection, it helps organizations trust the data they rely on for critical decisions.

Contact Information

If you have any questions or would like to discuss this project further, please feel free to reach out:

Email: [email protected]

LinkedIn: Shaily Sharma