# TODO

- Transfer `dataset.Dataset` into `pyarrow.parquet.ParquetDataset` (see https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html)
- Add a compression algorithm when loading from the source, when writing to GCS, and when writing/loading back to local disk
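  A minimal sketch of the intended Parquet handling, assuming hypothetical local staging and output paths; it reads the files as a `pyarrow.parquet.ParquetDataset` and rewrites them with an explicit compression codec:

  ```python
  import pyarrow.parquet as pq

  # Read the staged files as a ParquetDataset (the replacement for dataset.Dataset).
  dataset = pq.ParquetDataset("data/staging/")   # hypothetical local path
  table = dataset.read()

  # Re-write with an explicit compression codec before uploading to GCS or
  # loading back onto local disk.
  pq.write_table(table, "data/out/part-0.parquet", compression="zstd")
  ```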
- Clean the data folder in Storage
- Construct the session engine
- Enforce that it is created using only `psycopg` (version 3)
- Use to resolve the connection URI: `PostgresDsn(url.set(drivername="").render_as_string(hide_password=False))`
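  A minimal sketch of that resolution step, assuming the URL comes from an existing SQLAlchemy engine, that clearing the driver name is meant to drop the `+psycopg` suffix (so the sketch sets it to plain `postgresql`), and a pydantic release where `PostgresDsn` is directly instantiable, as the note above implies:

  ```python
  from pydantic import PostgresDsn
  from sqlalchemy import create_engine

  # Hypothetical source engine; its URL carries the "+psycopg" driver suffix.
  engine = create_engine("postgresql+psycopg://user:secret@localhost:5432/marts")

  # Drop the driver suffix and render the full URL, keeping the password,
  # so it can be validated and stored as a pydantic PostgresDsn.
  plain_url = engine.url.set(drivername="postgresql").render_as_string(hide_password=False)
  dsn = PostgresDsn(plain_url)
  ```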
- Construct as dependencies
- Test using HTTP for the integration test
- Config to fetch the param `--dest-connection-uri URL`
- Config to fetch the param `--service-origin-url URL` [Staging, Production]
- Test using `http.post` with the `gcloud.auth` program
- Pass the command for the test
- Finished pipeline: Madillo
- Madillo: establish the pipeline that transfers data between BigQuery and Postgres
- Syncronic: Cloud Run executes the transfer process, using the PyArrow format for reads and `COPY` for writes
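  A minimal sketch of that read-with-PyArrow / write-with-`COPY` step, assuming a psycopg 3 connection and a destination table whose columns match the Arrow schema (file, table, and column names are placeholders):

  ```python
  import psycopg
  import pyarrow.parquet as pq

  # Read only the columns that will be copied (placeholder names).
  table = pq.read_table("data/staging/part-0.parquet", columns=["id", "payload"])

  with psycopg.connect("postgresql://user:secret@localhost:5432/marts") as conn:
      with conn.cursor() as cur:
          # Stream rows into Postgres with COPY instead of row-by-row INSERTs.
          with cur.copy("COPY marts.events (id, payload) FROM STDIN") as copy:
              for batch in table.to_batches():
                  for row in zip(*(col.to_pylist() for col in batch.columns)):
                      copy.write_row(row)
  # The connection context manager commits on successful exit.
  ```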
- Compare Cloud Tasks with Cloud Scheduler: https://cloud.google.com/tasks/docs/comp-tasks-sched
- Use selected columns instead of `SELECT * FROM ..` in marts in the `batch_upsert` functions
- Implemented `COPY FROM`, replacing `INSERT INTO`. Ref: https://stackoverflow.com/questions/69502814/postgres-copy-stream-using-pg8000-error-could-not-determine-data-type-of-para
- Validate the types of scheduler deployments against pandas Series data types. Ref: https://pbpython.com/pandas_dtypes.html
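  A minimal sketch of that validation, assuming it amounts to comparing a frame's dtypes against an expected mapping; the expected schema shown is purely illustrative:

  ```python
  import pandas as pd

  # Hypothetical expected schema for one scheduled deployment.
  EXPECTED_DTYPES = {"id": "int64", "payload": "object", "updated_at": "datetime64[ns]"}

  def validate_dtypes(df: pd.DataFrame) -> None:
      """Raise if any column's pandas dtype differs from the expected one."""
      actual = df.dtypes.astype(str).to_dict()
      mismatches = {
          col: (expected, actual.get(col))
          for col, expected in EXPECTED_DTYPES.items()
          if actual.get(col) != expected
      }
      if mismatches:
          raise TypeError(f"dtype mismatches (expected, actual): {mismatches}")
  ```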
- [Internal] Upgrade the transfer data model to reduce time to endpoint.
  - Transfer to `COPY` with binary format
  - QUERY
  - Testing/Production
  - Implement SQLAlchemy data types
  - Docs
  - Schedule:
- Mapflux use-case.
  - Add support for SQLAlchemy types
  - Add support for sqlalchemy-bigquery
  - Input:
    - identifier: `TABLE:<PROJECT_ID>.<DATASET>.<table_name>`
    - identifier: `QUERY:SELECT * FROM <PROJECT_ID>.<DATASET>.<table_name>`
    - identifier validation, allowed characters: * "" . ; : `` - \_ ()
    - identifier syntax: `"TABLE:METADATA"` (see the parsing sketch after this list)
    - Reference for valid BigQuery identifiers: https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical
    - identifier:
  - Performance test:
    - [1] Total number of identifiers: 10 identifiers
    - [2] Identifiers with multiple column types: 10 data types per query
    - [3] Test with 1M, 2M, 3M, 4M, 10M, 50M, and 100M records
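  A minimal parsing sketch for the `TABLE:`/`QUERY:` identifier syntax above; the prefix handling follows the `"TABLE:METADATA"` convention from the note, while the allowed-character regex is only an illustrative placeholder:

  ```python
  import re

  # Rough stand-in for the allowed-character rule listed in the note above.
  _METADATA_RE = re.compile(r'^[\w.`"();:* -]+$')

  def parse_identifier(identifier: str) -> tuple[str, str]:
      """Split 'TABLE:...' / 'QUERY:...' into (kind, metadata) and validate it."""
      kind, sep, metadata = identifier.partition(":")
      if not sep or kind not in {"TABLE", "QUERY"}:
          raise ValueError(f"expected 'TABLE:...' or 'QUERY:...', got {identifier!r}")
      if kind == "TABLE" and not _METADATA_RE.match(metadata):
          raise ValueError(f"invalid characters in table identifier: {metadata!r}")
      return kind, metadata

  # Example inputs (placeholder project/dataset/table names).
  parse_identifier("TABLE:my-project.analytics.events")
  parse_identifier("QUERY:SELECT * FROM my-project.analytics.events")
  ```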
- MapFlux: manifest of the sync; will be a CLI tool for execution
  - Workflow to replace email
    - Required: timing (start, end, duration), separated by script.
- Workflow: Task Declare
  - Worked in staging/production
- Workflow: Executor
- Slack: template design in `inno-transflow`
- Ingestion purpose: upload output to GCS
  - UUID random <-> Headers <-> UUID[header]:Metadata[body]
- Remove all previous pipelines:
  - Remove Cloud Function: prod-bq2postgres-upsert
- Update the SAD for the project
- Scan the Streaming document:
  - a] Documentation
  - b] How to apply | design | use it in the workflow
  - c] Integrate tools with connectors
- Error seen when creating a temporary table in a non-temporary schema: `{'S': 'ERROR', 'V': 'ERROR', 'C': '42P16', 'M': 'cannot create temporary relation in non-temporary schema', 'P': '19', 'F': 'namespace.c', 'L': '646', 'R': 'RangeVarAdjustRelationPersistence'}`
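  The `42P16` error above is what Postgres raises when a `CREATE TEMPORARY TABLE` statement schema-qualifies the table name; a short sketch of the distinction, with placeholder table and column names:

  ```python
  import psycopg

  with psycopg.connect("postgresql://user:secret@localhost:5432/marts") as conn:
      with conn.cursor() as cur:
          # Fails with SQLSTATE 42P16: temporary tables live in pg_temp, so the
          # name must not be qualified with a regular schema such as "marts".
          # cur.execute("CREATE TEMPORARY TABLE marts.staging_tmp (id bigint)")

          # Works: leave the name unqualified (Postgres puts it in pg_temp).
          cur.execute("CREATE TEMPORARY TABLE staging_tmp (id bigint)")
  ```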
- INSERT…ON CONFLICT (Upsert) notes, from the SQLAlchemy PostgreSQL dialect documentation:

  Starting with version 9.5, PostgreSQL allows “upserts” (update or insert) of rows into a table via the ON CONFLICT clause of the INSERT statement. A candidate row will only be inserted if that row does not violate any unique constraints. In the case of a unique constraint violation, a secondary action can occur, which can be either “DO UPDATE”, indicating that the data in the target row should be updated, or “DO NOTHING”, which indicates to silently skip this row.

  Conflicts are determined using existing unique constraints and indexes. These constraints may be identified either using their name as stated in DDL, or they may be inferred by stating the columns and conditions that comprise the indexes.

  SQLAlchemy provides ON CONFLICT support via the PostgreSQL-specific insert() function, which provides the generative methods Insert.on_conflict_do_update() and Insert.on_conflict_do_nothing():
  ```python
  from sqlalchemy.dialects.postgresql import insert

  insert_stmt = insert(my_table).values(
      id='some_existing_id',
      data='inserted value')

  do_nothing_stmt = insert_stmt.on_conflict_do_nothing(
      index_elements=['id']
  )

  print(do_nothing_stmt)
  # INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
  # ON CONFLICT (id) DO NOTHING

  do_update_stmt = insert_stmt.on_conflict_do_update(
      constraint='pk_my_table',
      set_=dict(data='updated value')
  )

  print(do_update_stmt)
  # INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
  # ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET data = %(param_1)s
  ```
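  A brief usage note: the statements above only print their SQL; to actually apply the upsert they would be executed on a connection, sketched here with a hypothetical engine and `do_update_stmt` taken from the snippet above:

  ```python
  from sqlalchemy import create_engine

  # Hypothetical destination engine; do_update_stmt comes from the snippet above.
  engine = create_engine("postgresql+psycopg://user:secret@localhost:5432/marts")

  with engine.begin() as conn:      # opens a transaction, commits on exit
      conn.execute(do_update_stmt)  # runs the INSERT ... ON CONFLICT upsert
  ```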