
Mitchell Bregman

08/17/2020, 7:23 PM
Hi all, I am looking to create a dynamic task DAG, which essentially calls a database for all task definitions and dynamically generates tasks. Here is a code snippet:
```python
from prefect import Flow

# ParkMobileFlow, ParkMobile, and DataQualityTask are defined elsewhere
# in the poster's codebase (not shown in the original snippet).


class DS242(ParkMobileFlow):

    @property
    def tasks(self):
        """Loads all tasks for data quality tests."""

        # Connect to ParkMobile client
        pm = ParkMobile()

        # Get Snowflake connection
        sf = pm.get("snowflake")
        sf.connect("sqlalchemy")

        # Call all tasks and task definitions
        records = sf.execute("""
            SELECT *
            FROM DEV_DB.DATA_QUALITY_TESTS.QUALITY_TASKS
            WHERE is_active = true;
        """)

        return records

    @property
    def flow(self):
        """Returns the Prefect Flow object."""
        return self.build()

    def build(self):
        """Builds the workflow.

        The query in `tasks` runs here, when the flow is built, so the
        set of tasks is fixed at that point.
        """
        dq_tasks = []
        for record in self.tasks:
            dq_tasks.append(
                DataQualityTask(
                    uuid=record.task_uuid,
                    name=record.task_name,
                    description=record.task_description,
                    source_query=record.source_query,
                    target_query=record.target_query,
                )
            )

        with Flow("DS242__data-quality-tests") as flow:
            # Calling each task instance inside the Flow context adds it to the flow.
            for dq_task in dq_tasks:
                dq_task()

        return flow
```
Now, I am trying to `register` this flow to the backend server and am getting a serialization error, which is understandable. My question is: can I create a first `Task` that queries the DB for all task definitions and then dynamically adds new tasks to the `Flow`, so that it performs all of them? I can certainly build a custom Dockerfile that handles deployment of this, but it would be awesome to connect to `prefect server`.
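A minimal sketch of that proposed first step, assuming the definitions live in a table shaped like `QUALITY_TASKS` from the snippet above. To keep it runnable anywhere, an in-memory `sqlite3` database stands in for Snowflake, and `TaskDefinition`, `load_task_definitions`, and `make_demo_connection` are hypothetical names. The idea is that this query becomes the body of a task, so it runs when the flow runs rather than inside `build()`, and it hands back plain Python objects instead of live cursor rows.

```python
import sqlite3
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class TaskDefinition:
    """Hypothetical plain-data copy of one QUALITY_TASKS row."""

    task_uuid: str
    task_name: str
    task_description: str
    source_query: str
    target_query: str


def load_task_definitions(conn: sqlite3.Connection) -> List[TaskDefinition]:
    """Body of a hypothetical first task: fetch every active definition."""
    rows = conn.execute(
        """
        SELECT task_uuid, task_name, task_description, source_query, target_query
        FROM quality_tasks
        WHERE is_active = 1
        ORDER BY task_name
        """
    ).fetchall()
    # Copy each row into a dataclass so nothing downstream holds a live cursor.
    return [TaskDefinition(*row) for row in rows]


def make_demo_connection() -> sqlite3.Connection:
    """In-memory stand-in for DEV_DB.DATA_QUALITY_TESTS.QUALITY_TASKS."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE quality_tasks (
            task_uuid TEXT, task_name TEXT, task_description TEXT,
            source_query TEXT, target_query TEXT, is_active INTEGER
        )
        """
    )
    conn.executemany(
        "INSERT INTO quality_tasks VALUES (?, ?, ?, ?, ?, ?)",
        [
            ("u1", "row_count", "Row counts match", "SELECT 1", "SELECT 1", 1),
            ("u2", "retired_check", "No longer used", "SELECT 1", "SELECT 2", 0),
        ],
    )
    return conn


if __name__ == "__main__":
    for definition in load_task_definitions(make_demo_connection()):
        print(definition.task_name)  # prints "row_count" only
```

Only the row with `is_active = 1` comes back, so inactive definitions never reach the downstream step.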

Kyle Moon-Wright

08/17/2020, 7:39 PM
Hey @Mitchell Bregman, yes, this is definitely a viable way to implement your flow, though I'd suggest mapping over each of the records you pull from Snowflake.
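To make the mapping suggestion concrete: in Prefect's 1.x-era API, calling `.map()` on a task creates one child run per element of an upstream result at run time, so the flow's shape stays fixed (one loading task, one mapped task) while the number of checks follows the data. The sketch below imitates only that fan-out shape in plain Python, with the built-in `map` standing in for Prefect's `.map()`; it is not Prefect code, and it runs the checks in order. `DEFINITIONS` stands in for the list a loading task would return, and `run_quality_check` with its pass rule (source and target queries return the same rows) is an assumption about what `DataQualityTask` does, not something the thread states.

```python
import sqlite3
from functools import partial
from typing import Dict, List

# Hypothetical output of the loading step: one plain dict per definition.
DEFINITIONS: List[Dict[str, str]] = [
    {"task_name": "min_id_matches",
     "source_query": "SELECT MIN(id) FROM staging_orders",
     "target_query": "SELECT MIN(id) FROM prod_orders"},
    {"task_name": "row_count_matches",
     "source_query": "SELECT COUNT(*) FROM staging_orders",
     "target_query": "SELECT COUNT(*) FROM prod_orders"},
]


def run_quality_check(definition: Dict[str, str], conn: sqlite3.Connection) -> Dict[str, object]:
    """Applied once per definition. Assumed rule: both queries return the same rows."""
    source_rows = conn.execute(definition["source_query"]).fetchall()
    target_rows = conn.execute(definition["target_query"]).fetchall()
    return {"task_name": definition["task_name"], "passed": source_rows == target_rows}


def map_checks(definitions: List[Dict[str, str]], conn: sqlite3.Connection) -> List[Dict[str, object]]:
    # Fan-out: one check per element, however many definitions came back.
    return list(map(partial(run_quality_check, conn=conn), definitions))


def make_demo_connection() -> sqlite3.Connection:
    """In-memory stand-in for the tables the checks would query."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE staging_orders (id INTEGER);
        CREATE TABLE prod_orders (id INTEGER);
        INSERT INTO staging_orders VALUES (1), (2), (3);
        INSERT INTO prod_orders VALUES (1), (2);
        """
    )
    return conn


if __name__ == "__main__":
    for result in map_checks(DEFINITIONS, make_demo_connection()):
        print(result)
    # {'task_name': 'min_id_matches', 'passed': True}
    # {'task_name': 'row_count_matches', 'passed': False}
```

Adding a row to the definitions table would add a check without touching the flow definition, which is the property the build-time loop in `build()` lacks.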

Mitchell Bregman

08/17/2020, 7:40 PM
Ahh, so TaskA pulls from Snowflake, and TaskB is a map over all the task definitions?
Makes total sense… thank you for the pointer @Kyle Moon-Wright