Generating Data Lineage with Database Tables DDL (DAG Gen)

In legacy systems, viewing data lineage and monitoring data movement can be a significant challenge. To address this pain point, I’ve developed a Proof of Concept (POC) aimed at resolving these issues.

With this POC, you can easily scan the database, configure dependencies, and automatically generate data lineage.

This project provides a solution for generating and visualizing data lineage based on table DDLs and ETL job metadata. The solution includes a Python script for generating data lineage information and a React application for visualizing the data lineage as a Directed Acyclic Graph (DAG).

How to create data lineage?

  • Method 1: Lineage by parsing. The tool automatically reads the logic used to process the data.
  • Method 2: Lineage by data tagging. The transformation engine tags data as it is transformed or moved.
  • Method 3: Pattern-based lineage. Instead of inspecting the code that transforms the data, the tool relies on metadata and builds the lineage by looking for patterns.
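
As a rough illustration of Method 1 (lineage by parsing), the sketch below pulls the source and target tables out of a single INSERT INTO ... SELECT statement using regular expressions. The function name lineage_from_sql and the sample tables are invented for this example and are not part of the project. A regex is only an approximation: CTEs, subqueries, and quoted identifiers would need a real SQL parser.

```python
import re

# Target of the statement: the table named after INSERT INTO.
_TARGET = re.compile(r"\binsert\s+into\s+([\w.]+)", re.IGNORECASE)
# Sources: any table name that directly follows FROM or JOIN.
_SOURCE = re.compile(r"\b(?:from|join)\s+([\w.]+)", re.IGNORECASE)


def lineage_from_sql(sql):
    """Return sorted (source, target) pairs for one INSERT ... SELECT statement."""
    target = _TARGET.search(sql)
    if target is None:
        return []  # not an INSERT statement, so no lineage is inferred
    sources = sorted(set(_SOURCE.findall(sql)))
    return [(source, target.group(1)) for source in sources]


# Hypothetical ETL statement used only to exercise the function.
sql = """
INSERT INTO sales_summary
SELECT o.region, SUM(o.amount)
FROM orders o
JOIN customers c ON c.id = o.customer_id
GROUP BY o.region
"""
edges = lineage_from_sql(sql)
```

Each pair reads as "data flows from the first table into the second", which maps directly onto the from/to edges used in the configuration later in this document.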

Lineage Generation

Workflow: Read DDL → Configure DAG and Dependencies → Update DDL → Generate DAG Diagram

Step 1: Read DDL

Extract DDL Information

  1. Set up a Database Connection: Establish a connection to the database where the DDL is stored, using a driver library such as:
     • psycopg2 for PostgreSQL
     • pyodbc for SQL Server
     • cx_Oracle for Oracle
     • mysql-connector-python for MySQL
  2. Fetch DDL Statements: Retrieve the DDL statements from the database. This can be done by querying the database system tables or using database-specific commands.
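import psycopg2

def fetch_ddl(connection_string):
    """Return {table_name: CREATE TABLE statement} for the 'public' schema."""
    # PostgreSQL has no built-in pg_get_tabledef() function, so this version
    # rebuilds a simplified DDL (column names, types, nullability) from
    # information_schema.columns. Constraints and indexes are not included.
    conn = psycopg2.connect(connection_string)
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT table_name, column_name, data_type, is_nullable "
                "FROM information_schema.columns "
                "WHERE table_schema = 'public' "
                "ORDER BY table_name, ordinal_position"
            )
            columns = {}
            for table, column, data_type, nullable in cursor.fetchall():
                null_sql = "" if nullable == "YES" else " NOT NULL"
                columns.setdefault(table, []).append(
                    f"    {column} {data_type}{null_sql}"
                )
    finally:
        conn.close()

    return {
        table: f"CREATE TABLE {table} (\n" + ",\n".join(cols) + "\n);"
        for table, cols in columns.items()
    }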

Step 2: Configure DAG and Dependencies

To monitor data runtime and job runtime at the same time, start with the following:

  1. Define Metadata: Create metadata to describe the tables, columns, and relationships.
  2. Define ETL Jobs: Identify and define the ETL jobs that affect the tables.
lineage:
  table1:
    step: 1
    elt_job: job1
  table2:
    step: 2
    elt_job: job2
  table3:
    step: 2
    elt_job: job3
edges:
  - from: table1
    to: table2
  - from: table2
    to: table3
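
Before generating the graph, it can help to check that the configuration really describes a DAG. The sketch below assumes the YAML above has already been loaded into a Python dict (for example with a YAML library) and uses the standard-library graphlib module, available from Python 3.9. The function name validate_lineage is hypothetical and not part of the project.

```python
from graphlib import CycleError, TopologicalSorter  # Python 3.9+


def validate_lineage(metadata):
    """Return the tables in dependency order, or raise ValueError on a bad config."""
    tables = set(metadata["lineage"])
    # Register every table first so isolated tables still appear in the result.
    sorter = TopologicalSorter({table: set() for table in tables})
    for edge in metadata.get("edges", []):
        for endpoint in (edge["from"], edge["to"]):
            if endpoint not in tables:
                raise ValueError(f"edge references unknown table: {endpoint}")
        # 'to' is loaded from 'from', so 'from' is a predecessor of 'to'.
        sorter.add(edge["to"], edge["from"])
    try:
        return list(sorter.static_order())
    except CycleError as exc:
        raise ValueError(f"lineage contains a cycle: {exc.args[1]}") from exc


# Same structure as the YAML configuration, written as a dict.
metadata = {
    "lineage": {
        "table1": {"step": 1, "elt_job": "job1"},
        "table2": {"step": 2, "elt_job": "job2"},
        "table3": {"step": 2, "elt_job": "job3"},
    },
    "edges": [
        {"from": "table1", "to": "table2"},
        {"from": "table2", "to": "table3"},
    ],
}
order = validate_lineage(metadata)
```

Failing early on unknown tables or cycles keeps a configuration typo from silently producing a broken diagram.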

Step 3: Update the DDL of Tables

Modify DDL Statements

  1. Modify Columns or Add Constraints: Update the DDL statements if necessary (e.g., adding constraints, indices).
def update_ddl(ddl_statements, updates):
    for table, update in updates.items():
        if table in ddl_statements:
            ddl_statements[table] += '\n' + update
    return ddl_statements

Step 4: Generate the Lineage with Chart

Generate Data Lineage

  1. Parse DDL Statements: Extract table relationships and dependencies from the DDL statements.
  2. Create Lineage Graph: Use libraries like graphviz or networkx to generate the lineage graph.
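import json

def export_dag_json(metadata, filename='dag.json'):
    """Write the lineage metadata to `filename` as {"nodes": [...], "edges": [...]}."""
    dag_data = {"nodes": [], "edges": []}

    for table, details in metadata['lineage'].items():
        step = details.get('step', 'unknown')
        # The configuration file spells this key 'elt_job'.
        etl_job = details.get('elt_job', 'unknown')
        dag_data["nodes"].append({
            "id": table,
            "label": f"{table}\nStep: {step}\nJob: {etl_job}",
            "data": {
                "step": step,
                "etl_job": etl_job
            }
        })

        # Attach every edge that starts at this table.
        for edge in metadata.get('edges', []):
            if edge['from'] == table:
                dag_data["edges"].append({
                    "id": f"{edge['from']}-{edge['to']}",
                    "source": edge['from'],
                    "target": edge['to']
                })

    # Persist the graph so the visualizer can load it.
    with open(filename, 'w') as f:
        json.dump(dag_data, f, indent=2)

    return dag_data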
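
The "Parse DDL Statements" step is not shown in code above. One possible approach, sketched here under the assumption that the DDL text contains foreign-key clauses (for example after constraints were appended in Step 3), is to treat every REFERENCES clause as an edge from the referenced table to the referencing table. The function name edges_from_ddl and the sample tables are hypothetical, and quoted identifiers are not handled.

```python
import re

# Matches "REFERENCES parent_table" in a column or table constraint.
_REFERENCES = re.compile(r"\breferences\s+([\w.]+)", re.IGNORECASE)


def edges_from_ddl(ddl_statements):
    """Derive {'from': parent, 'to': child} edges from foreign-key clauses."""
    edges = []
    for table, ddl in ddl_statements.items():
        for parent in sorted(set(_REFERENCES.findall(ddl))):
            if parent != table:  # skip self-references so the graph stays acyclic
                edges.append({"from": parent, "to": table})
    return edges


# Hypothetical DDL used only to exercise the function.
ddl_statements = {
    "customers": "CREATE TABLE customers (id integer PRIMARY KEY);",
    "orders": (
        "CREATE TABLE orders (\n"
        "    id integer PRIMARY KEY,\n"
        "    customer_id integer REFERENCES customers (id)\n"
        ");"
    ),
}
edges = edges_from_ddl(ddl_statements)
```

The result uses the same from/to keys as the edges section of the configuration, so derived edges could be merged with hand-written ones.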

Detailed Solution Breakdown

  1. Database Connection and DDL Extraction: Create a Python script to connect to the database and extract the DDL statements.
  2. DAG Configuration: Define a YAML or JSON file to store metadata about tables, columns, and ETL jobs.
  3. DDL Updates: Write a function to update the DDL statements based on new requirements.
  4. Data Lineage Generation: Create a function to parse the metadata and generate a lineage graph using graphviz.
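
For the graphviz option in item 4, a minimal sketch is to emit Graphviz DOT source as plain text, which avoids any Python dependency. The function name to_dot is hypothetical, and the metadata dict is assumed to have the same shape as the YAML configuration shown earlier.

```python
def to_dot(metadata):
    """Render lineage metadata as Graphviz DOT source text."""
    lines = ["digraph lineage {", "    rankdir=LR;"]
    for table, details in metadata["lineage"].items():
        # "\\n" produces the two characters \n, which DOT reads as a line break.
        label = f"{table}\\nStep: {details.get('step', 'unknown')}"
        lines.append(f'    "{table}" [label="{label}"];')
    for edge in metadata.get("edges", []):
        lines.append(f'    "{edge["from"]}" -> "{edge["to"]}";')
    lines.append("}")
    return "\n".join(lines)


metadata = {
    "lineage": {
        "table1": {"step": 1, "elt_job": "job1"},
        "table2": {"step": 2, "elt_job": "job2"},
    },
    "edges": [{"from": "table1", "to": "table2"}],
}
dot_source = to_dot(metadata)
```

If Graphviz is installed, saving dot_source to a file such as lineage.dot and running dot -Tpng lineage.dot -o lineage.png should produce a static image of the lineage.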

Project Setup

Directory Structure:

.
├── README.md
├── dag-generator
├── dag-visualizer
├── docker-compose.yml
├── docs
└── requirements.txt

Lineage Consumption

The lineage is generated in JSON format with nodes and edges, where each node is a dataset and each edge is a connection between two datasets.

The visualizer uses the ForceGraph2D component (import ForceGraph2D from 'react-force-graph-2d';) to render the lineage of data movement.

How the nodes and edges are being visualized:

   <ForceGraph2D
      graphData={graphData}
      width={800}
      height={600}
      nodeLabel={node => node.label}
      nodeCanvasObjectMode={() => 'before'}
      nodeCanvasObject={(node, ctx, globalScale) => {
        const label = node.id;
        const fontSize = 12 / globalScale;
        ctx.font = `${fontSize}px Sans-Serif`;
        ctx.textAlign = 'center';
        ctx.textBaseline = 'middle';
        ctx.fillStyle = 'black'; // node label color
        ctx.fillText(label, node.x, node.y + fontSize);


        if (rootNodes.includes(node.id)) {
          ctx.beginPath();
          ctx.arc(node.x, node.y, 10 / globalScale, 0, 2 * Math.PI, false);
          ctx.fillStyle = 'orange';
          ctx.fill();
        }
      }}
      linkDirectionalArrowLength={6}
      linkDirectionalArrowRelPos={1}
      onNodeClick={handleNodeClick}
      linkColor={link => (highlightedEdges.includes(link.id) ? 'red' : 'gray')}
    />

Setup

Prerequisites:

  • Python 3.x
  • Node.js and npm
  • PostgreSQL database (or another database)

  • Step#1 Clone project: check out the repository, which contains both dag-gen and dag-viz
git clone https://github.com/lognbuivan/data-lineage-visualization.git
cd data-lineage-visualization
  • Step#2 Install dependencies: install the required Python libraries
pip install -r requirements.txt
  • Step#3 Configure database connection and metadata: edit metadata/metadata.yaml.

Change the connection to your database

connection_string = "dbname='postgres' user='admin' password='admin' host='localhost' port='5432'"
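
To keep credentials out of source control, the connection string could be assembled from environment variables instead of being hard-coded. This is a sketch only: the function name build_connection_string is hypothetical, the variable names follow the standard libpq environment variables (PGDATABASE, PGUSER, PGPASSWORD, PGHOST, PGPORT), and the defaults mirror the example above.

```python
import os


def _quote(value):
    """Escape backslashes and single quotes for a libpq keyword/value string."""
    return value.replace("\\", "\\\\").replace("'", "\\'")


def build_connection_string(env=os.environ):
    """Build a connection string in the same format as the example above."""
    parts = {
        "dbname": env.get("PGDATABASE", "postgres"),
        "user": env.get("PGUSER", "admin"),
        "password": env.get("PGPASSWORD", ""),
        "host": env.get("PGHOST", "localhost"),
        "port": env.get("PGPORT", "5432"),
    }
    return " ".join(f"{key}='{_quote(value)}'" for key, value in parts.items())


# Passing a plain dict makes the function easy to try without touching the real environment.
connection_string = build_connection_string({"PGPASSWORD": "s3cret"})
```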

Change the dependencies and lineage configuration

lineage:
  table1:
    step: 1
    elt_job: job1
  table2:
    step: 2
    elt_job: job2
  table3:
    step: 2
    elt_job: job3
edges:
  - from: table1
    to: table2
  - from: table2
    to: table3

  • Step#4 Run the scripts: start the dag-gen
python src/main.py
  • Step#5 Visualize lineage: start the dag-viz
cd dag-visualizer

and run the React app

npm install
npm start

Usage

Backend:

The main.py script connects to your PostgreSQL database, extracts the table schemas, and generates a JSON file (dag.json) representing the data lineage.

Frontend:

The React application (dag-visualizer) loads the dag.json file and visualizes the data lineage as a DAG. Click on any node to highlight the edges connected to that node. Root nodes (master jobs) are highlighted with a different color.
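
The two behaviours described above (root-node highlighting and edge highlighting on click) come down to two small lookups over dag.json. The sketch below shows that logic in Python for clarity. It is not the visualizer's actual code, and the names root_nodes and connected_edges are hypothetical; a root node is assumed to be any node with no incoming edge.

```python
def root_nodes(dag):
    """Return ids of nodes that no edge points to."""
    targets = {edge["target"] for edge in dag["edges"]}
    return [node["id"] for node in dag["nodes"] if node["id"] not in targets]


def connected_edges(dag, node_id):
    """Return ids of edges that start or end at the given node."""
    return [
        edge["id"]
        for edge in dag["edges"]
        if node_id in (edge["source"], edge["target"])
    ]


# Small graph in the dag.json shape: table1 -> table2 -> table3.
dag = {
    "nodes": [{"id": "table1"}, {"id": "table2"}, {"id": "table3"}],
    "edges": [
        {"id": "table1-table2", "source": "table1", "target": "table2"},
        {"id": "table2-table3", "source": "table2", "target": "table3"},
    ],
}
roots = root_nodes(dag)
highlighted = connected_edges(dag, "table2")
```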

Example Data

An example dag.json file structure:

{
  "nodes": [
    {
      "id": "table1",
      "label": "table1\nStep: 1\nJob: job1",
      "data": {
        "step": 1,
        "etl_job": "job1"
      }
    },
    ...
  ],
  "edges": [
    {
      "id": "table1-table11",
      "source": "table1",
      "target": "table11"
    },
    ...
  ]
}

Conclusion

This solution provides a detailed outline for setting up a project to generate data lineage based on table DDL, including the extraction of DDL, configuration of DAGs, updating DDL, and generating the lineage chart.

Contribution

TBU…