Pipeline: Raptor - workflow for exchange listener¶
Overview¶
Push workflow executions to Cloud Tasks for scheduling.
Pipeline¶
List of pipelines:
Pipeline | Description | Status |
---|---|---|
Raptor | Listen to exchange reports and extract data elements from PDF files | Deployed |
Baryonyx | Collect financial statement elements with Parquet and Polars | Deployed |
note: Updated at 2023-07-12 by @bao.truong
# Variable
declare ENVIRONMENT_CODE=staging
declare BUCKET_NAME=inno-processor-staging
declare PUBSUB_TOPIC=staging-raptor-shipper
declare WORKFLOW_TASK_DECLARE_NAME=staging-raptor-task-declare
declare WORKFLOW_TASK_DECLARE_DESCRIPTION='[Raptor] Staging - Create worker to declare task to hub'
declare WORKFLOW_TASK_DECLARE_YAML_FILE=workflow/staging_task_dispatch_pipeline_raptor.yaml
declare EVENTARC_TASK_DECLARE_SUBSCRIPTION=staging-raptor-transfer
declare WORKFLOW_EXECUTOR_NAME=staging-raptor-executor
declare WORKFLOW_EXECUTOR_DESCRIPTION="[Raptor] Staging - Execute processor pipeline"
declare WORKFLOW_EXECUTOR_YAML_FILE=workflow/staging_executor_raptor.yaml
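A minimal sketch of how these variables might feed the deployment, assuming the standard gcloud workflows CLI and that PROJECT_ID, PROJECT_REGION, and the sa-warlock service account (from the Service Account section below) are already set; this is illustrative, not the pipeline's actual deploy script:
# Sketch: deploy the task-declare workflow with the variables above (illustrative only)
gcloud workflows deploy $WORKFLOW_TASK_DECLARE_NAME \
    --source=$WORKFLOW_TASK_DECLARE_YAML_FILE \
    --description="$WORKFLOW_TASK_DECLARE_DESCRIPTION" \
    --location=$PROJECT_REGION \
    --service-account=sa-warlock@$PROJECT_ID.iam.gserviceaccount.com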
System Architecture Design¶
For the logical view, the components are:
flowchart LR
%% Component
subgraph gcp[Google Cloud Platform]
direction TB
subgraph server[Server]
direction LR
subgraph workflow_task
direction LR
workflow
app[ASGI] --> notification
end
end
rate_limiter
end
%% Flow
rate_limiter --> workflow
client -- request --> rate_limiter
app -- response --> client
For the workflow process:
Step 1: Read PDF file
Step 2: Extract using OCR tools
Step 3: Extract and annotate with the model, then push the result as JSON
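A rough shell sketch of these three steps; the tool choices (gsutil, ocrmypdf, jq) and the object path are illustrative assumptions, not the pipeline's actual implementation:
# Step 1: read the PDF file from the bucket (object path is the sample from the Data Concept section)
export OBJECT_ID="OPS/EXCHANGE/HNX/INDEX_FOREIGN_TRADING/20230215/20230215_ID_Foreigners_trading_by_index.pdf"
gsutil cp "gs://inno-processor-staging/${OBJECT_ID}" /tmp/input.pdf

# Step 2: extract text with an OCR tool (ocrmypdf writes the recognized text to a sidecar file)
ocrmypdf --sidecar /tmp/extracted.txt /tmp/input.pdf /tmp/ocr.pdf

# Step 3: annotate the extracted text and emit JSON (real extraction/annotation is model-specific)
jq -Rs '{object_id: env.OBJECT_ID, text: .}' /tmp/extracted.txt > /tmp/output.json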
The Flow Concept:
flowchart LR
GCS(Cloud Storage) -- change --> N(Storage Notification) -- push payload --> PubSub -- push --> Eventarc -- push --> CTN(Cloud Task Navigator) -- push --> CTQ(Cloud Task Queue) --> Workflow
flowchart LR
GCS(Cloud Storage) -- change --> Notification -- push payload --> PubSub -- push --> Eventarc -- Trigger --> Workflow
# <Workflow> <Task> <Workflow> <Cloud Run>
# ++++++++++++++ ++++++++++++++++++ ++++++++++++++ ++++++++++++++
# + Executor + --> + Task Declare + --> + Workflow + ----> + Otter +
# ++++++++++++++ ++++++++++++++++++ ++++++++++++++ ++++++++++++++
Component¶
flowchart TB
basement
subgraph one
a1-->a2
end
subgraph two
b1-->b2
end
subgraph three
Link-->pdf-->partitionby-->bucket
end
SAD - System Architecture Design¶
- Data exchange about news
- Filter: based on regex
  - Example: HOSE news item "Số liệu quản lý sở hữu của nhà đầu tư nước ngoài 20/07/2023" (foreign investor ownership data, 20/07/2023 8:17:00 AM) → HOSE/EXCHANGE/FOREIGN_OWNERSHIP
- Upload into GCS bucket with prefix /OPS/... <...>
- Trigger: Eventarc filters notification changes on the bucket → PubSub (see the sketch after this list)
- Extract metadata
  - JSON output
  - (Dataframe output)
- Storage:
  - Submarine: API_ENDPOINT [1]
- Acknowledge to PubSub <200>
Based on event:
- A new object appears in the bucket (event-driven)
- Scheduled (periodic)
The filter actions vary:
- Based on a regex over the news text, every 5 hours
The data flows through the whole path right away.
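A sketch of wiring the bucket notification and the Eventarc trigger for this flow, using the variables declared above; reusing EVENTARC_TASK_DECLARE_SUBSCRIPTION as the trigger name and sa-warlock as the trigger SA are assumptions, as are PROJECT_ID and PROJECT_REGION being set:
# Publish OBJECT_FINALIZE changes under the OPS/ prefix to the Pub/Sub topic
gsutil notification create -t $PUBSUB_TOPIC -f json -e OBJECT_FINALIZE -p OPS/ gs://$BUCKET_NAME

# Route the Pub/Sub messages to the task-declare workflow via Eventarc
gcloud eventarc triggers create $EVENTARC_TASK_DECLARE_SUBSCRIPTION \
    --location=$PROJECT_REGION \
    --destination-workflow=$WORKFLOW_TASK_DECLARE_NAME \
    --destination-workflow-location=$PROJECT_REGION \
    --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
    --transport-topic=$PUBSUB_TOPIC \
    --service-account=sa-warlock@$PROJECT_ID.iam.gserviceaccount.com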
Case 1: Normal case
HSX publishes news files in several extensions.
Steps: Sync File to Target¶
Download multiple sections
Steps: Trigger¶
Syntax: state.a.platform.key.timeframe
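For example, the sample objectId in the Data Concept section, OPS/EXCHANGE/HNX/INDEX_FOREIGN_TRADING/20230215/..., appears to map to state=OPS, platform=HNX, key=INDEX_FOREIGN_TRADING, timeframe=20230215, with the remaining segment (EXCHANGE) corresponding to the "a" part of the syntax.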
Physical component¶
Component of [Pipeline]
flowchart TB
gcs(Cloud Storage) -- changes --> ps(PubSub) -- listen --> e(Eventarc) -- push --> wfct(Workflow Task Declare) -- make task --> tq(Task Queue) -- push --> wfe(Workflow Executor)
SAD¶
flowchart LR
P(PDF) -- Push to GCS --> OPS(State of file: OPS) --> CR(Cloud Run)
CR --> X[ Execute ]
X -->|1. parse pdf: Error \n 2. push to Submarine: Error| F(Change state file: FAILED)
X -->|1. parse pdf: Success \n 2. push to Submarine: Error| E(EXIT & RAISE)
X -->|1. parse pdf: Success \n 2. push to Submarine: Success| FI(Change state file: FINISHED)
- Deployment: Staging + Production
Environment | Components |
---|---|
Staging | Submarine, Raptor |
Production | Submarine, Raptor |
- [Submarine] + [Raptor]
- [Docker] + [Docker]
gcloud tasks queues update $TASK_QUEUE_NAME \
    --location=$PROJECT_REGION \
    --max-concurrent-dispatches=2 \
    --max-dispatches-per-second=0.03334 \
    --max-attempts=2 \
    --min-backoff='10s' \
    --max-backoff='120s' \
    --max-doublings=3 \
    --max-retry-duration='120s';
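With these settings the queue releases at most one task roughly every 30 seconds (1 / 0.03334 ≈ 30 s, i.e. about two dispatches per minute) and keeps at most two executions in flight, which matches the Cloud Run limit of two instances described below.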
- Update the Cloud Build deploy step for the Cloud Run service:
args: [
  'run', 'deploy', '$_APPLICATION_ID',
  '--description', '$_APPLICATION_DESCRIPTION',
  '--platform', 'managed',
  '--no-allow-unauthenticated',
  '--cpu-throttling',
  '--concurrency', '1',
  '--region', '$LOCATION',
  '--image', '$LOCATION-docker.pkg.dev/$PROJECT_ID/$_ARTIFACT_REGISTRY_REPOSITORY_NAME/$_IMAGE_NAME:latest',
  '--port', '8000',
  '--service-account', '$_SERVICE_ACCOUNT_NAME@$PROJECT_ID.iam.gserviceaccount.com',
  '--cpu', '1',
  '--memory', '1Gi',
  '--ingress', 'all',
  '--min-instances', '1',
  '--max-instances', '2',
  '--set-secrets=PROJECT_ID=DATA_PROJECT_ID:latest,DEPLOYMENT_ENVIRONMENT_STATE=RAPTOR_PROD_DEPLOYMENT_ENVIRONMENT_STATE:latest,PROD_API_ENDPOINT=RAPTOR_PROD_API_ENDPOINT:latest',
  '--labels=team=data,pipeline=$_PIPELINE'
]
- [ ] Test on staging
- [ ] Test on production
With the old design, the workflows were triggered at an uncontrolled rate, which overloaded the downstream Cloud Run service.
What changed:
- Cloud Tasks now sits between the trigger and the executor to throttle dispatches.
- In the normal case, the Listener is now a task-declaring workflow that pushes work into the Cloud Tasks queue instead of invoking the executor directly.
Note
- It is a multi-step process.
- It requires the trigger to stay up to date.
- Resource usage at the endpoint can spike at times.
The Raptor pipeline extracts data from PDF files into the Basement database.
This workflow receives messages from PubSub via GCS notification and Eventarc, then executes the following steps:
- Cloud Run: its input is the messages received from the workflow. It reads the objectId, parses the file into a dataframe, and pushes the data to Submarine's respective endpoints.
- Change state file: this step moves the file to a folder matching the state returned by Cloud Run. If the received result is Success, the file is moved to the FINISHED state; if the received result is Failed, the file is moved to the FAILED state (see the sketch below).
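A sketch of that state change on the bucket; encoding the state as the leading path prefix is an assumption based on the sample objectId in the Data Concept section:
# Move the object from the OPS prefix to the prefix matching the Cloud Run result
OBJECT_ID="EXCHANGE/HNX/INDEX_FOREIGN_TRADING/20230215/20230215_ID_Foreigners_trading_by_index.pdf"
RESULT_STATE=FINISHED   # or FAILED, depending on the Cloud Run response
gsutil mv "gs://$BUCKET_NAME/OPS/$OBJECT_ID" "gs://$BUCKET_NAME/$RESULT_STATE/$OBJECT_ID"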
Steps: Filter¶
Developers¶
Deployment | Description | Product Requirement |
---|---|---|
DP-01 | Clone API system | a) Exclude Authentication b) The same parameters |
DP-02 | Clone Models | a) Models in dbt |
DP-03 | Merge and deploy Clone API to AWS server | |
DP-04 | Test on 1 partner | |
DP-05 | Test/Migrate all the remaining partners | |
Service Account¶
SA that deploys and executes the workflows¶
SA name: sa-warlock@$PROJECT_ID.iam.gserviceaccount.com
Permissions for the SA:
Cloud Run:
- roles/run.developer
- roles/run.invoker
Secret Manager Secret Accessor:
- roles/secretmanager.secretAccessor
Monitoring Metric Writer:
- roles/monitoring.metricWriter
Workflows:
- roles/workflows.invoker
- roles/workflows.viewer
GCS (buckets: inno-processor, inno-processor-staging):
- roles/storage.objectAdmin
- roles/storage.legacyBucketReader
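A sketch of granting one of these roles with the standard IAM commands; splitting them into a project-level binding and a bucket-level binding is an assumption:
# Project-level role for the workflow SA
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:sa-warlock@$PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/workflows.invoker"

# Bucket-level role on the staging bucket
gsutil iam ch \
    serviceAccount:sa-warlock@$PROJECT_ID.iam.gserviceaccount.com:roles/storage.objectAdmin \
    gs://inno-processor-staging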
SA that deploys and executes Cloud Run¶
SA name: sa-snapfire@$PROJECT_ID.iam.gserviceaccount.com
Permissions for the SA:
Cloud Run:
- roles/run.developer
Cloud SQL:
- roles/cloudsql.client
- roles/cloudsql.instanceUser
Secret Manager Secret Accessor:
- roles/secretmanager.secretAccessor
Monitoring Metric Writer:
- roles/monitoring.metricWriter
Workflows:
- roles/workflows.invoker
- roles/workflows.viewer
GCS:
- roles/storage.admin (Will revise it in the near future)
Create Queue
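A sketch of creating the rate-limiting queue before tuning it with the update command above; the variables are the same ones used there and are assumed to be set:
# Create the queue (settings can then be adjusted with gcloud tasks queues update)
gcloud tasks queues create $TASK_QUEUE_NAME \
    --location=$PROJECT_REGION \
    --max-concurrent-dispatches=2 \
    --max-dispatches-per-second=0.03334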
Data Concept¶
Data concepts in the Raptor pipeline:
{
"attributes": {
"bucketId": "inno-processor-staging",
"eventTime": "2023-03-10T16:42:45.586544Z",
"eventType": "OBJECT_FINALIZE",
"notificationConfig": "projects/_/buckets/inno-processor-staging/notificationConfigs/15",
"objectGeneration": "1678466565552993",
"objectId": "OPS/EXCHANGE/HNX/INDEX_FOREIGN_TRADING/20230215/20230215_ID_Foreigners_trading_by_index.pdf",
"overwroteGeneration": "1678465978303900",
"payloadFormat": "JSON_API_V1"
},
"data": "ewogICJraW5kIjogInN0b3JhZ2Ujb2JqZWN0IiwKICAiaWQiOiAiaW5uby1wcm9jZXNzb3Itc3RhZ2luZy9PUFMvRVhDSEFOR0UvSE5YL0lOREVYX0ZPUkVJR05fVFJBRElORy8yMDIzMDIxNS8yMDIzMDIxNV9JRF9Gb3JlaWduZXJzX3RyYWRpbmdfYnlfaW5kZXgucGRmLzE2Nzg0NjY1NjU1NTI5OTMiLAogICJzZWxmTGluayI6ICJodHRwczovL3d3dy5nb29nbGVhcGlzLmNvbS9zdG9yYWdlL3YxL2IvaW5uby1wcm9jZXNzb3Itc3RhZ2luZy9vL09QUyUyRkVYQ0hBTkdFJTJGSE5YJTJGSU5ERVhfRk9SRUlHTl9UUkFESU5HJTJGMjAyMzAyMTUlMkYyMDIzMDIxNV9JRF9Gb3JlaWduZXJzX3RyYWRpbmdfYnlfaW5kZXgucGRmIiwKICAibmFtZSI6ICJPUFMvRVhDSEFOR0UvSE5YL0lOREVYX0ZPUkVJR05fVFJBRElORy8yMDIzMDIxNS8yMDIzMDIxNV9JRF9Gb3JlaWduZXJzX3RyYWRpbmdfYnlfaW5kZXgucGRmIiwKICAiYnVja2V0IjogImlubm8tcHJvY2Vzc29yLXN0YWdpbmciLAogICJnZW5lcmF0aW9uIjogIjE2Nzg0NjY1NjU1NTI5OTMiLAogICJtZXRhZ2VuZXJhdGlvbiI6ICIxIiwKICAiY29udGVudFR5cGUiOiAiYXBwbGljYXRpb24vcGRmIiwKICAidGltZUNyZWF0ZWQiOiAiMjAyMy0wMy0xMFQxNjo0Mjo0NS41ODZaIiwKICAidXBkYXRlZCI6ICIyMDIzLTAzLTEwVDE2OjQyOjQ1LjU4NloiLAogICJzdG9yYWdlQ2xhc3MiOiAiU1RBTkRBUkQiLAogICJ0aW1lU3RvcmFnZUNsYXNzVXBkYXRlZCI6ICIyMDIzLTAzLTEwVDE2OjQyOjQ1LjU4NloiLAogICJzaXplIjogIjE4MzE4IiwKICAibWQ1SGFzaCI6ICJheDFuOVZNcWtjVkQybTRCUnI2MTZBPT0iLAogICJtZWRpYUxpbmsiOiAiaHR0cHM6Ly9zdG9yYWdlLmdvb2dsZWFwaXMuY29tL2Rvd25sb2FkL3N0b3JhZ2UvdjEvYi9pbm5vLXByb2Nlc3Nvci1zdGFnaW5nL28vT1BTJTJGRVhDSEFOR0UlMkZITlglMkZJTkRFWF9GT1JFSUdOX1RSQURJTkclMkYyMDIzMDIxNSUyRjIwMjMwMjE1X0lEX0ZvcmVpZ25lcnNfdHJhZGluZ19ieV9pbmRleC5wZGY/Z2VuZXJhdGlvbj0xNjc4NDY2NTY1NTUyOTkzJmFsdD1tZWRpYSIsCiAgImNvbnRlbnRMYW5ndWFnZSI6ICJlbiIsCiAgImNyYzMyYyI6ICJBb2ErMVE9PSIsCiAgImV0YWciOiAiQ09IMm9Kdm4wZjBDRUFFPSIKfQo=",
"messageId": "7152703534336556",
"publishTime": "2023-03-10T16:42:45.83Z"
}
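The data field of the message above is the base64-encoded JSON object metadata from GCS. A quick way to inspect it, assuming the value has been copied into a hypothetical DATA variable and jq is available:
# Decode the Pub/Sub message payload into readable JSON
echo "$DATA" | base64 --decode | jq .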
{
"engine": {
"run_time": {
"end": "2023-03-10T23:42:51.625886+07:00",
"start": "2023-03-10T23:42:47.694272+07:00"
},
"url": "https://staging-processor-listen-tunnel-2yrayayjeq-as.a.run.app"
},
"metadata": {
"endpoint": {
"method": "PUT",
"path": "/hnx/index/foreign_trading/file",
"url": "https://staging-submarine-2yrayayjeq-as.a.run.app"
},
"error": null,
"output": {
"algo": "sha_s256",
"data": "e81e132669fb160cff4b111fe027980fef5af730e85830e786f79d68d0f0969c",
"type": "dataframe"
},
"source": {
"bucket": "inno-processor-staging",
"object_id": "OPS/EXCHANGE/HNX/INDEX_FOREIGN_TRADING/20230215/20230215_ID_Foreigners_trading_by_index.pdf"
},
"status": "SUCCESS",
"step": {
"ANNOTATE_RESULT": {
"is_success": true,
"name": "annotate-metadata-with-result",
"position": 2,
"required": false
},
"DOWNLOAD_SOURCE_INPUT": {
"is_success": true,
"name": "download-input-from-gcs",
"position": 0,
"required": true
},
"EXTRACT_DATA_ELEMENT": {
"is_success": true,
"name": "extract-data-element-using-processor",
"position": 1,
"required": true
},
"OUTPUT_LOCK": {
"is_success": true,
"name": "lock-output",
"position": 3,
"required": false
},
"PUSH_ENDPOINT": {
"is_success": true,
"name": "push-result-to-endpoint",
"position": 4,
"required": true
}
}
}
}
For the Cloud Run service:
- Max concurrency: 1
- Limit: up to 2 instances (min 1), each with 1 CPU and 1Gi RAM
Queue Detail
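The current queue configuration can be inspected with:
gcloud tasks queues describe $TASK_QUEUE_NAME --location=$PROJECT_REGION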
Tech Stack¶
- Serverless handling requests from various activators: Cloud Run
- CI/CD for deploying architecture components: Cloud Build
- Orchestration: Cloud Workflows, PubSub, GCS Notification, Trigger
- Storage Engine: RDBMS (MySQL, Postgres), Columnar (BigQuery), Cloud Storage (GCS)
- Rate Limit: Cloud Tasks
Related Component¶
Pricing¶
Troubleshooting¶
Memory
Term¶
- max_concurrent_requests, which is equivalent to max_concurrent_dispatches
- rate, which is equivalent to max_dispatches_per_second
- bucket_size, which is equivalent to max_burst_size
Clean Resources¶
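A sketch of tearing the pipeline down, assuming the resource names declared in the variable blocks above; the exact set of resources to delete depends on the deployment:
# Remove the Eventarc trigger, workflows, queue, and bucket notification(s)
gcloud eventarc triggers delete $EVENTARC_TASK_DECLARE_SUBSCRIPTION --location=$PROJECT_REGION
gcloud workflows delete $WORKFLOW_TASK_DECLARE_NAME --location=$PROJECT_REGION
gcloud workflows delete $WORKFLOW_EXECUTOR_NAME --location=$PROJECT_REGION
gcloud tasks queues delete $TASK_QUEUE_NAME --location=$PROJECT_REGION
gsutil notification delete gs://$BUCKET_NAME   # removes all notification configs on the bucket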
Source Reference¶
[2] https://cloud.google.com/run/docs/about-concurrency
https://cloud.google.com/tasks/docs/configuring-queues
# Component of [Pipeline]
#
# <Cloud Storage> <Workflow> <Task> <Workflow>
# ++++++++++++++ <Pubsub> ++++++++++++++++++ +++++++++++ ++++++++++++++
# + Storage + -- Eventarc --> + Task Declare + --> + Queue + ----> + Executor +
# ++++++++++++++ ++++++++++++++++++ +++++++++++ ++++++++++++++
# Variable
declare PIPELINE=baryonyx
declare PIPELINE_FOLDER=baryonyx
# Repository Name
declare REPOSITORY_NAME=Innotech-Vietnam/inno-processor
declare REPOSITORY_OWNER=Inno-Enchantress
# Bucket Default Input
declare BUCKET_PREFIX=FINANCIAL_STATEMENT/OPS/
# Default SA
declare WORKFLOW_TASK_DECLARE_SERVICE_ACCOUNT_NAME=sa-chen
declare WORKFLOW_EXECUTOR_SERVICE_ACCOUNT_NAME=sa-warlock
# Validate
check_non_empty_variable PIPELINE
check_non_empty_variable BUCKET_PREFIX
check_non_empty_variable WORKFLOW_TASK_DECLARE_SERVICE_ACCOUNT_NAME
check_non_empty_variable WORKFLOW_EXECUTOR_SERVICE_ACCOUNT_NAME
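check_non_empty_variable is not defined in this snippet; a minimal bash sketch of such a guard, as an assumption of what it does, might look like:
# Fail fast if a required variable is unset or empty (illustrative definition)
check_non_empty_variable() {
    local var_name=$1
    if [ -z "${!var_name:-}" ]; then
        echo "ERROR: variable ${var_name} must not be empty" >&2
        exit 1
    fi
}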