TODO

  • Transfer dataset.Dataset into pyarrow.parquet.ParquetDataset (https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html)

  • Add a compression algorithm when loading from the source and writing to GCS, then write/load it back to local disk (see the sketch below)
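
A minimal sketch of the compressed write-to-GCS / load-back idea above, assuming pyarrow and gcsfs are available; the bucket path and the zstd codec are placeholders, not the project's actual settings.

import pyarrow as pa
import pyarrow.parquet as pq
import gcsfs

table = pa.table({"id": [1, 2, 3], "data": ["a", "b", "c"]})

fs = gcsfs.GCSFileSystem()
# Write compressed Parquet to GCS ...
pq.write_table(table, "bucket-name/data/example.parquet", filesystem=fs, compression="zstd")
# ... and load it back (e.g. before writing to local disk)
table_back = pq.read_table("bucket-name/data/example.parquet", filesystem=fs)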

  • Clean data folder in Storage

  • Construct the session engine

  • Enforce that the engine is created using only psycopg (version 3)

  • Use resolve: PostgresDsn(url.set(drivername="").render_as_string(hide_password=False)) (a sketch follows below)
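
A minimal sketch of the resolve step above, assuming SQLAlchemy 1.4+; the connection URI is a placeholder. Replacing the driver-qualified name with the plain backend name yields a DSN that psycopg (version 3) or a PostgresDsn validator can consume.

from sqlalchemy.engine import make_url

url = make_url("postgresql+psycopg://user:secret@localhost:5432/analytics")  # placeholder URI
# Drop the "+psycopg" driver suffix, keep everything else (including the password)
dsn = url.set(drivername=url.get_backend_name()).render_as_string(hide_password=False)
print(dsn)  # postgresql://user:secret@localhost:5432/analytics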

  • Construct as dependencies

  • Test using HTTP for integration test

  • Config to fetch the --dest-connection-uri URL parameter

  • Config to fetch the --service-origin-url URL parameter [Staging, Production]

  • Test using http.post with the gcloud auth program

  • Pass command for test

  • Finished the Madillo pipeline

  • Madillo: Establish the pipeline to transfer data between BigQuery and Postgres

  • Syncronic: Cloud Run to execute the transfer process, using the PyArrow format for reads and COPY for writes

  • Compare Tasks with Schedulers: https://cloud.google.com/tasks/docs/comp-tasks-sched

  • Use column selectors instead of SELECT * FROM .. in marts for the batch_upsert functions (see the batch-upsert sketch at the end of this note)

  • Implemented COPY FROM as a replacement for INSERT INTO. Ref: https://stackoverflow.com/questions/69502814/postgres-copy-stream-using-pg8000-error-could-not-determine-data-type-of-para

  • Validate scheduler deployment types against pandas Series data types. Ref: https://pbpython.com/pandas_dtypes.html

  • [Internal] Upgrade the transfer data model to reduce time to the endpoint.

  • Transfer to COPY with binary (see the sketch after this sub-list)

    • QUERY

    • Testing/Production

    • implement SQLAlchemy data types

    • docs

    • schedule:

    • MapFlux use-case.
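
A minimal sketch of the binary COPY write path, assuming psycopg (version 3); the DSN, destination table, column names, and type list are placeholders rather than the pipeline's actual schema.

import psycopg

rows = [(1, "a"), (2, "b")]  # illustrative rows

with psycopg.connect("postgresql://user:secret@localhost:5432/analytics") as conn:
    with conn.cursor() as cur:
        with cur.copy(
            "COPY dest_table (id, data) FROM STDIN WITH (FORMAT BINARY)"
        ) as copy:
            # Binary COPY needs the Postgres type of each column up front.
            copy.set_types(["int4", "text"])
            for row in rows:
                copy.write_row(row)
# Leaving the connection context commits the transaction.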

  • Add support for SQLAlchemy types

  • Add support for sqlalchemy-bigquery

  • Input:

  • Performance Test:

    • [1] Total number of identifiers: 10 identifiers

    • [2] Identifier has multiple column types: 10 data types per query

    • [3] Test 1M, 2M, 3M, 4M, 10M, 50M, and 100M records

  • MapFlux: manifest of the sync; will be a CLI tool for execution

    • Workflow to replace email

    • Required: timing (start-end-duration), separated by script.

  • Workflow: Task Declare

  • Worked in staging/production

  • Workflow: Executor

  • Slack: Template design in inno-transflow

  • Ingestion purpose: Upload output to GCS

  • UUID random <-> Headers <-> UUID[header]:Metadata[body]

  • Remove all current and previous pipelines:

    • Remove Cloud Func: prod-bq2postgres-upsert

  • Update SAD for the project

  • Scan Streaming Document:

    a] Documentation

    b] How to apply | design | use in the workflow

    c] Integrate tools with connectors

  • {'S': 'ERROR', 'V': 'ERROR', 'C': '42P16', 'M': 'cannot create temporary relation in non-temporary schema', 'P': '19', 'F': 'namespace.c', 'L': '646', 'R': 'RangeVarAdjustRelationPersistence'} (raised when a temporary table is created with an explicit, non-temporary schema qualifier)

  • INSERT…ON CONFLICT (Upsert)

From the SQLAlchemy PostgreSQL dialect documentation:

Starting with version 9.5, PostgreSQL allows “upserts” (update or insert) of rows into a table via the ON CONFLICT clause of the INSERT statement. A candidate row will only be inserted if that row does not violate any unique constraints. In the case of a unique constraint violation, a secondary action can occur which can be either “DO UPDATE”, indicating that the data in the target row should be updated, or “DO NOTHING”, which indicates to silently skip this row.

Conflicts are determined using existing unique constraints and indexes. These constraints may be identified either using their name as stated in DDL, or they may be inferred by stating the columns and conditions that comprise the indexes.

SQLAlchemy provides ON CONFLICT support via the PostgreSQL-specific insert() function, which provides the generative methods Insert.on_conflict_do_update() and Insert.on_conflict_do_nothing():

from sqlalchemy.dialects.postgresql import insert

insert_stmt = insert(my_table).values(
    id='some_existing_id',
    data='inserted value')

do_nothing_stmt = insert_stmt.on_conflict_do_nothing(
    index_elements=['id']
)
print(do_nothing_stmt)
# INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
# ON CONFLICT (id) DO NOTHING

do_update_stmt = insert_stmt.on_conflict_do_update(
    constraint='pk_my_table',
    set_=dict(data='updated value')
)
print(do_update_stmt)
# INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
# ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET data = %(param_1)s
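
Building on the example above, a hedged sketch of how the batch_upsert idea from the TODO list could look, assuming SQLAlchemy 2.x with the psycopg driver; my_table, its columns, and the connection URI are placeholders, not the project's actual schema. EXCLUDED carries the row proposed for insertion, so the update reuses the incoming values instead of a hard-coded literal.

from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine
from sqlalchemy.dialects.postgresql import insert

metadata = MetaData()
my_table = Table(
    "my_table", metadata,
    Column("id", Integer, primary_key=True),
    Column("data", String),
)

engine = create_engine("postgresql+psycopg://user:secret@localhost:5432/analytics")  # placeholder URI

def batch_upsert(rows):
    """Insert rows, updating existing ones on primary-key conflict."""
    stmt = insert(my_table)
    stmt = stmt.on_conflict_do_update(
        index_elements=["id"],
        # EXCLUDED refers to the row that was proposed for insertion.
        set_={"data": stmt.excluded.data},
    )
    with engine.begin() as conn:
        conn.execute(stmt, rows)  # executemany over the parameter list

batch_upsert([{"id": 1, "data": "a"}, {"id": 2, "data": "b"}])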