
Streaming from Data Layer


Overview

This process moves data from the data layer into the lakehouse.

  • Change Data Capture (CDC) is responsible for transferring data from OLTP to OLAP.

  • Previously, we implemented CDC with Datastream, but it had some problems for our company:

    • The cost was too high:

      • We do not need to stream data; we only need batch data.

    • Because we had to use a GCP template:

      • We had to design SCD2 on the OLTP side.

      • We could not customize the transfer flow.

  • So we needed another CDC tool, one that is cheaper and supports batch data. Airbyte is the suitable choice, for several reasons:

    • The cost is much lower.

    • It provides a strong scheduling feature for batch data.

    • We do not need to design SCD2 on the OLTP side.

Source: MySQL

Target: BigQuery

SAD - System Architecture Design

Logical View

The project follows this flow:

flowchart TB

  subgraph cdc_tool[CDC Tool]
    direction TB
    cdc_tool_log[Log]
    cdc_tool_ui[UI]
  end

  subgraph oltp[OLTP]
    oltp_data[Data]
    oltp_wal[WAL]
  end

  subgraph olap[OLAP]
    olap_data[Data]
  end

  subgraph transfer_flow[Transfer Flow]
    direction LR
    oltp <-- read WAL --> cdc_tool
    cdc_tool -- write --> olap
  end

  schedule[Schedule] -- trigger --> transfer_flow

Physical View

Previous flow (Datastream):

flowchart LR
  rdbms[RDBMS] --> datastream[Datastream] --> bigquery[BigQuery]

Current flow (Airbyte):

flowchart TB

  subgraph airbyte[Airbyte]
    airbyte_debezium[Debezium]
  end

  cloud_sql[Cloud SQL]
  big_query[Big Query]

  subgraph transfer_flow[Transfer Flow]
    direction LR
    cloud_sql <-- read WAL --> airbyte
    airbyte -- write --> big_query
  end

  prefect[Prefect] -- trigger --> transfer_flow
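In this setup, Prefect is the scheduler that triggers the Airbyte transfer flow. The sketch below is a minimal, hypothetical example of such a trigger, assuming the Airbyte OSS API is reachable at http://localhost:8000 with basic auth enabled; the connection ID and credentials are placeholders, not the project's real values.

# trigger_airbyte_sync.py - hypothetical Prefect flow that asks Airbyte to run one sync.
import os
import requests
from prefect import flow, task

AIRBYTE_URL = os.getenv("AIRBYTE_URL", "http://localhost:8000")                 # assumption: default local port
CONNECTION_ID = os.getenv("AIRBYTE_CONNECTION_ID", "<your-connection-uuid>")    # placeholder
AUTH = (os.getenv("BASIC_AUTH_USERNAME", "airbyte"), os.getenv("BASIC_AUTH_PASSWORD", "password"))

@task(retries=2, retry_delay_seconds=60)
def trigger_sync() -> int:
    """Call the Airbyte Configuration API to start a sync job for one connection."""
    resp = requests.post(
        f"{AIRBYTE_URL}/api/v1/connections/sync",
        json={"connectionId": CONNECTION_ID},
        auth=AUTH,
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["job"]["id"]

@flow(name="cloud-sql-to-bigquery-batch")
def cdc_batch_flow():
    job_id = trigger_sync()
    print(f"Started Airbyte job {job_id}")

if __name__ == "__main__":
    # In production this flow would be registered as a Prefect deployment with a schedule;
    # here it is simply run once locally.
    cdc_batch_flow()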

Entities

Stream ID: dawnbreaker2magnus

Deployment

Deployment is triggered by a push to a specific branch.

flowchart LR
    gitHub[GitHub] -- on push --> trigger[GitHub Actions] -- deploy --> cloud_compute[Cloud compute]
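As a rough illustration of the deploy step, the GitHub Actions job could SSH into the Compute Engine instance and run a small script like the hypothetical one below, which pulls the latest code and restarts the Docker Compose stack. The repository path is an assumption, not the project's actual layout.

# deploy.py - hypothetical script run on the Compute Engine host by the GitHub Actions job.
import subprocess
from pathlib import Path

REPO_DIR = Path("/opt/streaming-from-data-layer")  # assumed checkout path, adjust to the real one

def run(cmd: list[str]) -> None:
    """Run a command inside the repo directory and fail loudly if it errors."""
    print("+", " ".join(cmd))
    subprocess.run(cmd, cwd=REPO_DIR, check=True)

if __name__ == "__main__":
    run(["git", "pull", "--ff-only"])   # fetch the branch that triggered the workflow
    run(["docker-compose", "pull"])     # refresh images (the Airbyte version comes from .env)
    run(["docker-compose", "up", "-d"]) # recreate only the services whose images changed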

Set up - Service Account

The service account is required to access:

[1] Cloud SQL: Read WAL

[2] Cloud GCS: Upload WAL

[3] BigQuery: Upload log and data

To run the tests against your bucket, make sure to set up a Service Account with all necessary permissions:

  • compute.instanceAdmin.v1
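To sanity-check that the service account key actually grants the access listed above, a small smoke test along these lines can help; it assumes GOOGLE_APPLICATION_CREDENTIALS points at the downloaded JSON key and that google-cloud-bigquery and google-cloud-storage are installed. It only covers the BigQuery and GCS items; the Cloud SQL WAL access is exercised by Airbyte itself.

# check_service_account.py - hypothetical smoke test for the service account key.
from google.cloud import bigquery, storage

def main() -> None:
    bq = bigquery.Client()
    datasets = [d.dataset_id for d in bq.list_datasets()]
    print(f"BigQuery OK, visible datasets: {datasets}")

    gcs = storage.Client()
    buckets = [b.name for b in gcs.list_buckets()]
    print(f"GCS OK, visible buckets: {buckets}")

if __name__ == "__main__":
    main()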

Set up - Compute Engine

  • Zone: asia-southeast1-b
  • Machine type: e2-medium
  • Image: Debian GNU/Linux 11 (bullseye)
  • RAM: 4 GB
  • vCPU: 2
  • Disk: 30 GB

Set up - System Dependencies

  • docker: Docker daemon
  • docker-compose: composes multiple services defined in a Compose file
  • htop: a cross-platform interactive process viewer

Set up - Environment variables

  • Airbyte image version:
  • VERSION
  • When using the airbyte-db via default docker image
  • CONFIG_ROOT
  • DATA_DOCKER_MOUNT
  • DB_DOCKER_MOUNT
  • Workspace storage for running jobs (logs, etc)
  • WORKSPACE_ROOT
  • WORKSPACE_DOCKER_MOUNT
  • Local mount to access local files from the filesystem
  • LOCAL_ROOT
  • LOCAL_DOCKER_MOUNT
  • HACK_LOCAL_ROOT_PARENT
  • Proxy Configuration (set to empty values, e.g. "" to disable basic auth)
  • BASIC_AUTH_USERNAME
  • BASIC_AUTH_PASSWORD
  • BASIC_AUTH_PROXY_TIMEOUT
  • Airbyte Internal Job Database, see https://docs.airbyte.io/operator-guides/configuring-airbyte-db
  • DATABASE_USER
  • DATABASE_PASSWORD
  • DATABASE_HOST
  • DATABASE_PORT
  • DATABASE_DB
  • Translate manually: DATABASE_URL=jdbc:postgresql://${DATABASE_HOST}:${DATABASE_PORT}/${DATABASE_DB} (do not include the username or password here)
  • DATABASE_URL
  • JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION
  • Airbyte Internal Config Database, defaults to Job Database if empty. Explicitly left empty to mute docker-compose warnings.
  • CONFIG_DATABASE_USER
  • CONFIG_DATABASE_PASSWORD
  • CONFIG_DATABASE_URL
  • CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION
  • Airbyte services
  • TEMPORAL_HOST
  • INTERNAL_API_HOST
  • INTERNAL_API_URL
  • CONNECTOR_BUILDER_API_HOST
  • WEBAPP_URL
  • Although not present as an env var, required for web app configuration.
  • CONNECTOR_BUILDER_API_URL
  • AIRBYTE_API_HOST
  • Jobs - Relevant to scaling.
  • SYNC_JOB_MAX_ATTEMPTS
  • SYNC_JOB_MAX_TIMEOUT_DAYS
  • SYNC_JOB_INIT_RETRY_TIMEOUT_MINUTES
  • JOB_MAIN_CONTAINER_CPU_REQUEST
  • JOB_MAIN_CONTAINER_CPU_LIMIT
  • JOB_MAIN_CONTAINER_MEMORY_REQUEST
  • JOB_MAIN_CONTAINER_MEMORY_LIMIT
  • NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT
  • NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST
  • NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT
  • NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST
  • Logging/monitoring/tracking
  • TRACKING_STRATEGY
  • JOB_ERROR_REPORTING_STRATEGY
  • LOG_LEVEL
  • Application
  • WORKERS_MICRONAUT_ENVIRONMENTS
  • CRON_MICRONAUT_ENVIRONMENTS
  • MAX_SYNC_WORKERS
  • MAX_SPEC_WORKERS
  • MAX_CHECK_WORKERS
  • MAX_DISCOVER_WORKERS
  • MAX_NOTIFY_WORKERS
  • SHOULD_RUN_NOTIFY_WORKFLOWS
  • Temporal Activity configuration
  • ACTIVITY_MAX_ATTEMPT
  • ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS
  • ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS
  • WORKFLOW_FAILURE_RESTART_DELAY_SECONDS
  • Feature flags
  • AUTO_DISABLE_FAILING_CONNECTIONS
  • FEATURE_FLAG_CLIENT
  • Monitoring flags
  • PUBLISH_METRICS
  • Accepted values are datadog and otel (OpenTelemetry)
  • METRIC_CLIENT
  • Useful only when the metric client is set to otel. Must start with http:// or https://.
  • OTEL_COLLECTOR_ENDPOINT
  • Useful only when the metric client is set to datadog.
  • DD_AGENT_HOST
  • DD_DOGSTATSD_PORT
  • AUTO_DETECT_SCHEMA
  • SECRET_PERSISTENCE=TESTING_CONFIG_DB_TABLE
  • To test local catalog changes, set the below variable to the path of your local catalog.
  • LOCAL_CONNECTOR_CATALOG_PATH
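Before bringing the stack up, it can help to confirm that the most critical of these variables are set. The short sketch below checks a hypothetical subset; the variable names come from the list above, but the choice of which ones to require is an assumption.

# check_env.py - hypothetical pre-flight check for a subset of the Airbyte .env variables.
import os
import sys

REQUIRED = [
    "VERSION",
    "DATABASE_USER",
    "DATABASE_PASSWORD",
    "DATABASE_HOST",
    "DATABASE_PORT",
    "DATABASE_DB",
    "WORKSPACE_ROOT",
]

missing = [name for name in REQUIRED if not os.getenv(name)]
if missing:
    sys.exit(f"Missing environment variables: {', '.join(missing)}")
print("All required environment variables are set.")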

Follow the GitHub Actions deployment guide from zellwk.

Configure a Cloud SQL for MySQL database:

Enable binary logging. To enable binary logging for Cloud SQL for MySQL, see Enabling point-in-time recovery.

Create a Datastream user. To create a Datastream user, enter the following MySQL commands:

CREATE USER 'datastream'@'%' IDENTIFIED BY '[YOUR_PASSWORD]';
GRANT REPLICATION SLAVE, SELECT, REPLICATION CLIENT ON *.* TO 'datastream'@'%';
FLUSH PRIVILEGES;
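CDC over the WAL only works if binary logging is actually enabled and row-based. A quick check like the hypothetical one below (using PyMySQL, with a placeholder host and credentials) can confirm the Cloud SQL settings and the new user's grants before wiring up Airbyte.

# check_binlog.py - hypothetical check that Cloud SQL for MySQL is ready for CDC.
import pymysql

conn = pymysql.connect(
    host="10.0.0.3",                  # assumed private IP of the Cloud SQL instance
    user="datastream",
    password="[YOUR_PASSWORD]",
)
try:
    with conn.cursor() as cur:
        cur.execute("SHOW VARIABLES LIKE 'log_bin'")
        print(cur.fetchone())          # expect ('log_bin', 'ON')
        cur.execute("SHOW VARIABLES LIKE 'binlog_format'")
        print(cur.fetchone())          # expect ('binlog_format', 'ROW') for CDC
        cur.execute("SHOW GRANTS FOR 'datastream'@'%'")
        for row in cur.fetchall():
            print(row[0])              # should include REPLICATION SLAVE, REPLICATION CLIENT, SELECT
finally:
    conn.close()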

The ideas behind this pipeline are:

(a) Separate READ/WRITE workloads by replicating the target RDBMS into a columnar format (BigQuery).

(b) Apply SCD Type 2 to the target database: history is added as a new row (see the sketch after the link below).

See the documentation at: https://www.sqlshack.com/implementing-slowly-changing-dimensions-scds-in-data-warehouses/
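The sketch below illustrates idea (b) in plain Python: when a tracked attribute changes, the current row is closed out and the new state is appended as a new row, so history is preserved. The column names (effective_from, effective_to, is_current) are illustrative, not the project's actual schema.

# scd2_example.py - minimal illustration of SCD Type 2 on an in-memory "dimension table".
from datetime import date

# Current state of the dimension: one open row per business key (customer_id).
dim_customer = [
    {"customer_id": 1, "city": "Hanoi", "effective_from": date(2023, 1, 1),
     "effective_to": None, "is_current": True},
]

def apply_scd2(table: list[dict], key: int, new_city: str, changed_on: date) -> None:
    """Close the open row for `key` (if the city changed) and append the new version."""
    for row in table:
        if row["customer_id"] == key and row["is_current"]:
            if row["city"] == new_city:
                return                           # nothing changed, keep the current row
            row["effective_to"] = changed_on     # close out the old version
            row["is_current"] = False
    table.append({"customer_id": key, "city": new_city, "effective_from": changed_on,
                  "effective_to": None, "is_current": True})

apply_scd2(dim_customer, key=1, new_city="Da Nang", changed_on=date(2024, 6, 1))
for row in dim_customer:
    print(row)   # two rows: the closed Hanoi version and the open Da Nang version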

Source Reference

https://cloud.google.com/datastream/docs/implementing-datastream-dataflow-analytics

https://cloud.google.com/datastream/docs/configure-your-source-mysql-database

https://cloud.google.com/datastream/docs/sources-mysql#mysqlknownlimitations

The full documentation for the pipeline is available at Pipeline in Confluent.