# Madillo - Syncronic

## Overview
Madillo is the internal pipeline that contains an HTTP server used to replicate data from BigQuery to CloudSQL (PostgreSQL: Silencer).
Madillo is made up of the following components:

- Syncronic: an engine that syncs data in memory from a source into an RDBMS.
- MapFlux: a configuration CLI to interface with Syncronic without going through Cloud Tasks.
Features:

- Fast transfers using an Arrow transfer backend.
- Handles data-type conversion between Postgres and BigQuery.
- Reports the progress of each sync (notifications).
- Handles indexes.
- Stricter enforcement of the manifest instructions.
- Authentication handling.
- Handles multiple data types following the convention: PyArrow -> Postgres -> Pandas -> Polars.
- Handles both QUERY and TABLE as the instruction identifier.
- Uses COPY as the load backend to increase throughput.
- Handles the MERGE method.
## SAD - System Architecture Design

### Logical design
sequenceDiagram
Client->>Service: Send manifest
alt Instruction format is TABLE
Service->>Bigquery: Extract to Parquet via Arrow
Bigquery->>Storage: Send data
Storage->>Service: Load data back
else Instruction format is QUERY
Service->>Bigquery: Fetch via the BQ REST API
Bigquery->>Service: Load in memory
end
loop Load data into memory
Service->>Service: Sink data onto local disk
end
alt Transfer the data from local disk
Service-->>RDBMS: COPY FROM into a temporary table
RDBMS->>RDBMS: Load into the original target table with INSERT INTO
end
Service-->>Client: Result of the fetch
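For reference, here is a minimal Python sketch of the two extraction branches above, assuming the service uses the google-cloud-bigquery client; the table ID, SQL, and GCS URI are placeholders, not Madillo's actual values:

```python
from google.cloud import bigquery

client = bigquery.Client()

def extract_table_to_parquet(table_id: str, gcs_uri: str) -> None:
    """TABLE branch: export the table to Cloud Storage as Parquet (Arrow-backed)."""
    job_config = bigquery.ExtractJobConfig(
        destination_format=bigquery.DestinationFormat.PARQUET
    )
    client.extract_table(table_id, gcs_uri, job_config=job_config).result()

def fetch_query_as_arrow(sql: str):
    """QUERY branch: run the query through the REST API and load the result as an Arrow table."""
    return client.query(sql).result().to_arrow()
```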
Goal: transfer a BQ table to Postgres using a pushdown approach.

- Naive approach: download 1M rows, then push 1M rows at once (simple), but this overloads memory.
- Chunked approach: download the 1M rows in small pieces (Hadoop-style folder structure on local disk); see the sketch below.
- Polars strategy: read until the memory limit is reached.
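A hedged sketch of the chunked download, assuming google-cloud-bigquery's `to_arrow_iterable()` is available; it streams Arrow record batches into a local Parquet file so only one batch is held in memory at a time (function and path names are illustrative):

```python
import pyarrow.parquet as pq
from google.cloud import bigquery

def sink_query_to_disk(sql: str, out_path: str) -> None:
    """Stream Arrow record batches to a local Parquet file instead of materializing all rows."""
    client = bigquery.Client()
    rows = client.query(sql).result()
    writer = None
    for batch in rows.to_arrow_iterable():  # yields pyarrow.RecordBatch chunks
        if writer is None:
            writer = pq.ParquetWriter(out_path, batch.schema)
        writer.write_batch(batch)
    if writer is not None:
        writer.close()
```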
1. [BigQuery] Fetch metadata of the table: `SELECT * FROM`
| Name | Permission changes |
|---|---|
| sa-medusa | Remove: roles/cloudtasks.enqueuer<br>Add: roles/secretmanager.secretAccessor, roles/monitoring.metricWriter, roles/workflows.invoker, roles/workflows.viewer, roles/serviceusage.serviceUsageConsumer, roles/serviceusage.serviceUsageViewer |
| sa-razor | Add: roles/cloudtasks.enqueuer, roles/workflows.invoker, roles/iam.serviceAccountUser |
## Supported Types

Mapping of types in queries: the tool queries the ODBC data source for type information and maps it to a Parquet type as follows:
| ODBC SQL Type | Parquet Type |
|---|---|
| Decimal(p < 39, s) | Decimal(p, s) |
| Numeric(p < 39, s) | Decimal(p, s) |
| Bit | Boolean |
| Double | Double |
| Real | Float |
| Float(p: 0..24) | Float |
| Float(p >= 25) | Double |
| Tiny Integer | Int8 |
| Small Integer | Int16 |
| Integer | Int32 |
| Big Int | Int64 |
| Date | Date |
| Time(p: 0..3) | Time Milliseconds |
| Time(p: 4..6) | Time Microseconds |
| Time(p: 7..9) | Time Nanoseconds |
| Timestamp(p: 0..3) | Timestamp Milliseconds |
| Timestamp(p >= 4) | Timestamp Microseconds |
| Datetimeoffset(p: 0..3) | Timestamp Milliseconds (UTC) |
| Datetimeoffset(p >= 4) | Timestamp Microseconds (UTC) |
| Varbinary | Byte Array |
| Long Varbinary | Byte Array |
| Binary | Fixed Length Byte Array |
| All others | Utf8 Byte Array |

p is short for precision, s is short for scale. Intervals are inclusive.
https://googleapis.dev/python/sqlalchemy-bigquery/latest/struct.html
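The exact Arrow-to-Postgres mapping Madillo uses is not spelled out here; the sketch below is one plausible mapping for generating the temporary table DDL, with a fallback mirroring the "all others -> Utf8" rule above (the dictionary and function name are assumptions, not the tool's actual table):

```python
import pyarrow as pa

# Illustrative Arrow -> PostgreSQL DDL type mapping (assumed, not Madillo's actual table).
ARROW_TO_PG = {
    pa.int8(): "SMALLINT",
    pa.int16(): "SMALLINT",
    pa.int32(): "INTEGER",
    pa.int64(): "BIGINT",
    pa.float32(): "REAL",
    pa.float64(): "DOUBLE PRECISION",
    pa.bool_(): "BOOLEAN",
    pa.string(): "TEXT",
    pa.date32(): "DATE",
    pa.timestamp("us"): "TIMESTAMP",
    pa.timestamp("us", tz="UTC"): "TIMESTAMPTZ",
}

def pg_type(arrow_type: pa.DataType) -> str:
    """Fall back to TEXT for anything unmapped, like the 'all others -> Utf8' rule."""
    return ARROW_TO_PG.get(arrow_type, "TEXT")
```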
The total runtime is expected to drop to around 10 minutes across the various pipelines, while handling multiple resources.
- Merge strategy statements in Postgres, with two methods (sketched below):
  - full-refresh: truncate the data in the destination, then push the full dataset from the source into the destination.
  - merge-strategy: update data at the destination when the corresponding data changes at the source.
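As a rough illustration, the two strategies could translate to SQL like the following; the table names, the `id` conflict key, and the `ON CONFLICT` upsert are assumptions rather than Madillo's exact statements:

```python
import psycopg2

# full-refresh: wipe the destination, then reload everything from the staged source data.
FULL_REFRESH_SQL = """
TRUNCATE TABLE public.target_table;
INSERT INTO public.target_table SELECT * FROM staging.source_table;
"""

# merge-strategy: upsert changed rows; assumes a unique key "id" on the destination.
MERGE_SQL = """
INSERT INTO public.target_table
SELECT * FROM staging.source_table
ON CONFLICT (id) DO UPDATE
SET payload = EXCLUDED.payload;
"""

def run(strategy_sql: str, dsn: str) -> None:
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(strategy_sql)
```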
There are two methods for transferring data to Silencer: schedulers and workflows. They differ in name and internal mechanics but serve the same purpose.
Example for ETL-workflow-morphling-snapshot-by-schedulers:
Example for ETL-workflow-morphling-snapshot-by-workflows:
`POST /v1/sync/`
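A hypothetical client call against this endpoint; the host and every manifest field below are guesses at the shape of the payload, not the documented schema:

```python
import requests

# Hypothetical manifest: field names and values are assumptions, not the documented schema.
manifest = {
    "format": "TABLE",                    # or "QUERY"
    "source": "project.dataset.table",
    "destination": "public.target_table",
    "strategy": "merge-strategy",         # or "full-refresh"
}

# Hypothetical internal host.
resp = requests.post("https://madillo.internal/v1/sync/", json=manifest, timeout=600)
resp.raise_for_status()
print(resp.json())
```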
Usually this is done in the following way:

1. Create a temporary table with the same structure as the destination table, but without constraints.
2. Copy the data into the temporary table with the COPY command.
3. Copy the rows that do fulfill the constraints from the temp table into the destination table, using an INSERT command with conditions in the WHERE clause based on the table constraints.
4. Drop the temporary table.

When dealing with really large CSV files or very limited server resources, use the file_fdw extension instead of temporary tables. It is much more efficient, but it requires server access to the CSV file (while copying to a temporary table can be done over the network).
In Postgres 12 you can use the WHERE clause in COPY FROM.
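A sketch of those steps with psycopg2; the destination table, CSV path, and the `id IS NOT NULL` constraint check are placeholders:

```python
import psycopg2

def copy_via_temp_table(dsn: str, csv_path: str) -> None:
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        # 1. Temporary table with the destination's structure, but no constraints/indexes.
        cur.execute("CREATE TEMP TABLE staging (LIKE public.target_table INCLUDING DEFAULTS);")
        # 2. Bulk-load with COPY.
        with open(csv_path) as f:
            cur.copy_expert("COPY staging FROM STDIN WITH (FORMAT csv, HEADER true);", f)
        # 3. Move only rows that satisfy the destination's constraints (example condition assumed).
        cur.execute(
            "INSERT INTO public.target_table "
            "SELECT * FROM staging WHERE id IS NOT NULL;"
        )
        # 4. Drop the temporary table (it would also be dropped at session end).
        cur.execute("DROP TABLE staging;")
```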