An Vu Trong
10/07/2024, 2:08 PM
import asyncio
from prefect import flow
from prefect.logging import get_logger
from src.db.mongodb_connection import (
    mongodb_fintech_conn,
    mongodb_staging_conn,
)
from src.utils.http_client import WifeedAPIClient, JsonFetcher
from src.workflows.wifeed_data_tasks import DataTask

class DataPipeline:
    def __init__(self):
        self.logger = get_logger()
        self.conn = mongodb_staging_conn
        self.client = WifeedAPIClient(json_file="json/wifeed_urls.json")
        self.fetcher = JsonFetcher(self.client)
        self.data_task = DataTask(self.client, self.fetcher)

    @flow
    async def run_staging(self, loaidn: str = "", san: str = ""):
        await self.data_task.fetch_raw_stock_list(self.conn, loaidn, san)

if __name__ == "__main__":
    pipeline = DataPipeline()
    pipeline.run_staging.deploy(
        name="my-deployment",
        work_pool_name="docker-wp",
        image="my-docker-image:dev",
        push=False
    )
When I run:
prefect deployment run 'run-staging/my-deployment'
it says:
Error creating flow run: Validation failed. Failure reason: 'self' is a required property
The worker is a Docker type. How should I fix this? Thank you.
Nate
10/07/2024, 2:38 PM
run_staging expects self to be passed, so when the run is created, the parameters do not contain a value for self.
We may want to support this at some point, but I would recommend changing your code to make run_staging a normal function (not a method on a class) and then either:
• pass DataPipeline into run_staging
• or instantiate DataPipeline within run_staging
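A rough sketch of the second option, in case it helps. The DataPipeline here is only a hypothetical stub standing in for the class from the question (the real one would keep its MongoDB connection and WifeedAPIClient setup), and the try/except around the import is just an assumption so the snippet still runs where Prefect is not installed. The point is that the flow is a module-level function whose only parameters are loaidn and san, so no value for self is needed when the run is created.

```python
import asyncio
import inspect

try:
    from prefect import flow
except ImportError:
    # Fallback (assumption): a no-op decorator so the sketch runs without Prefect
    def flow(fn):
        return fn


class DataPipeline:
    """Hypothetical stub standing in for the DataPipeline class from the question."""

    def __init__(self):
        self.calls = []

    async def run_staging(self, loaidn: str = "", san: str = ""):
        # The real method would await self.data_task.fetch_raw_stock_list(...)
        self.calls.append((loaidn, san))
        return self.calls


@flow
async def run_staging(loaidn: str = "", san: str = ""):
    # Instantiate inside the flow, so the flow parameters are only loaidn and san
    pipeline = DataPipeline()
    return await pipeline.run_staging(loaidn, san)


# A Prefect flow keeps the wrapped function on .fn; with the fallback it is the function itself
plain_fn = getattr(run_staging, "fn", run_staging)
param_names = list(inspect.signature(plain_fn).parameters)
print(param_names)

# Call the undecorated function directly to check the wiring without a Prefect server
print(asyncio.run(plain_fn("", "")))
```

With this shape, the deploy call from the question would presumably be made on the function itself (run_staging.deploy(...)) rather than on pipeline.run_staging, with the same name, work_pool_name, image, and push arguments.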
An Vu Trong
10/08/2024, 2:35 PM
An Vu Trong
10/08/2024, 2:36 PM
Nate
10/08/2024, 2:44 PM
An Vu Trong
10/08/2024, 3:52 PM