Long Bui Discovering new things. Data x Platform Ops

Generating Data Lineage with Database Tables DDL (DAG Gen)

In legacy systems, viewing data lineage and monitoring data movement can be a significant challenge. To address this pain point, I’ve developed a Proof of Concept (POC) aimed at resolving these issues.

With this POC, you can easily scan the database, configure dependencies, and automatically generate data lineage.

This project provides a solution for generating and visualizing data lineage based on table DDLs and ETL job metadata. The solution includes a Python script for generating data lineage information and a React application for visualizing the data lineage as a Directed Acyclic Graph (DAG).

How to create data lineage?

  • Method 1: Lineage by parsing. The tool automatically reads the logic used to process the data.
  • Method 2: Lineage by data tagging. The transformation engine tags data as it is transformed or moved.
  • Method 3: Pattern-based lineage. Instead of reading the code that transforms the data, it looks for patterns in the metadata and infers the lineage from them.
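
To make Method 1 concrete, here is a minimal sketch of lineage by parsing. It assumes the transformation logic is available as plain INSERT INTO ... SELECT statements; the function name extract_lineage and both regular expressions are illustrative and not part of this project. A real parser would need a proper SQL grammar to cope with subqueries, CTEs, and quoted identifiers.

```python
import re

# Illustrative patterns: they only cover simple
# "INSERT INTO target SELECT ... FROM a JOIN b" statements.
TARGET_RE = re.compile(r"insert\s+into\s+([\w.]+)", re.IGNORECASE)
SOURCE_RE = re.compile(r"(?:from|join)\s+([\w.]+)", re.IGNORECASE)


def extract_lineage(sql):
    """Return (target_table, sorted source tables) for a simple INSERT ... SELECT."""
    target = TARGET_RE.search(sql)
    if target is None:
        return None, []
    # Every table after FROM or JOIN is treated as a source of the target.
    sources = sorted(set(SOURCE_RE.findall(sql)) - {target.group(1)})
    return target.group(1), sources


sql = """
INSERT INTO table3
SELECT a.id, b.amount
FROM table1 a
JOIN table2 b ON a.id = b.id
"""
target, sources = extract_lineage(sql)
print(target, sources)
```

Each (source, target) pair found this way could become one edge in the lineage graph.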

Lineage Generation

Workflow: Read DDL → Config DAG and Dependencies → Update DDL → Generate DAG Diagram

Step 1: Read DDL

Extract DDL Information

  1. Set Up a Database Connection: Establish a connection to the database where the DDL is stored, using a driver library such as:
# psycopg2 for PostgreSQL,
# pyodbc for SQL Server,
# cx_Oracle for Oracle,
# mysql-connector-python for MySQL.
  2. Fetch DDL Statements: Retrieve the DDL statements from the database, either by querying the database system tables or by using database-specific commands.
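import psycopg2

def fetch_ddl(connection_string):
    """Return {table_name: simplified CREATE TABLE statement} for the public schema."""
    conn = psycopg2.connect(connection_string)
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT table_name FROM information_schema.tables "
                "WHERE table_schema = 'public' AND table_type = 'BASE TABLE'"
            )
            tables = [row[0] for row in cursor.fetchall()]

            ddl_statements = {}
            for table in tables:
                # PostgreSQL has no built-in pg_get_tabledef(), so rebuild a
                # simplified DDL (column names and types only) from the catalog.
                cursor.execute(
                    "SELECT column_name, data_type FROM information_schema.columns "
                    "WHERE table_schema = 'public' AND table_name = %s "
                    "ORDER BY ordinal_position",
                    (table,),  # pass the name as a parameter, never via an f-string
                )
                columns = ",\n".join(f"    {name} {dtype}" for name, dtype in cursor.fetchall())
                ddl_statements[table] = f"CREATE TABLE {table} (\n{columns}\n);"
    finally:
        conn.close()

    return ddl_statements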

Step 2: Configure DAG and Dependencies

To monitor data runtime and job runtime at the same time, start by describing the tables and the ETL jobs that load them:

  1. Define Metadata: Create metadata to describe the tables, columns, and relationships.
  2. Define ETL Jobs: Identify and define the ETL jobs that affect the tables.
lineage:
  table1:
    step: 1
    elt_job: job1
  table2:
    step: 2
    elt_job: job2
  table3:
    step: 2
    elt_job: job3
edges:
  - from: table1
    to: table2
  - from: table2
    to: table3
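
Before generating the diagram, it can help to check that the configuration really describes a DAG. The sketch below is one possible check, not part of the project: validate_lineage is a hypothetical helper, it takes the dict that loading the YAML above would produce, and it relies on graphlib from the standard library (Python 3.9 or later).

```python
from graphlib import CycleError, TopologicalSorter


def validate_lineage(metadata):
    """Check that edges only use declared tables and that the graph has no cycle.

    Returns the tables ordered so that upstream tables come first.
    """
    tables = set(metadata.get("lineage", {}))
    graph = {table: set() for table in tables}  # table -> set of upstream tables
    for edge in metadata.get("edges", []):
        for endpoint in (edge["from"], edge["to"]):
            if endpoint not in tables:
                raise ValueError(f"edge references undeclared table: {endpoint}")
        graph[edge["to"]].add(edge["from"])
    try:
        return list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        # The second element of args holds the tables that form the cycle.
        raise ValueError(f"lineage is not a DAG: {exc.args[1]}") from exc


# Same content as the YAML above, written as a plain dict.
metadata = {
    "lineage": {
        "table1": {"step": 1, "elt_job": "job1"},
        "table2": {"step": 2, "elt_job": "job2"},
        "table3": {"step": 2, "elt_job": "job3"},
    },
    "edges": [
        {"from": "table1", "to": "table2"},
        {"from": "table2", "to": "table3"},
    ],
}
order = validate_lineage(metadata)
print(order)
```

Failing early on a cycle or a misspelled table name is cheaper than debugging a diagram with missing or looping arrows.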

Step 3: Update the DDL of Tables

Modify DDL Statements

  1. Modify Columns or Add Constraints: Update the DDL statements if necessary (e.g., adding constraints, indices).
def update_ddl(ddl_statements, updates):
    for table, update in updates.items():
        if table in ddl_statements:
            ddl_statements[table] += '\n' + update
    return ddl_statements
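
As a usage sketch, the call below appends one ALTER TABLE statement to an extracted DDL. The table names and statements are made up for illustration. Note that update_ddl modifies the dictionary it receives in place and silently skips tables it does not know.

```python
def update_ddl(ddl_statements, updates):
    # Same function as above, repeated so the example runs on its own.
    for table, update in updates.items():
        if table in ddl_statements:
            ddl_statements[table] += '\n' + update
    return ddl_statements


ddl = {"table1": "CREATE TABLE table1 (\n    id integer\n);"}
updates = {
    "table1": "ALTER TABLE table1 ADD CONSTRAINT table1_pk PRIMARY KEY (id);",
    "table9": "ALTER TABLE table9 ADD COLUMN note text;",  # not in ddl, so it is skipped
}
result = update_ddl(ddl, updates)
print(result["table1"])
```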

Step 4: Generate the Lineage with Chart

Generate Data Lineage

  1. Parse DDL Statements: Extract table relationships and dependencies from the DDL statements.
  2. Create Lineage Graph: Use libraries like graphviz or networkx to generate the lineage graph.
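import json

def export_dag_json(metadata, filename='dag.json'):
    """Build the nodes/edges structure from the metadata and write it to filename."""
    dag_data = {"nodes": [], "edges": []}

    for table, details in metadata['lineage'].items():
        step = details.get('step', 'unknown')
        etl_job = details.get('elt_job', 'unknown')  # the config key is spelled 'elt_job'
        dag_data["nodes"].append({
            "id": table,
            "label": f"{table}\nStep: {step}\nJob: {etl_job}",
            "data": {
                "step": step,
                "etl_job": etl_job
            }
        })

        # Attach the edges that start at this table.
        for edge in metadata.get('edges', []):
            if edge['from'] == table:
                dag_data["edges"].append({
                    "id": f"{edge['from']}-{edge['to']}",
                    "source": edge['from'],
                    "target": edge['to']
                })

    # Write the result so the visualizer can load it.
    with open(filename, 'w') as f:
        json.dump(dag_data, f, indent=2)

    return dag_data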
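
The "Parse DDL Statements" step can be sketched as follows, assuming the DDL text contains foreign key clauses. The helper edges_from_ddl and its regular expression are illustrative, not the project's implementation. Foreign keys describe structural relationships, which may or may not match the actual data flow, so the result is best treated as a starting point for the edges section of the configuration.

```python
import re

# Matches "REFERENCES parent_table" in inline or table-level foreign key clauses.
REFERENCES_RE = re.compile(r"references\s+([\w.]+)", re.IGNORECASE)


def edges_from_ddl(ddl_statements):
    """Derive edges (referenced table -> referencing table) from foreign keys."""
    edges = []
    for table, ddl in sorted(ddl_statements.items()):
        for parent in sorted(set(REFERENCES_RE.findall(ddl))):
            if parent != table:  # ignore self-references
                edges.append({"from": parent, "to": table})
    return edges


# Hypothetical DDL used only for this example.
ddl_statements = {
    "orders": "CREATE TABLE orders (id int PRIMARY KEY, customer_id int REFERENCES customers(id));",
    "customers": "CREATE TABLE customers (id int PRIMARY KEY);",
}
print(edges_from_ddl(ddl_statements))
```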

Detailed Solution Breakdown

  1. Database Connection and DDL Extraction: Create a Python script to connect to the database and extract the DDL statements.
  2. DAG Configuration: Define a YAML or JSON file to store metadata about tables, columns, and ETL jobs.
  3. DDL Updates: Write a function to update the DDL statements based on new requirements.
  4. Data Lineage Generation: Create a function to parse the metadata and generate a lineage graph using graphviz.

Project Setup

Directory Structure:

.
├── README.md
├── dag-generator
├── dag-visualizer
├── docker-compose.yml
├── docs
└── requirements.txt

Lineage Consumption

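The lineage is generated in JSON format with nodes and edges, where each node is a dataset and each edge is a connection between two datasets.

The visualizer uses ForceGraph2D (import ForceGraph2D from 'react-force-graph-2d';) to draw the lineage of data movement. ForceGraph2D reads its graphData as an object with nodes and links, so the edges array from dag.json needs to be supplied as links.

How the nodes and edges are visualized: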

   <ForceGraph2D
      graphData={graphData}
      width={800}
      height={600}
      nodeLabel={node => node.label}
      nodeCanvasObjectMode={() => 'before'}
      nodeCanvasObject={(node, ctx, globalScale) => {
        const label = node.id;
        const fontSize = 12 / globalScale;
        ctx.font = `${fontSize}px Sans-Serif`;
        ctx.textAlign = 'center';
        ctx.textBaseline = 'middle';
        ctx.fillStyle = 'black'; // node label color
        ctx.fillText(label, node.x, node.y + fontSize);


        if (rootNodes.includes(node.id)) {
          ctx.beginPath();
          ctx.arc(node.x, node.y, 10 / globalScale, 0, 2 * Math.PI, false);
          ctx.fillStyle = 'orange';
          ctx.fill();
        }
      }}
      linkDirectionalArrowLength={6}
      linkDirectionalArrowRelPos={1}
      onNodeClick={handleNodeClick}
      linkColor={link => (highlightedEdges.includes(link.id) ? 'red' : 'gray')}
    />

Setup

Prerequisites:

  • Python 3.x
  • Node.js and npm
  • PostgreSQL database (or another database with a Python driver)

  • Step#1 Clone the project: check out the repository, which contains both dag-gen and dag-viz.
git clone https://github.com/lognbuivan/data-lineage-visualization.git
cd data-lineage-visualization
  • Step#2 Install dependencies: install the required Python libraries.
pip install -r requirements.txt
  • Step#3 Configure the database connection and metadata: edit metadata/metadata.yaml.

Change the connection string to point to your database:

connection_string = "dbname='postgres' user='admin' password='admin' host='localhost' port='5432'"

Change the dependencies and lineage configuration to match your tables and jobs:

lineage:
  table1:
    step: 1
    elt_job: job1
  table2:
    step: 2
    elt_job: job2
  table3:
    step: 2
    elt_job: job3
edges:
  - from: table1
    to: table2
  - from: table2
    to: table3

  • Step#4 Run the script: start dag-gen.
python src/main.py
  • Step#5 Visualize the lineage: start dag-viz.
cd dag-visualizer

Then run the React app:

npm install
npm start

Usage

Backend:

The main.py script connects to your PostgreSQL database, extracts the table schemas, and generates a JSON file (dag.json) representing the data lineage.

Frontend:

The React application (dag-visualizer) loads the dag.json file and visualizes the data lineage as a DAG. Click on any node to highlight the edges connected to that node. Root nodes (master jobs) are highlighted with a different color.
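
The post does not show how the app derives its root nodes or highlighted edges. The sketch below expresses one plausible reading in Python, assuming a root node is a node that no edge points to and that clicking a node highlights every edge touching it. Both function names are hypothetical.

```python
def find_root_nodes(dag):
    """Return the ids of nodes that no edge points to."""
    targets = {edge["target"] for edge in dag["edges"]}
    return [node["id"] for node in dag["nodes"] if node["id"] not in targets]


def connected_edge_ids(dag, node_id):
    """Return the ids of edges that start or end at node_id."""
    return [e["id"] for e in dag["edges"] if node_id in (e["source"], e["target"])]


# Small graph in the same shape as dag.json.
dag = {
    "nodes": [{"id": "table1"}, {"id": "table2"}, {"id": "table3"}],
    "edges": [
        {"id": "table1-table2", "source": "table1", "target": "table2"},
        {"id": "table2-table3", "source": "table2", "target": "table3"},
    ],
}
print(find_root_nodes(dag))
print(connected_edge_ids(dag, "table2"))
```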

Example Data

An example dag.json file structure:

{
  "nodes": [
    {
      "id": "table1",
      "label": "table1\nStep: 1\nJob: job1",
      "data": {
        "step": 1,
        "etl_job": "job1"
      }
    },
    ...
  ],
  "edges": [
    {
      "id": "table1-table11",
      "source": "table1",
      "target": "table11"
    },
    ...
  ]
}

Conclusion

This solution provides a detailed outline for setting up a project to generate data lineage based on table DDL, including the extraction of DDL, configuration of DAGs, updating DDL, and generating the lineage chart.

Contribution

TBU…

