Exchange Listener¶
Overview¶
Exchange Listener is the pipeline layer that listens for structured news and reports from the exchange.
SAD - System architecture design¶
Explanation¶
- [Provider]:
    - HNX: Reference URL
- [Scheduler]: Triggers twice a day, at 07:00 and 19:00 (a cron sketch follows this list).
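The scheduler backend is not named in this section; as a hedged sketch, if the schedule is expressed as a standard cron string, `0 7,19 * * *` covers both daily triggers:

```python
from datetime import datetime

from croniter import croniter  # third-party scheduling helper, assumed available

# "0 7,19 * * *" fires at 07:00 and 19:00 every day; the concrete scheduler
# (e.g. Cloud Scheduler) and this expression are assumptions, not part of this doc.
SCHEDULE = "0 7,19 * * *"

it = croniter(SCHEDULE, datetime(2023, 8, 8, 6, 0))
print(it.get_next(datetime))  # 2023-08-08 07:00:00
print(it.get_next(datetime))  # 2023-08-08 19:00:00
```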
Logic Design¶
```mermaid
flowchart LR
    A[Scheduler] --> B
    B[Provider] --> C{Is new data?}
    C -->|Yes| D[Download]
    D --> E[Upload to GCS]
    C ---->|No| F[End]
```
- Fetch new data every day (one possible new-data check is sketched after this list).
- Download the data files and upload them to GCS.
- Use Pub/Sub notifications about changes in GCS to trigger Workflows.
- Schedule a timer to run the CRON jobs, with full logging.
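The flowchart's "Is new data?" decision can be implemented in several ways; one minimal sketch, assuming the target GCS object path is deterministic per trading date (bucket and object names here are hypothetical):

```python
from google.cloud import storage


def is_new_data(bucket_name: str, object_path: str) -> bool:
    """Return True when the object is not yet in GCS, i.e. it still has to be fetched."""
    blob = storage.Client().bucket(bucket_name).blob(object_path)
    return not blob.exists()


# Bucket and object names below are hypothetical, for illustration only.
if is_new_data("ops-exchange-listener", "OPS/HNX/INDEX/SNAPSHOT/20230808.pdf"):
    print("New data: download and upload required")
```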
Index statistics
- Create URL link:
https://owa.hnx.vn/ftp///THONGKEGIAODICH//20230808/INDEX/20230808_ID_Thong_ke_thong_tin_chi_so.pdf
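The link appears to be derivable from the trading date alone; a minimal sketch that reproduces the sample URL above (the path template is inferred from that one example and may need adjusting for other report types):

```python
from datetime import date


def hnx_index_report_url(trading_date: date) -> str:
    """Build the HNX daily index-statistics PDF URL for a given trading date."""
    d = trading_date.strftime("%Y%m%d")
    return (
        "https://owa.hnx.vn/ftp///THONGKEGIAODICH"
        f"//{d}/INDEX/{d}_ID_Thong_ke_thong_tin_chi_so.pdf"
    )


print(hnx_index_report_url(date(2023, 8, 8)))
# https://owa.hnx.vn/ftp///THONGKEGIAODICH//20230808/INDEX/20230808_ID_Thong_ke_thong_tin_chi_so.pdf
```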
- `--date` (see the CLI sketch after the `src.core` snippet below):
    - Default: `ARG_DATE ?= last-trading-date`
    - Set an explicit date to backfill a previous trading date.
From `src.core`:

```python
from datetime import date

from src.core import (
    last_trading_date_vietnam,
    next_trading_date_vietnam,
)

LAST_TRADING_DATE: date = last_trading_date_vietnam()
```
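How `ARG_DATE` reaches the script is not shown here (it looks like a Makefile variable); a minimal CLI sketch, assuming a `--date` flag that defaults to the last trading date and accepts an explicit value for backfills:

```python
import argparse
from datetime import datetime

from src.core import last_trading_date_vietnam

parser = argparse.ArgumentParser(description="Fetch HNX index statistics for a trading date.")
parser.add_argument(
    "--date",
    type=lambda s: datetime.strptime(s, "%Y-%m-%d").date(),  # input format is an assumption
    default=last_trading_date_vietnam(),
    help="Trading date to fetch; defaults to the last trading date. "
         "Pass an earlier date to backfill.",
)
args = parser.parse_args()
print(args.date)
```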
- Download into a temporary folder using a `with` block:

```python
import shutil
import tempfile
import urllib.request

# Stream the HTTP response into a named temporary file on disk.
with urllib.request.urlopen('http://python.org/') as response:
    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
        shutil.copyfileobj(response, tmp_file)
...
```
- Process with Python at the end of the script.
- Clean up ... (Python; a cleanup sketch follows below).
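Because the download above uses `NamedTemporaryFile(delete=False)`, the file stays on disk until it is removed explicitly; a minimal cleanup sketch (the helper name is ours):

```python
import os


def cleanup(tmp_path: str) -> None:
    """Remove the temporary download once it has been validated and uploaded."""
    if os.path.exists(tmp_path):
        os.unlink(tmp_path)


# e.g. cleanup(tmp_file.name) after the GCS upload, since delete=False
# keeps the file on disk.
```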
- Upload to the GCS bucket path: /OPS/HNX/INDEX/SNAPSHOT (an upload sketch follows below).
- Trigger the workflow -> Slack notification.
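A minimal upload sketch with the `google-cloud-storage` client; the bucket name is not stated in this document, so the one in the example call is a placeholder, and `OPS/HNX/INDEX/SNAPSHOT` is treated as the object prefix:

```python
from google.cloud import storage


def upload_snapshot(bucket_name: str, local_path: str, filename: str) -> str:
    """Upload a downloaded file under the OPS/HNX/INDEX/SNAPSHOT prefix and return its GCS URI."""
    client = storage.Client()
    blob = client.bucket(bucket_name).blob(f"OPS/HNX/INDEX/SNAPSHOT/{filename}")
    blob.upload_from_filename(local_path)
    return f"gs://{bucket_name}/{blob.name}"


# Hypothetical call; the bucket name is an assumption.
# upload_snapshot("ops-exchange-listener", tmp_file.name,
#                 "20230808_ID_Thong_ke_thong_tin_chi_so.pdf")
```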
- Programming:
    - HSX: UUID-based. News records in the database carry a UUID -> extract the UUID (a loose extraction sketch follows below).
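Where exactly the UUID lives in the HSX news records is not specified here, so this is only a loose sketch that pulls a UUID out of an arbitrary string field with a regex:

```python
import re
import uuid
from typing import Optional

# Standard UUID pattern (8-4-4-4-12 hex digits).
UUID_RE = re.compile(
    r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"
)


def extract_uuid(field: str) -> Optional[uuid.UUID]:
    """Return the first UUID found in a news-record field, if any."""
    match = UUID_RE.search(field)
    return uuid.UUID(match.group(0)) if match else None


print(extract_uuid("news/4f2c1c1e-9a7b-4e57-8f7d-1c2b3a4d5e6f/detail"))
```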
The processor:

```mermaid
flowchart LR
    A["Identifier (date, UUID)"] -- construct --> B[Downloadable URL]
    B -- extract --> C[Download into Temporary]
    C --> D["Validate: Header, Extension (filter only PDF)"]
    D --> E[Upload to target GCS]
```
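Putting the processor stages together in one place; the URL template, bucket, and object prefix below are illustrative assumptions, and the PDF validation checks both the extension and the `%PDF` magic bytes:

```python
import shutil
import tempfile
import urllib.request
from datetime import date
from typing import Optional

from google.cloud import storage


def process_item(trading_date: date, item_uuid: str, bucket_name: str) -> Optional[str]:
    """Construct URL -> download to temp file -> validate PDF -> upload to GCS."""
    # 1. Construct a downloadable URL from the identifier (template is hypothetical).
    url = f"https://example-exchange.invalid/news/{trading_date:%Y%m%d}/{item_uuid}.pdf"

    # 2. Download into a temporary file.
    with urllib.request.urlopen(url) as response, \
            tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp_file:
        shutil.copyfileobj(response, tmp_file)

    # 3. Validate: keep only PDFs (extension plus "%PDF" magic bytes in the header).
    with open(tmp_file.name, "rb") as fh:
        if not (tmp_file.name.endswith(".pdf") and fh.read(4) == b"%PDF"):
            return None

    # 4. Upload to the target GCS bucket (object prefix is an assumption).
    blob = storage.Client().bucket(bucket_name).blob(
        f"OPS/HSX/NEWS/{trading_date:%Y%m%d}/{item_uuid}.pdf"
    )
    blob.upload_from_filename(tmp_file.name)
    return f"gs://{bucket_name}/{blob.name}"
```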
Because this is an I/O-bound operation, we need to push it ...
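The section does not say where the I/O work is pushed; one common option for I/O-bound download/upload jobs is a thread pool, reusing the `process_item` sketch above (worker count and executor choice are assumptions):

```python
from concurrent.futures import ThreadPoolExecutor
from datetime import date

# Hypothetical work items: (trading_date, item_uuid) pairs pulled from the database.
items = [
    (date(2023, 8, 8), "4f2c1c1e-9a7b-4e57-8f7d-1c2b3a4d5e6f"),
]

# Downloads and uploads are I/O-bound, so they can overlap in a thread pool.
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(
        pool.map(lambda it: process_item(it[0], it[1], "ops-exchange-listener"), items)
    )
```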
Model¶
Tracks whether the data has already been processed.

Table of the model (a loose sketch follows below):
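The model is only hinted at in this section; as a loose sketch of what the table might track (all field names are assumptions), one record per fetched document with a processed flag:

```python
from dataclasses import dataclass
from datetime import date, datetime
from typing import Optional


@dataclass
class ExchangeDocument:
    """One row per fetched document, used to decide whether it was already processed."""

    trading_date: date                      # trading date the document belongs to
    source: str                             # e.g. "HNX" or "HSX"
    identifier: str                         # UUID (HSX) or derived file name (HNX)
    gcs_path: str                           # where the file was uploaded
    processed: bool = False                 # True once downstream workflows handled it
    processed_at: Optional[datetime] = None
```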