Streaming from Data Layer¶
Overview¶
This stream process moves data from the data layer into the lakehouse.
- Change Data Capture (CDC) is responsible for transferring data from OLTP to OLAP.
- In the past, we implemented CDC with Google Cloud Datastream, but it had some problems for our company:
    - The cost was too high:
        - We don't need streaming data; we only need batch data.
    - Because we had to use a GCP-provided template:
        - We had to design SCD2 on the OLTP side.
        - We couldn't customize the transfer flow.
- So we needed another CDC tool that is cheaper and supports batch data. The suitable tool is Airbyte, for these reasons:
    - The cost is much lower.
    - It supports a strong scheduling feature for batching data.
    - We don't need to design SCD2 on OLTP.
Source: MySQL
Target: BigQuery
SAD - System Architecture Design¶
Logical View¶
The project follows this flow:
flowchart TB
subgraph cdc_tool[CDC Tool]
direction TB
cdc_tool_log[Log]
cdc_tool_ui[UI]
end
subgraph oltp[OLTP]
oltp_data[Data]
oltp_wal[WAL]
end
subgraph olap[OLAP]
olap_data[Data]
end
subgraph transfer_flow[Transfer Flow]
direction LR
oltp <-- read WAL --> cdc_tool
cdc_tool -- write --> olap
end
schedule[Schedule] -- trigger --> transfer_flow
Physical View¶
The previous Datastream-based flow was:
flowchart LR
rdbms[RDBMS] --> datastream --> bigquery
The new flow replaces Datastream with Airbyte (which embeds Debezium) and uses Prefect as the scheduler:
flowchart TB
subgraph airbyte[Airbyte]
airbyte_debezium[Debezium]
end
cloud_sql[Cloud SQL]
big_query[Big Query]
subgraph transfer_flow[Transfer Flow]
direction LR
cloud_sql <-- read WAL --> airbyte
airbyte -- write --> big_query
end
prefect[Prefect] -- trigger --> transfer_flow
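A minimal Prefect sketch of the trigger step, assuming the Airbyte server is reachable at http://localhost:8000 with the default basic-auth credentials from the environment variables below, and that AIRBYTE_CONNECTION_ID is a hypothetical placeholder for the Cloud SQL to BigQuery connection:
import os
import requests
from prefect import flow, task

AIRBYTE_API = os.getenv("AIRBYTE_API", "http://localhost:8000/api/v1")  # assumed server URL
AUTH = (os.getenv("BASIC_AUTH_USERNAME", "airbyte"), os.getenv("BASIC_AUTH_PASSWORD", "password"))
CONNECTION_ID = os.getenv("AIRBYTE_CONNECTION_ID", "<connection-uuid>")  # hypothetical placeholder

@task(retries=2, retry_delay_seconds=60)
def trigger_sync(connection_id: str) -> int:
    # Ask Airbyte to run one sync job for the Cloud SQL -> BigQuery connection.
    resp = requests.post(
        f"{AIRBYTE_API}/connections/sync",
        json={"connectionId": connection_id},
        auth=AUTH,
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["job"]["id"]

@flow(name="transfer-flow")
def transfer_flow():
    job_id = trigger_sync(CONNECTION_ID)
    print(f"Triggered Airbyte sync job {job_id}")

if __name__ == "__main__":
    transfer_flow()
In practice this flow would be registered as a Prefect deployment with a schedule, which is what the schedule/trigger node in the logical view represents.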
Entities¶
Stream ID: dawnbreaker2magnus
Deployment¶
Deployment is triggered by a push to a specific branch:
flowchart LR
gitHub[GitHub] -- on push --> trigger[GitHub Actions] -- deploy --> cloud_compute[Cloud compute]
Set up - Service Account¶
The service account is required to access:
[1] Cloud SQL: Read WAL
[2] Cloud GCS: Upload WAL
[3] BigQuery: Upload log and data
To run the tests against your bucket, make sure to set up a Service Account with all necessary permissions:
- compute.instanceAdmin.v1
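A quick sanity check that the downloaded key works for BigQuery and GCS (a sketch only; the key path is assumed to be service-account.json, and Cloud SQL access is governed by IAM and network settings, so it is not checked here):
from google.cloud import bigquery, storage

KEY_PATH = "service-account.json"  # assumed local path to the downloaded key

# BigQuery: the account must be able to see datasets and load data.
bq = bigquery.Client.from_service_account_json(KEY_PATH)
print("BigQuery datasets:", [d.dataset_id for d in bq.list_datasets()])

# GCS: the account must be able to reach the bucket used for staging WAL files.
gcs = storage.Client.from_service_account_json(KEY_PATH)
print("Buckets:", [b.name for b in gcs.list_buckets()])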
Set up - Compute Engine¶
- Zone: asia-southeast1-b
- Machine type: e2-medium
- Image: Debian GNU/Linux 11 (bullseye)
- RAM: 4 GB
- vCPU: 2
- Disk: 30 GB
Set up - System Dependencies¶
Service | Description | Note |
---|---|---|
docker | Docker daemon | |
docker-compose | Runs multi-container services defined in a Compose file | |
htop | A cross-platform interactive process viewer | |
Set up - Environment variables¶
- Airbyte image version:
- VERSION
- When using the airbyte-db via default docker image
- CONFIG_ROOT
- DATA_DOCKER_MOUNT
- DB_DOCKER_MOUNT
- Workspace storage for running jobs (logs, etc)
- WORKSPACE_ROOT
- WORKSPACE_DOCKER_MOUNT
- Local mount to access local files from the filesystem
- LOCAL_ROOT
- LOCAL_DOCKER_MOUNT
- HACK_LOCAL_ROOT_PARENT
- Proxy Configuration (set to empty values, e.g. "" to disable basic auth)
- BASIC_AUTH_USERNAME
- BASIC_AUTH_PASSWORD
- BASIC_AUTH_PROXY_TIMEOUT
- Airbyte Internal Job Database, see https://docs.airbyte.io/operator-guides/configuring-airbyte-db
- DATABASE_USER
- DATABASE_PASSWORD
- DATABASE_HOST
- DATABASE_PORT
- DATABASE_DB
- Translate manually: DATABASE_URL=jdbc:postgresql://${DATABASE_HOST}:${DATABASE_PORT}/${DATABASE_DB} (do not include the username or password here; see the sketch after this list)
- DATABASE_URL
- JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION
- Airbyte Internal Config Database, defaults to Job Database if empty. Explicitly left empty to mute docker-compose warnings.
- CONFIG_DATABASE_USER
- CONFIG_DATABASE_PASSWORD
- CONFIG_DATABASE_URL
- CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION
- Airbyte services
- TEMPORAL_HOST
- INTERNAL_API_HOST
- INTERNAL_API_URL
- CONNECTOR_BUILDER_API_HOST
- WEBAPP_URL
- Although not present as an env var, required for web app configuration.
- CONNECTOR_BUILDER_API_URL
- AIRBYTE_API_HOST
- Jobs - Relevant to scaling.
- SYNC_JOB_MAX_ATTEMPTS
- SYNC_JOB_MAX_TIMEOUT_DAYS
- SYNC_JOB_INIT_RETRY_TIMEOUT_MINUTES
- JOB_MAIN_CONTAINER_CPU_REQUEST
- JOB_MAIN_CONTAINER_CPU_LIMIT
- JOB_MAIN_CONTAINER_MEMORY_REQUEST
- JOB_MAIN_CONTAINER_MEMORY_LIMIT
- NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT
- NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST
- NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT
- NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST
- Logging/monitoring/tracking
- TRACKING_STRATEGY
- JOB_ERROR_REPORTING_STRATEGY
- LOG_LEVEL
- Application
- WORKERS_MICRONAUT_ENVIRONMENTS
- CRON_MICRONAUT_ENVIRONMENTS
- MAX_SYNC_WORKERS
- MAX_SPEC_WORKERS
- MAX_CHECK_WORKERS
- MAX_DISCOVER_WORKERS
- MAX_NOTIFY_WORKERS
- SHOULD_RUN_NOTIFY_WORKFLOWS
- Temporal Activity configuration
- ACTIVITY_MAX_ATTEMPT
- ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS
- ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS
- WORKFLOW_FAILURE_RESTART_DELAY_SECONDS
- Feature flags
- AUTO_DISABLE_FAILING_CONNECTIONS
- FEATURE_FLAG_CLIENT
- Monitoring flags
- PUBLISH_METRICS
- Accepted values are datadog and otel (OpenTelemetry)
- METRIC_CLIENT
- Useful only when the metric client is set to otel. Must start with http:// or https://.
- OTEL_COLLECTOR_ENDPOINT
- Useful only when the metric client is set to datadog.
- DD_AGENT_HOST
- DD_DOGSTATSD_PORT
- AUTO_DETECT_SCHEMA
- SECRET_PERSISTENCE=TESTING_CONFIG_DB_TABLE
- To test local catalog changes, set the below variable to the path of your local catalog.
- LOCAL_CONNECTOR_CATALOG_PATH
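A minimal sketch of the manual DATABASE_URL translation mentioned above, assuming the standard Airbyte variable names and reading them from the environment:
import os

# Build the JDBC URL from the individual DATABASE_* variables.
# The username and password are intentionally NOT embedded in the URL.
host = os.environ["DATABASE_HOST"]
port = os.environ["DATABASE_PORT"]
db = os.environ["DATABASE_DB"]

database_url = f"jdbc:postgresql://{host}:{port}/{db}"
print(f"DATABASE_URL={database_url}")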
Follow the GitHub Actions deployment guide from zellwk.
Configure a Cloud SQL for MySQL database:
- Enable binary logging: to enable binary logging for Cloud SQL for MySQL, see Enabling point-in-time recovery.
- Create a Datastream user: to create the user, enter the following MySQL commands:
CREATE USER 'datastream'@'%' IDENTIFIED BY '[YOUR_PASSWORD]';
GRANT REPLICATION SLAVE, SELECT, REPLICATION CLIENT ON *.* TO 'datastream'@'%';
FLUSH PRIVILEGES;
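A hedged sketch to verify the new user can connect and has the replication grants, assuming the pymysql driver and that CLOUD_SQL_HOST and DATASTREAM_PASSWORD are supplied by you:
import os
import pymysql

# Connect to Cloud SQL as the CDC user and print its grants.
conn = pymysql.connect(
    host=os.environ["CLOUD_SQL_HOST"],           # assumed: IP of the Cloud SQL instance
    user="datastream",
    password=os.environ["DATASTREAM_PASSWORD"],  # assumed: the [YOUR_PASSWORD] used above
)
with conn.cursor() as cur:
    cur.execute("SHOW GRANTS FOR CURRENT_USER()")
    for (grant,) in cur.fetchall():
        print(grant)
    # Binary logging must be ON for CDC to read the binlog.
    cur.execute("SHOW VARIABLES LIKE 'log_bin'")
    print(cur.fetchone())
conn.close()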
The ideas behind this container pipeline are:
(a) Separate READ from WRITE load on the target RDBMS by replicating the data into a columnar format (BigQuery).
(b) Apply SCD Type 2 on the target database: history is kept by adding a new row for every change (see the sketch below).
See the documentation at: https://www.sqlshack.com/implementing-slowly-changing-dimensions-scds-in-data-warehouses/
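An illustrative, in-memory Python sketch of the SCD Type 2 idea, with hypothetical column names valid_from, valid_to, and is_current (the real pipeline applies this on the BigQuery side):
from datetime import date

# Dimension table with one current row per business key (customer_id).
dim_customer = [
    {"customer_id": 1, "city": "Hanoi", "valid_from": date(2023, 1, 1),
     "valid_to": None, "is_current": True},
]

def apply_scd2(rows, key, changes, today):
    """Close the current row for `key` and append a new row with `changes`."""
    for row in rows:
        if row["customer_id"] == key and row["is_current"]:
            row["valid_to"] = today          # close the old version
            row["is_current"] = False
            new_row = {**row, **changes,
                       "valid_from": today, "valid_to": None, "is_current": True}
            rows.append(new_row)
            return
    raise KeyError(f"no current row for customer_id={key}")

# The customer moves: history is preserved as a new row instead of an in-place update.
apply_scd2(dim_customer, key=1, changes={"city": "Da Nang"}, today=date(2024, 6, 1))
for row in dim_customer:
    print(row)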
Source Reference¶
https://cloud.google.com/datastream/docs/implementing-datastream-dataflow-analytics
https://cloud.google.com/datastream/docs/configure-your-source-mysql-database
https://cloud.google.com/datastream/docs/sources-mysql#mysqlknownlimitations
The full documentation for the pipeline is available at Pipeline in Confluent.