Mitchell Bregman
08/17/2020, 7:23 PM
class DS242(ParkMobileFlow):

    @property
    def tasks(self):
        """Loads all tasks for data quality tests."""
        # Connect to ParkMobile client
        pm = ParkMobile()

        # Get Snowflake connection
        sf = pm.get("snowflake")
        sf.connect("sqlalchemy")

        # Fetch all active task definitions
        records = sf.execute("""
            SELECT *
            FROM DEV_DB.DATA_QUALITY_TESTS.QUALITY_TASKS
            WHERE is_active = true;
        """)
        return records

    @property
    def flow(self):
        """Returns the Prefect Flow object."""
        return self.build()

    def build(self):
        """Builds the workflow."""
        tasks = []
        for task in self.tasks:
            tasks.append(
                DataQualityTask(
                    uuid=task.task_uuid,
                    name=task.task_name,
                    description=task.task_description,
                    source_query=task.source_query,
                    target_query=task.target_query,
                )
            )

        with Flow("DS242__data-quality-tests") as flow:
            for task in tasks:
                task()

        return flow
Now, I am trying to register this flow with the backend server and am getting a serialization error, which is understandable. My question is: can I create a first Task that queries the DB for all task definitions, and then dynamically adds new tasks to the Flow to perform all of that work? I can certainly build a custom Dockerfile that handles deployment of this, but it would be awesome to connect to Prefect Server.
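[For reference, a minimal sketch of the dynamic pattern being described, assuming Prefect Core's task mapping API; get_task_definitions and run_quality_check are hypothetical stand-ins for the Snowflake query and the per-definition check, not code from the flow above:]

from prefect import task, Flow

@task
def get_task_definitions():
    # Hypothetical stand-in: in practice this would run the QUALITY_TASKS
    # query against Snowflake at runtime rather than at flow build time.
    return [
        {"task_name": "row_count_check", "source_query": "...", "target_query": "..."},
    ]

@task
def run_quality_check(definition):
    # Hypothetical stand-in: compare the source and target query results here.
    print(f"Running {definition['task_name']}")

with Flow("DS242__data-quality-tests") as flow:
    definitions = get_task_definitions()
    run_quality_check.map(definitions)

[With this shape, the registered DAG is just one fetch task plus one mapped task, so it stays small and serializable; the number of checks is only determined at runtime when the mapped task fans out.]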
Kyle Moon-Wright
08/17/2020, 7:39 PM

Mitchell Bregman
08/17/2020, 7:40 PM