Data Migration Validation
In nowadays, a lot of companies and cooperation are preparing and migrating On-premises data on Legacy system to new system with high scalability, reliability and resiliency as core tenets of Data Platform. There are some high-lighted items to be addressed for validating migrated data.
Problem
- Data need to be rekey for new logic.
- Capture full data with new schema mapping.
- Define Slowly Changing if the Legacy system still inputs new data to migrated table.
Test Cases
- Row count validation to capture if there is full data migrated.
- Duplicated records count to verify if the same record is captured.
- Check full records changing during migration if any data changed.
Environment set-up
- Spark Nodes: AWS EMR
- Workspace: Jupiter Notebook
-
PySpark Kernel Setup with
sc
to check the current status%%info
to verify the current configuration
Implementation
1. Import and Config Logging
# Lib and Var
import json
import logging
from pyspark.sql.functions import countDistinct, concat_ws, sha2
DESTINATION_LOCATION = 's3://stage_location/stream_output/'
TARGET_LOCATION = 's3://public_location/data/parquet/'
2. Set global variable
S3_LOCATION = {
'event-d': {
'stage_s3': STAGE_S3_LOCATION + 'event_d/',
'public_s3': PUB_S3_LOCATION + 'event_d/',
},
'eventtype-d': {
'stage_s3': STAGE_S3_LOCATION + 'eventtype_d/',
'public_s3': PUB_S3_LOCATION + 'eventtype_d/',
},
'page-d': {
'stage_s3': STAGE_S3_LOCATION + 'page_d/',
'public_s3': PUB_S3_LOCATION + 'page_d/',
},
'site-d': {
'stage_s3': STAGE_S3_LOCATION + 'site_d/',
'public_s3': PUB_S3_LOCATION + 'site_d/',
},
'componentapplication-d': {
'stage_s3': STAGE_S3_LOCATION + 'componentapplication_d/',
'public_s3': PUB_S3_LOCATION + 'componentapplication_d/',
},
'confirmationtype-d': {
'stage_s3': STAGE_S3_LOCATION + 'confirmationtype_d/',
'public_s3': PUB_S3_LOCATION + 'confirmationtype_d/',
},
'geocode-d': {
'stage_s3': STAGE_S3_LOCATION + 'geocode_d/',
'public_s3': PUB_S3_LOCATION + 'geocode_d/',
},
'optimizely-d': {
'stage_s3': STAGE_S3_LOCATION + 'optimizely_d/',
'public_s3': PUB_S3_LOCATION + 'optimizely_d/',
}
}
TABLE_PK = {
'event-d': 'eventkey',
'eventtype-d': 'eventtypekey',
'page-d': 'pagekey',
'site-d': 'sitekey',
'componentapplication-d': 'componentapplicationkey',
'confirmationtype-d': 'confirmationtypekey',
'geocode-d': 'geocodekey',
'optimizely-d': 'optimizelyidkey'
}
- Set the current data location
- Composite key to identify unique record for particular table
3. Code Implementation
- Row count checking
def _comparing_row_count_dim_data(S3_LOCATION):
table_conf = S3_LOCATION
try:
for table in table_conf:
stage_loc = table_conf[table]['stage_s3']
public_loc = table_conf[table]['public_s3']
stage_df_count = spark.read.parquet(stage_loc).count()
public_df_count = spark.read.parquet(public_loc).count()
if stage_df_count == public_df_count:
print(f'Table {table} matching between public and stage: {stage_df_count} = {public_df_count}')
else:
print(f'Table {table} doesn\'t matching between public and stage: {stage_df_count} != {public_df_count}')
except Exception as e:
print(f'Error with {table} with {e}')
- Duplicated record checking
def _comparing_dup_dim_data(S3_LOCATION, TABLE_PK):
table_conf = S3_LOCATION
table_pk = TABLE_PK
try:
for table in table_conf:
stage_loc = table_conf[table]['stage_s3']
public_loc = table_conf[table]['public_s3']
table_key = table_pk[table] #create table PK
stage_df_count = spark.read.parquet(stage_loc).groupBy(table_key).count().count()
public_df_count = spark.read.parquet(public_loc).groupBy(table_key).count().count()
if stage_df_count == public_df_count:
print(f'Table {table} matching between public and stage with distinct PK: {stage_df_count} = {public_df_count}')
else:
print(f'Table {table} doesn\'t matching between public and stage with distinct PK: {stage_df_count} != {public_df_count}')
table_key = ''
except Exception as e:
print(f'Error with {table} with {e}')
- Field Comparison for every-single values by every-single records
# Field Comparison
def _read_dim_data(table):
table_name = table
table_conf = S3_LOCATION
try:
stage_loc = table_conf[table_name]['stage_s3']
public_loc = table_conf[table_name]['public_s3']
stage_df = spark.read.parquet(stage_loc)
public_df = spark.read.parquet(public_loc)
LOGGER.debug(f'Done: Read {table_name} table in stage and public')
return stage_df, public_df
except Exception as e:
print(f'Error with {table} with {e}')
def _field_compare(S3_LOCATION):
table_conf = S3_LOCATION
for table in table_conf:
stage_df , public_df = _read_dim_data(table)
# process dataframe
public_df_hash = public_df.withColumn('cols_hash', sha2(concat_ws('_', *stage_df.columns), 256))
stage_df_hash = stage_df.withColumn('cols_hash', sha2(concat_ws('_', *stage_df.columns), 256))
LOGGER.debug(f'Done: Add hashing column for {table} table in stage and public')
# inner join
matching_count = stage_df_hash.join(public_df_hash, stage_df_hash.cols_hash == public_df_hash.cols_hash, 'inner').count()
LOGGER.debug(f'Done: Counting inner join between stage and public')
print(f'Total count of different record: {matching_count} in table {table}')
Further Enhancements
Nowadays, the Data Quality Gate has defined to support engineering the data and control data quality to make sure only GOOD Data (qualified data) will be merged into Production.
Let’s explore DataOps to be more understanding how does it work and how it support data controlling.
There are 5 essential data quality rules:
- Uniqueness
- Non-nulless
- Accepted Value
- Relationships and referential integrity
- Freshness and recency
Conclusion
This step is very important and can be considered as gating for release to Production when migrating data from legacy system. The notebooks is linked in repo for detail reference.