
Pipeline: Raptor - workflow for exchange listener

Overview

Workflow executions are pushed to a Cloud Tasks queue for scheduling and rate limiting.

Pipeline

List of pipelines:

Pipeline | Description | Status
Raptor | Listens to exchange reports and pulls data elements from PDF files | Deployed
Baryonyx | Collects financial statement elements with Parquet and Polars | Deployed

note: Updated at 2023-07-12 by @bao.truong

# Variable
declare ENVIRONMENT_CODE=staging
declare BUCKET_NAME=inno-processor-staging
declare PUBSUB_TOPIC=staging-raptor-shipper
declare WORKFLOW_TASK_DECLARE_NAME=staging-raptor-task-declare
declare WORKFLOW_TASK_DECLARE_DESCRIPTION='[Raptor] Staging - Create worker to declare task to hub'
declare WORKFLOW_TASK_DECLARE_YAML_FILE=workflow/staging_task_dispatch_pipeline_raptor.yaml
declare EVENTARC_TASK_DECLARE_SUBSCRIPTION=staging-raptor-transfer
declare WORKFLOW_EXECUTOR_NAME=staging-raptor-executor
declare WORKFLOW_EXECUTOR_DESCRIPTION="[Raptor] Staging - Execute processor pipeline"
declare WORKFLOW_EXECUTOR_YAML_FILE=workflow/staging_executor_raptor.yaml
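
A minimal deployment sketch using the variables above (PROJECT_REGION and PROJECT_ID are assumed to be exported, as in the later commands; the real Cloud Build steps may differ):

# Deploy the task-declare workflow (sketch)
gcloud workflows deploy "$WORKFLOW_TASK_DECLARE_NAME" \
  --source="$WORKFLOW_TASK_DECLARE_YAML_FILE" \
  --description="$WORKFLOW_TASK_DECLARE_DESCRIPTION" \
  --location="$PROJECT_REGION"

# Deploy the executor workflow (sketch)
gcloud workflows deploy "$WORKFLOW_EXECUTOR_NAME" \
  --source="$WORKFLOW_EXECUTOR_YAML_FILE" \
  --description="$WORKFLOW_EXECUTOR_DESCRIPTION" \
  --location="$PROJECT_REGION"

# Route messages from the notification topic to the task-declare workflow
# (sketch; the trigger SA is assumed from the Service Account section)
gcloud eventarc triggers create "$EVENTARC_TASK_DECLARE_SUBSCRIPTION" \
  --location="$PROJECT_REGION" \
  --destination-workflow="$WORKFLOW_TASK_DECLARE_NAME" \
  --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
  --transport-topic="$PUBSUB_TOPIC" \
  --service-account="sa-warlock@$PROJECT_ID.iam.gserviceaccount.com"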

System Architecture Design

For the logical view:

flowchart LR

  %% Component
  subgraph gcp[Google Cloud Platform]
    direction TB
    subgraph server[Server]
      direction LR
      subgraph workflow_task
        direction LR
        workflow
        app[ASGI] --> notification
      end
    end
    rate_limiter
  end

  %% Flow
  rate_limiter --> workflow
  client -- request --> rate_limiter
  app -- response --> client

For the workflow process:

Step 1: Read PDF file

Step 2: Extract using OCR tools

Step 3: Extract and annotate the result, and push it as JSON

The Flow Concept:

flowchart LR
    GCS(Cloud Storage) -- change --> N(Storage Notification) -- push payload --> PubSub -- push --> Eventarc -- push --> CTN(Cloud Task Navigator) -- push --> CTQ(Cloud Task Queue) --> Workflow

flowchart LR
    GCS(Cloud Storage) -- change --> Notification -- push payload --> PubSub -- push --> Eventarc -- Trigger --> Workflow

# <Workflow>          <Task>                <Workflow>         <Cloud Run>
# ++++++++++++++      ++++++++++++++++++    ++++++++++++++     ++++++++++++++
# +  Executor  +  --> +  Task Declare  + -> +  Workflow  + --> +   Otter    +
# ++++++++++++++      ++++++++++++++++++    ++++++++++++++     ++++++++++++++

Component

flowchart TB

  basement
    subgraph one
    a1-->a2
    end

    subgraph two
    b1-->b2
    end

    subgraph three
    Link-->pdf-->partitionby-->bucket
    end

SAD - System architecture design

  1. Data exchange about news

  2. Filter: based on regex

  3. HOSE: e.g. "Số liệu quản lý sở hữu của nhà đầu tư nước ngoài 20/07/2023" (figures on foreign investors' ownership, published 20/07/2023 8:17:00 AM) → HOSE/EXCHANGE/FOREIGN_OWNERSHIP

  4. Upload into GCS bucket: with prefix /OPS ... <...>

  5. Eventarc trigger filters the change notifications on the bucket → PubSub (see the sketch after this list)

  6. Extract metadata, JSON output (DataFrame output)

  7. Storage:

  8. Submarine: API_ENDPOINT [1]

  9. Acknowledge to PubSub <200>
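
Step 5 relies on a bucket notification feeding the PubSub topic; a minimal sketch with the staging bucket and topic declared above (the OPS/ prefix filter is an assumption based on step 4):

# Publish OBJECT_FINALIZE events under the OPS/ prefix to the PubSub topic (sketch)
gsutil notification create \
  -t "$PUBSUB_TOPIC" \
  -f json \
  -e OBJECT_FINALIZE \
  -p OPS/ \
  gs://"$BUCKET_NAME"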

Based on event:

  • Triggered when a new component (object) exists

Scheduled:

The filter actions vary:

  • Based on regex over the news text, every 5 hours

The data path is handled as a whole, right away.

Case 1: Normal case

HSX publishes news in several file extensions

Steps: Sync File to Target

Download multiple sections

Steps: Trigger

Syntax: state.a.platform.key.timeframe
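
Judging by the objectId in the Data Concept section below, the path segments appear to map onto this syntax; a sketch (the name of the second segment, here called area, is an assumption):

# Split an example object path (from the Data Concept section) per the syntax above
OBJECT_ID="OPS/EXCHANGE/HNX/INDEX_FOREIGN_TRADING/20230215/20230215_ID_Foreigners_trading_by_index.pdf"
IFS='/' read -r STATE AREA PLATFORM KEY TIMEFRAME FILE <<< "$OBJECT_ID"
echo "state=$STATE area=$AREA platform=$PLATFORM key=$KEY timeframe=$TIMEFRAME file=$FILE"
# -> state=OPS area=EXCHANGE platform=HNX key=INDEX_FOREIGN_TRADING timeframe=20230215 file=20230215_ID_...pdf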

Physical component

Component of [Pipeline]

flowchart TB
    gcs(Cloud Storage) -- changes --> ps(PubSub) -- listen --> e(Eventarc) -- push --> wfct(Workflow Task Declare) -- make task --> tq(Task Queue) -- push --> wfe(Workflow Executor)

SAD

flowchart LR
  P(PDF) -- Push to GCS --> OPS(State of file: OPS) --> CR(Cloud Run)
  CR --> X[ Execute ]
  X -->|1. parse pdf: Error \n 2. push to Submarine: Error| F(Change state file: FAILED)
  X -->|1. parse pdf: Success \n 2. push to Submarine: Error| E(EXIT & RAISE)
  X -->|1. parse pdf: Success \n 2. push to Submarine: Success| FI(Change state file: FINISHED)

1. Deployment: Staging + Production

Staging | Submarine | Raptor

Production | Submarine | Raptor

  • [Submarine] + [Raptor]

  • [Docker] + [Docker]

gcloud tasks queues update "$TASK_QUEUE_NAME" \
  --location="$PROJECT_REGION" \
  --max-concurrent-dispatches=2 \
  --max-dispatches-per-second=0.03334 \
  --max-attempts=2 \
  --min-backoff='10s' \
  --max-backoff='120s' \
  --max-doublings=3 \
  --max-retry-duration='120s';

  • Update the Cloud Run deploy step (Cloud Build args):

args: [
  'run', 'deploy', '$_APPLICATION_ID',
  '--description', '$_APPLICATION_DESCRIPTION',
  '--platform', 'managed',
  '--no-allow-unauthenticated',
  '--cpu-throttling',
  '--concurrency', '1',
  '--region', '$LOCATION',
  '--image', '$LOCATION-docker.pkg.dev/$PROJECT_ID/$_ARTIFACT_REGISTRY_REPOSITORY_NAME/$_IMAGE_NAME:latest',
  '--port', '8000',
  '--service-account', '$_SERVICE_ACCOUNT_NAME@$PROJECT_ID.iam.gserviceaccount.com',
  '--cpu', '1',
  '--memory', '1Gi',
  '--ingress', 'all',
  '--min-instances', '1',
  '--max-instances', '2',
  '--set-secrets=PROJECT_ID=DATA_PROJECT_ID:latest,DEPLOYMENT_ENVIRONMENT_STATE=RAPTOR_PROD_DEPLOYMENT_ENVIRONMENT_STATE:latest,PROD_API_ENDPOINT=RAPTOR_PROD_API_ENDPOINT:latest',
  '--labels=team=data,pipeline=$_PIPELINE'
]

  • [ ] Test on staging

staging:... staging::.. + googlecloud

  • Test on production

With the old concept, the workflows were triggered at a rate that was hard to control, which affected the downstream Cloud Run service.

What changes:

  • The existence of Cloud Tasks as a rate limiter in front of the executor

But in the normal case

The Listener is now a

Note

It is a multi-stage process.

It requires the trigger to be up to date.

Resource usage can peak at times at the endpoint.

The Raptor pipeline extracts data from PDF files into the Basement database.

This workflow receives messages from PubSub via GCS notification and Eventarc, then executes the following steps:

  1. Cloud Run: the input is the message received from the workflow. It gets the objectId, parses the file into a dataframe, and pushes the data to Submarine's respective endpoint.

  2. Change state file: this step moves the file to a folder matching the state of the Cloud Run result. If the result is Success, the file is moved to the FINISHED state; if the result is Failed, the file is moved to the FAILED state (see the sketch below).
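
Conceptually, the state change is just a rename of the object from the OPS prefix to the result prefix. A minimal illustration with gsutil (the real workflow step may be implemented differently):

# Illustration only: move a processed object from the OPS state to FINISHED (or FAILED)
OBJECT_ID="OPS/EXCHANGE/HNX/INDEX_FOREIGN_TRADING/20230215/20230215_ID_Foreigners_trading_by_index.pdf"
gsutil mv "gs://$BUCKET_NAME/$OBJECT_ID" "gs://$BUCKET_NAME/${OBJECT_ID/OPS\//FINISHED/}"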

Steps: Filter

Developers

Deployment | Description | Product Requirement
DP-01 | Clone API system | a) Exclude Authentication b) The same parameters
DP-02 | Clone Models | a) Models in dbt
DP-03 | Merge and deploy Clone API to AWS server |
DP-04 | Test on 1 partner |
DP-05 | Test/Migrate all the rest of the partners |

Service Account

SA that deploys and executes the workflow

SA name: sa-warlock@$PROJECT_ID.iam.gserviceaccount.com

Permissions for SA:

Cloud Run:

  • roles/run.developer
  • roles/run.invoker

Secret Manager Secret Accessor:

  • roles/secretmanager.secretAccessor

Monitoring Metric Writer:

  • roles/monitoring.metricWriter

Workflows:

  • roles/workflows.invoker
  • roles/workflows.viewer

GCS (buckets: inno-processor, inno-processor-staging):

  • roles/storage.objectAdmin
  • roles/storage.legacyBucketReader
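
Granting the roles above can be sketched as follows (one project-level binding shown; repeat per role, and use bucket-level bindings for the bucket-scoped roles):

# Grant one of the listed roles to the workflow SA (sketch; repeat for each role)
gcloud projects add-iam-policy-binding "$PROJECT_ID" \
  --member="serviceAccount:sa-warlock@$PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/workflows.invoker"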

SA that deploys and executes Cloud Run

SA name: sa-snapfire@$PROJECT_ID.iam.gserviceaccount.com

Permissions for SA:

Cloud Run:

  • roles/run.developer

Cloud SQL:

  • roles/cloudsql.client
  • roles/cloudsql.instanceUser

Secret Manager Secret Accessor:

  • roles/secretmanager.secretAccessor

Monitoring Metric Writer:

  • roles/monitoring.metricWriter

Workflows:

  • roles/workflows.invoker
  • roles/workflows.viewer

GCS:

  • roles/storage.admin (Will revise it in the near future)

Create Queue
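
The queue referenced by the update command above has to exist first; a minimal creation sketch with the same limits:

# Create the Cloud Tasks queue used as the rate limiter (sketch)
gcloud tasks queues create "$TASK_QUEUE_NAME" \
  --location="$PROJECT_REGION" \
  --max-concurrent-dispatches=2 \
  --max-dispatches-per-second=0.03334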

Data Concept

Data concepts in the Raptor pipeline:

{
  "attributes": {
    "bucketId": "inno-processor-staging",
    "eventTime": "2023-03-10T16:42:45.586544Z",
    "eventType": "OBJECT_FINALIZE",
    "notificationConfig": "projects/_/buckets/inno-processor-staging/notificationConfigs/15",
    "objectGeneration": "1678466565552993",
    "objectId": "OPS/EXCHANGE/HNX/INDEX_FOREIGN_TRADING/20230215/20230215_ID_Foreigners_trading_by_index.pdf",
    "overwroteGeneration": "1678465978303900",
    "payloadFormat": "JSON_API_V1"
  },
  "data": "ewogICJraW5kIjogInN0b3JhZ2Ujb2JqZWN0IiwKICAiaWQiOiAiaW5uby1wcm9jZXNzb3Itc3RhZ2luZy9PUFMvRVhDSEFOR0UvSE5YL0lOREVYX0ZPUkVJR05fVFJBRElORy8yMDIzMDIxNS8yMDIzMDIxNV9JRF9Gb3JlaWduZXJzX3RyYWRpbmdfYnlfaW5kZXgucGRmLzE2Nzg0NjY1NjU1NTI5OTMiLAogICJzZWxmTGluayI6ICJodHRwczovL3d3dy5nb29nbGVhcGlzLmNvbS9zdG9yYWdlL3YxL2IvaW5uby1wcm9jZXNzb3Itc3RhZ2luZy9vL09QUyUyRkVYQ0hBTkdFJTJGSE5YJTJGSU5ERVhfRk9SRUlHTl9UUkFESU5HJTJGMjAyMzAyMTUlMkYyMDIzMDIxNV9JRF9Gb3JlaWduZXJzX3RyYWRpbmdfYnlfaW5kZXgucGRmIiwKICAibmFtZSI6ICJPUFMvRVhDSEFOR0UvSE5YL0lOREVYX0ZPUkVJR05fVFJBRElORy8yMDIzMDIxNS8yMDIzMDIxNV9JRF9Gb3JlaWduZXJzX3RyYWRpbmdfYnlfaW5kZXgucGRmIiwKICAiYnVja2V0IjogImlubm8tcHJvY2Vzc29yLXN0YWdpbmciLAogICJnZW5lcmF0aW9uIjogIjE2Nzg0NjY1NjU1NTI5OTMiLAogICJtZXRhZ2VuZXJhdGlvbiI6ICIxIiwKICAiY29udGVudFR5cGUiOiAiYXBwbGljYXRpb24vcGRmIiwKICAidGltZUNyZWF0ZWQiOiAiMjAyMy0wMy0xMFQxNjo0Mjo0NS41ODZaIiwKICAidXBkYXRlZCI6ICIyMDIzLTAzLTEwVDE2OjQyOjQ1LjU4NloiLAogICJzdG9yYWdlQ2xhc3MiOiAiU1RBTkRBUkQiLAogICJ0aW1lU3RvcmFnZUNsYXNzVXBkYXRlZCI6ICIyMDIzLTAzLTEwVDE2OjQyOjQ1LjU4NloiLAogICJzaXplIjogIjE4MzE4IiwKICAibWQ1SGFzaCI6ICJheDFuOVZNcWtjVkQybTRCUnI2MTZBPT0iLAogICJtZWRpYUxpbmsiOiAiaHR0cHM6Ly9zdG9yYWdlLmdvb2dsZWFwaXMuY29tL2Rvd25sb2FkL3N0b3JhZ2UvdjEvYi9pbm5vLXByb2Nlc3Nvci1zdGFnaW5nL28vT1BTJTJGRVhDSEFOR0UlMkZITlglMkZJTkRFWF9GT1JFSUdOX1RSQURJTkclMkYyMDIzMDIxNSUyRjIwMjMwMjE1X0lEX0ZvcmVpZ25lcnNfdHJhZGluZ19ieV9pbmRleC5wZGY/Z2VuZXJhdGlvbj0xNjc4NDY2NTY1NTUyOTkzJmFsdD1tZWRpYSIsCiAgImNvbnRlbnRMYW5ndWFnZSI6ICJlbiIsCiAgImNyYzMyYyI6ICJBb2ErMVE9PSIsCiAgImV0YWciOiAiQ09IMm9Kdm4wZjBDRUFFPSIKfQo=",
  "messageId": "7152703534336556",
  "publishTime": "2023-03-10T16:42:45.83Z"
}
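
The block above is the raw PubSub message from the GCS notification; its data field is the base64-encoded JSON_API_V1 object resource, and can be inspected like this (a sketch, assuming the message is saved as message.json):

# Decode the base64 "data" field of the PubSub message to see the GCS object resource
jq -r '.data' message.json | base64 -d | jq .

The second block, below, is the execution metadata the processor annotates after a run: engine run time, the Submarine endpoint, the output hash, and the per-step results.
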
{
  "engine": {
    "run_time": {
      "end": "2023-03-10T23:42:51.625886+07:00",
      "start": "2023-03-10T23:42:47.694272+07:00"
    },
    "url": "https://staging-processor-listen-tunnel-2yrayayjeq-as.a.run.app"
  },
  "metadata": {
    "endpoint": {
      "method": "PUT",
      "path": "/hnx/index/foreign_trading/file",
      "url": "https://staging-submarine-2yrayayjeq-as.a.run.app"
    },
    "error": null,
    "output": {
      "algo": "sha_s256",
      "data": "e81e132669fb160cff4b111fe027980fef5af730e85830e786f79d68d0f0969c",
      "type": "dataframe"
    },
    "source": {
      "bucket": "inno-processor-staging",
      "object_id": "OPS/EXCHANGE/HNX/INDEX_FOREIGN_TRADING/20230215/20230215_ID_Foreigners_trading_by_index.pdf"
    },
    "status": "SUCCESS",
    "step": {
      "ANNOTATE_RESULT": {
        "is_success": true,
        "name": "annotate-metadata-with-result",
        "position": 2,
        "required": false
      },
      "DOWNLOAD_SOURCE_INPUT": {
        "is_success": true,
        "name": "download-input-from-gcs",
        "position": 0,
        "required": true
      },
      "EXTRACT_DATA_ELEMENT": {
        "is_success": true,
        "name": "extract-data-element-using-processor",
        "position": 1,
        "required": true
      },
      "OUTPUT_LOCK": {
        "is_success": true,
        "name": "lock-output",
        "position": 3,
        "required": false
      },
      "PUSH_ENDPOINT": {
        "is_success": true,
        "name": "push-result-to-endpoint",
        "position": 4,
        "required": true
      }
    }
  }
}

For the Cloud Run service

Max concurrency: 1

Limit:

Use at most 2 instances (min 1), each with 1 CPU and 1Gi RAM
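
Applied to the service, those limits correspond to flags like the following (a sketch with placeholder names; the authoritative values live in the Cloud Build args above):

# Apply the Cloud Run limits described above (SERVICE_NAME and REGION are placeholders)
gcloud run services update SERVICE_NAME \
  --region=REGION \
  --concurrency=1 \
  --cpu=1 \
  --memory=1Gi \
  --min-instances=1 \
  --max-instances=2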

Queue Detail

Tech Stack

  • Serverless service handling requests from various activators: Cloud Run

  • CI/CD for deploying architecture components: Cloud Build

  • Orchestration: Cloud Workflows, PubSub, GCS Notification, Trigger

  • Storage Engine: RDBMS (MySQL, Postgres), Columnar (BigQuery), Cloud Storage (GCS)

  • Rate Limit: Cloud Tasks

Pricing

Troubleshooting

Memory

Term

  • max_concurrent_requests, which is equivalent to max_concurrent_dispatches
  • rate, which is equivalent to max_dispatches_per_second
  • bucket_size, which is equivalent to max_burst_size

Clean Resources
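
Tearing down the pipeline removes the same resources the variables above describe; a minimal sketch (destructive, run with care):

# Remove pipeline resources (sketch; destructive)
gcloud workflows delete "$WORKFLOW_EXECUTOR_NAME" --location="$PROJECT_REGION" --quiet
gcloud workflows delete "$WORKFLOW_TASK_DECLARE_NAME" --location="$PROJECT_REGION" --quiet
gcloud eventarc triggers delete "$EVENTARC_TASK_DECLARE_SUBSCRIPTION" --location="$PROJECT_REGION" --quiet
gcloud tasks queues delete "$TASK_QUEUE_NAME" --location="$PROJECT_REGION" --quiet
# Remove all notification configs on the bucket
gsutil notification delete gs://"$BUCKET_NAME"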

Source Reference

[2] https://cloud.google.com/run/docs/about-concurrency

https://cloud.google.com/tasks/docs/configuring-queues

# Component of [Pipeline]
#
# <Cloud Storage>                     <Workflow>             <Task>              <Workflow>
# ++++++++++++++     <Pubsub>     ++++++++++++++++++       +++++++++++         ++++++++++++++
# +  Storage   +  -- Eventarc --> +  Task Declare  +  -->  +  Queue  +  ---->  +  Executor  +
# ++++++++++++++                  ++++++++++++++++++       +++++++++++         ++++++++++++++


# Variable
declare PIPELINE=baryonyx
declare PIPELINE_FOLDER=baryonyx

# Repository Name
declare REPOSITORY_NAME=Innotech-Vietnam/inno-processor
declare REPOSITORY_OWNER=Inno-Enchantress

# Bucket Default Input
declare BUCKET_PREFIX=FINANCIAL_STATEMENT/OPS/

# Default SA
declare WORKFLOW_TASK_DECLARE_SERVICE_ACCOUNT_NAME=sa-chen
declare WORKFLOW_EXECUTOR_SERVICE_ACCOUNT_NAME=sa-warlock

# Validate
check_non_empty_variable PIPELINE
check_non_empty_variable BUCKET_PREFIX
check_non_empty_variable WORKFLOW_TASK_DECLARE_SERVICE_ACCOUNT_NAME
check_non_empty_variable WORKFLOW_EXECUTOR_SERVICE_ACCOUNT_NAME
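
check_non_empty_variable is presumably defined elsewhere in the deployment scripts; a minimal sketch of what it needs to do:

# Minimal sketch of the validation helper (assumed; the real helper may differ)
check_non_empty_variable() {
  local var_name="$1"
  if [[ -z "${!var_name}" ]]; then
    echo "ERROR: variable '$var_name' is empty or unset" >&2
    return 1
  fi
}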