Load from Postgres to Postgres faster
The source code for this example can be found in our repository at: https://github.com/dlt-hub/dlt/tree/devel/docs/examples/postgres_to_postgres
About this Example
Huge shout out to Simon Späti for this example!
This example shows you how to export and import data from Postgres to Postgres in a fast way with ConnectorX and DuckDB,
since the default route generates INSERT statements during the normalization phase, which is very slow for large tables.
As it's an initial load, we first create a separate schema with a timestamp and then replace the existing schema with the new one (the swap is sketched below).
This approach is tested and works well for an initial load (--replace); the incremental load (--merge), however, might need some adjustments (loading dlt's load-tables, setting up the first run after an initial
load, etc.).
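The swap boils down to two statements executed on the target Postgres. A minimal sketch with psycopg2; the timestamped schema name here is illustrative, dlt generates the actual suffix:
cur.execute("DROP SCHEMA IF EXISTS destination_schema CASCADE;")
cur.execute("ALTER SCHEMA destination_schema_20240101120000 RENAME TO destination_schema;")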
We'll learn:
- How to get arrow tables from ConnectorX and yield them in chunks.
- That merge and incremental loads work with arrow tables.
- How to use DuckDB for a speedy normalization.
- How to use argparse to turn your pipeline script into a CLI.
- How to work with the ConnectionStringCredentials spec (see the sketch after this list).
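On that last point: ConnectionStringCredentials parses a connection string into its parts and renders it back, e.g. to hand to ConnectorX. A minimal sketch with placeholder values:
from dlt.sources.credentials import ConnectionStringCredentials
# placeholder credentials, not a real database
creds = ConnectionStringCredentials("postgresql://loader:secret@localhost:5432/demo")
print(creds.username, creds.host, creds.port, creds.database)
print(creds.to_native_representation())  # the full connection string again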
Be aware that you need to define the database credentials in .dlt/secrets.toml or as dlt environment variables, and adjust the table names ("customers" and "inventory").
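For example, .dlt/secrets.toml could look like this (hosts, users, passwords, and database names are placeholders):
[sources.postgres]
credentials = "postgresql://loader:secret@source-host:5432/source_db"
[destination.postgres]
credentials = "postgresql://loader:secret@target-host:5432/target_db"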
Install dlt with the duckdb extra, plus connectorx, pyarrow, the Postgres adapter, and a progress-bar tool:
pip install "dlt[duckdb]" connectorx pyarrow psycopg2-binary alive-progress
Run the example:
python postgres_to_postgres.py --replace
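For a subsequent delta load into Postgres, run (keeping in mind the caveat above that the merge path might need adjustments):
python postgres_to_postgres.py --merge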
Attention: There were problems with the TIME data type when it includes nanoseconds (more details in Slack), as well as with installing the DuckDB extension (see the issue here);
that's why I manually installed the postgres_scanner.duckdb_extension in my Dockerfile to load the data into Postgres.
Full source code
import argparse
import os
from dlt.common import pendulum
from typing import List
import connectorx as cx
import duckdb
import psycopg2
import dlt
from dlt.sources.credentials import ConnectionStringCredentials
CHUNKSIZE = int(
    os.getenv("CHUNKSIZE", 1000000)
)  # 1 million rows works well with 1 GiB of RAM (if no parallelism)
def read_sql_x_chunked(conn_str: str, query: str, chunk_size: int = CHUNKSIZE):
    offset = 0
    while True:
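        # LIMIT/OFFSET pagination is only deterministic when the base query
        # has a stable ORDER BY; pg_resource_chunked adds one below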
        chunk_query = f"{query} LIMIT {chunk_size} OFFSET {offset}"
        data_chunk = cx.read_sql(
            conn_str,
            chunk_query,
            return_type="arrow",
            protocol="binary",
        )
        yield data_chunk
        if data_chunk.num_rows < chunk_size:
            break  # No more data to read
        offset += chunk_size
@dlt.source(max_table_nesting=0)
def pg_resource_chunked(
    table_name: str,
    primary_key: List[str],
    schema_name: str,
    order_date: str,
    load_type: str = "merge",
    columns: str = "*",
    credentials: ConnectionStringCredentials = None,
):
    print(
        f"dlt.resource write_disposition: `{load_type}` -- ",
        "connection string:"
        f" postgresql://{credentials.username}:*****@{credentials.host}:{credentials.host}/{credentials.database}",
    )
    query = (  # Needed to have an idempotent query
        f"SELECT {columns} FROM {schema_name}.{table_name} ORDER BY {order_date}"
    )
    source = dlt.resource(  # type: ignore
        name=table_name,
        table_name=table_name,
        write_disposition=load_type,  # use `replace` for initial load, `merge` for incremental
        primary_key=primary_key,
        parallelized=True,
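        # parallelized=True evaluates this resource's generator in parallel
        # with the other tables during extract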
    )(read_sql_x_chunked)(
        credentials.to_native_representation(),  # Pass the connection string directly
        query,
    )
    if load_type == "merge":
        # Retrieve the last value processed for incremental loading
        source.apply_hints(incremental=dlt.sources.incremental(order_date))
    return source
def table_desc(table_name, pk, schema_name, order_date, columns="*"):
    return {
        "table_name": table_name,
        "pk": pk,
        "schema_name": schema_name,
        "order_date": order_date,
        "columns": columns,
    }
if __name__ == "__main__":
    # Input Handling
    parser = argparse.ArgumentParser(description="Run specific functions in the script.")
    parser.add_argument("--replace", action="store_true", help="Run initial load")
    parser.add_argument("--merge", action="store_true", help="Run delta load")
    args = parser.parse_args()
    source_schema_name = "fixture_postgres_to_postgres"
    target_schema_name = "destination_schema"
    pipeline_name = "loading_postgres_to_postgres"
    tables = [
        table_desc("customers", ["id"], source_schema_name, "id"),
        table_desc("inventory", ["id"], source_schema_name, "id"),
    ]
    # default is initial loading (replace)
    load_type = "merge" if args.merge else "replace"
    print(f"LOAD-TYPE: {load_type}")
    resources = []
    for table in tables:
        resources.append(
            pg_resource_chunked(
                table["table_name"],
                table["pk"],
                table["schema_name"],
                table["order_date"],
                load_type=load_type,
                columns=table["columns"],
                credentials=dlt.secrets["sources.postgres.credentials"],
            )
        )
    if load_type == "replace":
        pipeline = dlt.pipeline(
            pipeline_name=pipeline_name,
            destination="duckdb",
            dataset_name=target_schema_name,
            dev_mode=True,
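            # dev_mode appends a timestamped suffix to the dataset name, creating
            # the separate schema that is later copied to Postgres and renamed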
            progress="alive_progress",
        )
    else:
        pipeline = dlt.pipeline(
            pipeline_name=pipeline_name,
            destination="postgres",
            dataset_name=target_schema_name,
            dev_mode=False,  # keep a stable dataset (schema) name for merge loads
        )
    # start timer
    startTime = pendulum.now()
    # 1. extract
    print("##################################### START EXTRACT ########")
    pipeline.extract(resources, loader_file_format="parquet")
    print(f"--Time elapsed: {pendulum.now() - startTime}")
    # 2. normalize
    print("##################################### START NORMALIZATION ########")
    if load_type == "replace":
        info = pipeline.normalize(
            workers=2,
        )  # https://dlthub.com/docs/blog/dlt-arrow-loading
    else:
        info = pipeline.normalize()
    print(info)
    print(pipeline.last_trace.last_normalize_info)
    print(f"--Time elapsed: {pendulum.now() - startTime}")
    # 3. load
    print("##################################### START LOAD ########")
    load_info = pipeline.load()
    print(load_info)
    print(f"--Time elapsed: {pendulum.now() - startTime}")
    # check that stuff was loaded
    row_counts = pipeline.last_trace.last_normalize_info.row_counts
    assert row_counts["customers"] == 13
    assert row_counts["inventory"] == 3
    if load_type == "replace":
        # 4. Load DuckDB local database into Postgres
        print("##################################### START DUCKDB LOAD ########")
        # connect to the local DuckDB database file; the displayable credentials
        # look like duckdb:///<path>, so split off the file path
        conn = duckdb.connect(f"{load_info.destination_displayable_credentials}".split(":///")[1])
        conn.sql("INSTALL postgres;")
        conn.sql("LOAD postgres;")
        # pick the most recent timestamped schema generated by dev_mode,
        # skipping dlt's staging schemas
        timestamped_schema = conn.sql(
            f"""select distinct table_schema from information_schema.tables
                     where table_schema like '{target_schema_name}%'
                     and table_schema NOT LIKE '%_staging'
                     order by table_schema desc"""
        ).fetchone()[0]
        print(f"timestamped_schema: {timestamped_schema}")
        target_credentials = ConnectionStringCredentials(
            dlt.secrets["destination.postgres.credentials"]
        )
        # connect to destination (timestamped schema)
        conn.sql(
            "ATTACH"
            f" 'dbname={target_credentials.database} user={target_credentials.username} password={target_credentials.password} host={target_credentials.host} port={target_credentials.port}'"
            " AS pg_db (TYPE postgres);"
        )
        conn.sql(f"CREATE SCHEMA IF NOT EXISTS pg_db.{timestamped_schema};")
        for table in tables:
            print(
                f"LOAD DuckDB -> Postgres: table: {timestamped_schema}.{table['table_name']} TO"
                f" Postgres {timestamped_schema}.{table['table_name']}"
            )
            conn.sql(
                f"CREATE OR REPLACE TABLE pg_db.{timestamped_schema}.{table['table_name']} AS"
                f" SELECT * FROM {timestamped_schema}.{table['table_name']};"
            )
            conn.sql(
                f"SELECT count(*) as count FROM pg_db.{timestamped_schema}.{table['table_name']};"
            ).show()
        print(f"--Time elapsed: {pendulum.now() - startTime}")
        print("##################################### FINISHED ########")
        # check that stuff was loaded (`table` is the last table from the loop above)
        rows = conn.sql(
            f"SELECT count(*) as count FROM pg_db.{timestamped_schema}.{table['table_name']};"
        ).fetchone()[0]
        assert int(rows) == (13 if table["table_name"] == "customers" else 3)
        # 5. Cleanup and rename Schema
        print("##################################### RENAME Schema and CLEANUP ########")
        try:
            con_hd = psycopg2.connect(
                dbname=target_credentials.database,
                user=target_credentials.username,
                password=target_credentials.password,
                host=target_credentials.host,
                port=target_credentials.port,
            )
            con_hd.autocommit = True
            print(
                "Connected to HD-DB: "
                + target_credentials.host
                + ", DB: "
                + target_credentials.database
            )
        except Exception as e:
            print(f"Unable to connect to HD-database! The reason: {e}")
            raise
        with con_hd.cursor() as cur:
            # Drop existing target_schema_name
            print(f"Drop existing {target_schema_name}")
            cur.execute(f"DROP SCHEMA IF EXISTS {target_schema_name} CASCADE;")
            # Rename timestamped-target_schema_name to target_schema_name
            print(f"Going to rename schema {timestamped_schema} to {target_schema_name}")
            cur.execute(f"ALTER SCHEMA {timestamped_schema} RENAME TO {target_schema_name};")
        con_hd.close()