
    Mitchell Bregman

    2 years ago
    Hi all, I am looking to create a dynamic task DAG, which essentially calls a database for all task definitions and dynamically generates tasks. Here is a code snippet:
    from prefect import Flow

    # ParkMobileFlow, ParkMobile, and DataQualityTask are project-internal classes
    class DS242(ParkMobileFlow):
    
        @property
        def tasks(self):
            """Loads all tasks for data quality tests."""
    
            # Connect to ParkMobile client
            pm = ParkMobile()
    
            # Get Snowflake connection
            sf = pm.get("snowflake")
            sf.connect("sqlalchemy")
    
            # Call all tasks and task definitions
            records = sf.execute("""
                SELECT *
                FROM DEV_DB.DATA_QUALITY_TESTS.QUALITY_TASKS
                WHERE is_active = true;
            """)
    
            return records
    
        @property
        def flow(self):
            """Returns the Prefect Flow object."""
            return self.build()
    
        def build(self):
            """Builds the workflow."""
            tasks = []
            for task in self.tasks:
                tasks.append(
                    DataQualityTask(
                        uuid=task.task_uuid,
                        name=task.task_name,
                        description=task.task_description,
                        source_query=task.source_query,
                        target_query=task.target_query,
                    )
                )
    
            with Flow("DS242__data-quality-tests") as flow:
                for task in tasks:
                    task()
    
            return flow
    Now, I am trying to register this flow to the backend server, and am getting a serialization error, which is understandable. My question is: can I create a first Task that queries the DB for all task definitions, and then dynamically adds new tasks to the Flow to perform all of them? I can certainly build a custom Dockerfile that handles deployment of this, but it would be awesome to connect to prefect server.

    Kyle Moon-Wright

    2 years ago
    Hey @Mitchell Bregman, yes, this is definitely a viable way to implement your flow, though I might suggest using mapping over each of the records you pull from Snowflake.
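    For example, a rough sketch along these lines (Prefect 1.x; the ParkMobile/Snowflake wiring is lifted from your snippet, and run_quality_test is just a placeholder for whatever each check actually does):

    from prefect import Flow, task

    @task
    def get_task_definitions():
        """Pull all active data quality task definitions from Snowflake at runtime."""
        pm = ParkMobile()
        sf = pm.get("snowflake")
        sf.connect("sqlalchemy")
        return sf.execute("""
            SELECT *
            FROM DEV_DB.DATA_QUALITY_TESTS.QUALITY_TASKS
            WHERE is_active = true;
        """)

    @task
    def run_quality_test(definition):
        """Run one data quality check from a single record (placeholder body)."""
        # execute definition.source_query and definition.target_query and compare the results here
        ...

    with Flow("DS242__data-quality-tests") as flow:
        definitions = get_task_definitions()
        run_quality_test.map(definitions)

    That keeps the DAG itself static (two tasks), so the flow serializes and registers cleanly, while the fan-out over records happens at runtime inside the mapped task.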

    Mitchell Bregman

    2 years ago
    ahh, so TaskA would pull from Snowflake, and TaskB would map over all the task definitions?
    Makes total sense… thank you for the pointer @Kyle Moon-Wright