Madillo - Syncronic

Overview

Madillo is the internal pipeline that contains an HTTP server to replicate data from BigQuery to CloudSQL (PostgreSQL: Silencer).

Madillo is made up of the following components:

  • Syncronic is the engine that syncs data in memory from a source database into an RDBMS

  • MapFlux is a configuration CLI to interface with Syncronic without going through Cloud Tasks

Features:

  • Fast transfers using an Arrow-based backend

  • Handles data-type conversion between BigQuery and Postgres

  • Reports progress notifications while processing

  • Handles indexes

  • Stricter enforcement of the manifest instructions

  • Handles authentication during processing

  • Handles multiple data types with the conversion chain: PyArrow -> Postgres -> Pandas -> Polars

  • Handles both QUERY and TABLE as source identifiers

  • Uses COPY as the load backend to increase throughput

  • Handles the MERGE method

SAD - System Architecture Design

Logical design

  sequenceDiagram

  Client ->> Service: Send manifest
  alt Instruction format is TABLE
      Service->>BigQuery: Extract via Arrow/Parquet export
      BigQuery->>Storage: Send data
      Storage->>Service: Load data back
  else Instruction format is QUERY
      Service->>BigQuery: Fetch via the BigQuery REST API
      BigQuery->>Service: Load into memory
  end

  loop Load data into memory
      Service->>Service: Sink data to local disk
  end

  alt Transfer the data from local disk
      Service-->>RDBMS: COPY FROM into a temporary table
      RDBMS->>RDBMS: Load into the original target table with INSERT INTO
  end

  Service-->>Client: Result of the fetch
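
For illustration, a manifest for the two instruction formats above might look like the sketch below. The field names are hypothetical and should be checked against the actual Syncronic schema.

  # Hypothetical manifest payloads for the two instruction formats above.
  # Field names are illustrative only, not the real Syncronic schema.
  table_manifest = {
      "format": "TABLE",        # extracted via Arrow/Parquet export through GCS
      "source": "project.dataset.table",
      "destination": "public.target_table",
      "method": "full-refresh",
  }

  query_manifest = {
      "format": "QUERY",        # fetched via the BigQuery REST API
      "source": "SELECT id, name FROM `project.dataset.table` WHERE active",
      "destination": "public.target_table",
      "method": "merge-strategy",
  }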

Goal: Transfer a BQ table -> Postgres using a pushdown approach.

1M rows -> download 1M -> push 1M (the simple approach) -> overloads memory

1M rows -> download in small chunks (Hadoop-style folder structure)

Polars strategy: read in chunks until the memory budget is reached

1] [BigQuery]

Fetch the metadata of the table:

  SELECT * FROM <table> LIMIT 0

-> Table structure

-> Map [BQ types] -> [Postgres types]
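
A minimal sketch of this metadata step, assuming the google-cloud-bigquery client; the BQ -> Postgres type map below covers only a few common types and is not the full Syncronic mapping.

  from google.cloud import bigquery

  # Partial, illustrative BigQuery -> Postgres type map (not the full mapping).
  BQ_TO_PG = {
      "STRING": "text",
      "INTEGER": "bigint",
      "INT64": "bigint",
      "FLOAT": "double precision",
      "FLOAT64": "double precision",
      "NUMERIC": "numeric",
      "BOOLEAN": "boolean",
      "DATE": "date",
      "TIMESTAMP": "timestamptz",
  }

  def fetch_structure(client: bigquery.Client, table_id: str) -> list[tuple[str, str]]:
      # LIMIT 0 returns no rows but still yields the result schema.
      rows = client.query(f"SELECT * FROM `{table_id}` LIMIT 0").result()
      return [(f.name, BQ_TO_PG.get(f.field_type, "text")) for f in rows.schema]

  client = bigquery.Client()
  columns = fetch_structure(client, "my-project.my_dataset.my_table")
  ddl_body = ", ".join(f"{name} {pg_type}" for name, pg_type in columns)
  print(f"CREATE TABLE staging_table ({ddl_body})")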

2] Download via ADBC (Arrow format)

-> Push into a folder of chunks (ConnectorX / Polars)

3] Upload using the table
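
A rough sketch of the download step. It substitutes the google-cloud-bigquery Arrow path for an ADBC driver and uses a placeholder staging folder, so treat it as an assumption-laden illustration rather than the actual implementation.

  import pathlib
  import pyarrow as pa
  import pyarrow.parquet as pq
  from google.cloud import bigquery

  # Pull the table as Arrow record batches and sink them to a local folder of
  # Parquet chunks so memory stays bounded (Hadoop-style folder of parts).
  # Uses the BigQuery client instead of ADBC, purely for illustration.
  client = bigquery.Client()
  out_dir = pathlib.Path("/tmp/sync_chunks")  # hypothetical staging folder
  out_dir.mkdir(parents=True, exist_ok=True)

  rows = client.list_rows("my-project.my_dataset.my_table")
  for i, batch in enumerate(rows.to_arrow_iterable()):
      pq.write_table(pa.Table.from_batches([batch]), str(out_dir / f"part-{i:05d}.parquet"))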

Design endpoint: POST /v3/sync

Expectations:

[1] Low memory footprint: ~500Mi

[2] Runtime: ~15s per 1M records.

10M records ~ 2 min
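
A minimal FastAPI sketch of the POST /v3/sync endpoint, reusing the hypothetical manifest fields from above; the real request and response schema may differ.

  from fastapi import FastAPI
  from pydantic import BaseModel

  app = FastAPI()

  class Manifest(BaseModel):
      # Hypothetical manifest schema; the real Syncronic model may differ.
      format: str                      # "TABLE" or "QUERY"
      source: str                      # table id or SQL text
      destination: str                 # target Postgres table
      method: str = "full-refresh"     # or "merge-strategy"

  @app.post("/v3/sync")
  def sync(manifest: Manifest) -> dict:
      # The real service would run the BigQuery -> Postgres transfer here;
      # this stub only acknowledges the manifest.
      return {"status": "accepted", "destination": manifest.destination}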

Tech Stack

  • API Framework: FastAPI

  • Programming language: Python

  • Database: BigQuery, RDBMS (Postgres)

  • Serverless: Cloud Run

  • Rate limiting: Cloud Tasks

  • Logging and errors: Cloud Logging, Error Reporting

  • Local development: Docker

  • Connector: GCP client libraries to connect to services such as BigQuery and Cloud Logging.

  • Cloud Build: CI/CD to deploy the Cloud Run service, the workflow that creates tasks (handling the rate limit), and the workflow that executes Syncronic.

Physical design

flowchart LR
    bg[BigQuery] <--> service[Cloud Run service]
    service <--> storage[Cloud Storage]
    service <--> rdms[RDBMS Postgres]

Workflow task model

flowchart LR
    storage["Storage (Cloud Storage)"] -- "Eventarc (Pub/Sub)" --> declare["Task Declare (Workflow)"]
    declare --> queue["Queue (Cloud Tasks)"]
    queue --> executor["Executor (Workflow)"]
  • Verify source identifiers and destination identifiers.

  • The data must land on the Postgres side ---> Transfer → Refactor ---> [TASK] [Download PARQUET, read with Polars -> execute CSV load]

[1800 * 1000 * 4 * 10] <<< optimise moving the data between databases
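
A hedged sketch of the "Task Declare" step: enqueuing an HTTP task that calls the sync endpoint, so the queue's dispatch settings provide the rate limiting. Project, location, queue name and service URL are placeholders.

  import json
  from google.cloud import tasks_v2

  client = tasks_v2.CloudTasksClient()
  # Hypothetical project, location and queue name.
  parent = client.queue_path("my-project", "asia-southeast1", "syncronic-queue")

  manifest = {"format": "TABLE", "source": "project.dataset.table",
              "destination": "public.target_table", "method": "full-refresh"}

  task = {
      "http_request": {
          "http_method": tasks_v2.HttpMethod.POST,
          "url": "https://syncronic.example.run.app/v3/sync",   # placeholder Cloud Run URL
          "headers": {"Content-Type": "application/json"},
          "body": json.dumps(manifest).encode(),
      }
  }
  # The queue throttles dispatch, which is what handles the rate limit in front of Syncronic.
  client.create_task(request={"parent": parent, "task": task})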

Permission

  • Database runner: runner_pugna

  • Service Account: sa-pugna

  • Read access to the BQ dataset

  • Write/delete/list objects in GCS

Name        Permission
sa-medusa   + Remove perms:
            - roles/cloudtasks.enqueuer
            + Add perms:
            - roles/secretmanager.secretAccessor
            - roles/monitoring.metricWriter
            - roles/workflows.invoker
            - roles/workflows.viewer
            - roles/serviceusage.serviceUsageConsumer
            - roles/serviceusage.serviceUsageViewer
sa-razor    + Add perms:
            - roles/cloudtasks.enqueuer
            - roles/workflows.invoker
            - roles/iam.serviceAccountUser

Supported Types

Mapping of types in queries

The tool queries the ODBC data source for type information and maps it to a Parquet type as follows:

ODBC SQL Type                  Parquet Type
Decimal(p < 39, s)             Decimal(p, s)
Numeric(p < 39, s)             Decimal(p, s)
Bit                            Boolean
Double                         Double
Real                           Float
Float(p: 0..24)                Float
Float(p >= 25)                 Double
Tiny Integer                   Int8
Small Integer                  Int16
Integer                        Int32
Big Int                        Int64
Date                           Date
Time(p: 0..3)                  Time Milliseconds
Time(p: 4..6)                  Time Microseconds
Time(p: 7..9)                  Time Nanoseconds
Timestamp(p: 0..3)             Timestamp Milliseconds
Timestamp(p >= 4)              Timestamp Microseconds
Datetimeoffset(p: 0..3)        Timestamp Milliseconds (UTC)
Datetimeoffset(p >= 4)         Timestamp Microseconds (UTC)
Varbinary                      Byte Array
Long Varbinary                 Byte Array
Binary                         Fixed Length Byte Array
All others                     Utf8 Byte Array

p is short for precision. s is short for scale. Intervals are inclusive.

https://googleapis.dev/python/sqlalchemy-bigquery/latest/struct.html

The total runtime is expected to drop to about 10 minutes across the various pipelines, and the service should be able to handle multiple resources.

  • Merge strategy statements in Postgres, with two methods (see the sketch after this list):

  • full-refresh: truncate the data in the destination and then push the full data set from the source into the destination.

  • merge-strategy: update data at the destination when the corresponding data at the source has changed.
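
A hedged sketch of the two methods using psycopg 3. The DSN, table, key and column names are placeholders, and the upsert uses INSERT ... ON CONFLICT as a stand-in for whatever merge statement Syncronic actually issues.

  import psycopg

  DSN = "postgresql://runner_pugna@silencer/db"   # placeholder connection string

  def full_refresh(conn: psycopg.Connection) -> None:
      # Truncate the destination and push the full staging data set into it.
      with conn.cursor() as cur:
          cur.execute("TRUNCATE TABLE public.target_table")
          cur.execute("INSERT INTO public.target_table SELECT * FROM staging_table")

  def merge_strategy(conn: psycopg.Connection) -> None:
      # Upsert only the rows that changed, keyed on a placeholder primary key "id".
      with conn.cursor() as cur:
          cur.execute("""
              INSERT INTO public.target_table
              SELECT * FROM staging_table
              ON CONFLICT (id) DO UPDATE
              SET name = EXCLUDED.name,
                  updated_at = EXCLUDED.updated_at
          """)

  with psycopg.connect(DSN) as conn:
      full_refresh(conn)   # or merge_strategy(conn)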

We have two methods for transferring data to Silencer: schedulers and workflows. They have different names and internal mechanics, but the same usage.

Example for ETL-workflow-morphling-snapshot-by-schedulers:

example-excute-ETL-by-schedulers

Example for ETL-workflow-morphling-snapshot-by-workflows:

example-excute-ETL-by-workflows

POST /v1/sync/

Usually this is done in this way:

  • Create a temporary table with the same structure as the destination table, but without constraints.

  • Copy the data into the temporary table with the COPY command.

  • Copy the rows that do fulfil the constraints from the temp table into the destination table, using an INSERT command with conditions in the WHERE clause based on the table constraints.

  • Drop the temporary table.

When dealing with really large CSV files or very limited server resources, use the file_fdw extension instead of temporary tables. It is a much more efficient approach, but it requires server access to the CSV file (while copying to a temporary table can be done over the network).

In Postgres 12 you can use the WHERE clause in COPY FROM.
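
A minimal psycopg 3 sketch of that pattern; the DSN, CSV path, table names and the constraint filter are placeholders.

  import psycopg

  # Placeholder DSN, paths and names; illustrative only.
  with psycopg.connect("postgresql://runner_pugna@silencer/db") as conn, conn.cursor() as cur:
      # 1. Temp table with the destination's structure but without constraints.
      cur.execute("CREATE TEMP TABLE tmp_target (LIKE public.target_table INCLUDING DEFAULTS)")

      # 2. Stream the CSV into the temp table with COPY.
      with open("/tmp/sync_chunks/part-00000.csv", "rb") as f, \
              cur.copy("COPY tmp_target FROM STDIN WITH (FORMAT csv)") as copy:
          while data := f.read(8192):
              copy.write(data)

      # 3. Move only the rows that satisfy the destination's constraints
      #    (the WHERE clause here is an example, not the real constraint).
      cur.execute("""
          INSERT INTO public.target_table
          SELECT * FROM tmp_target
          WHERE id IS NOT NULL
      """)
      # 4. The temp table is dropped automatically when the session ends.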


[1] Export from BigQuery ~> export to GCS in PARQUET format

[2] Polars -> pl.scan_parquet() ~> batch = 100_000

[3] FULL_REFRESH: ~ create a temp table -> push the data up

[4] Load the temp table with the COPY command (psycopg3) into Postgres

[5] Sync from the temp table with COPY FROM
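
A hedged end-to-end sketch of steps [2]-[5], assuming a recent Polars and that the Parquet chunks from step [1] are already on local disk. Paths, the DSN and table names are placeholders, and the batch size follows the 100_000 figure above.

  import io
  import polars as pl
  import psycopg

  BATCH = 100_000
  lf = pl.scan_parquet("/tmp/sync_chunks/*.parquet")   # placeholder staging folder
  total = lf.select(pl.len()).collect().item()

  with psycopg.connect("postgresql://runner_pugna@silencer/db") as conn, conn.cursor() as cur:
      cur.execute("CREATE TEMP TABLE tmp_target (LIKE public.target_table INCLUDING DEFAULTS)")
      for offset in range(0, total, BATCH):
          # Materialise one batch at a time to keep the memory footprint low.
          chunk = lf.slice(offset, BATCH).collect()
          buf = io.BytesIO()
          chunk.write_csv(buf, include_header=False)
          with cur.copy("COPY tmp_target FROM STDIN WITH (FORMAT csv)") as copy:
              copy.write(buf.getvalue())
      # Finally, sync from the temp table into the destination.
      cur.execute("INSERT INTO public.target_table SELECT * FROM tmp_target")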