Ok I can see that Flow class is still available in...
# ask-community
a
Ok, I can see that the Flow class is still available in Prefect 3. Is it still supported / not deprecated? I don’t see anything in the docs.
n
hi @Andrii Ieroshenko - the flow class hasn’t changed much between 2 and 3. like prefect 2, we recommend you use it as a decorator on functions. do you have a specific question about it?
a
Hi. Thank you for the answer. I have a function that I’d like to run with dynamic parameters in a dynamically defined order. Using the task and flow classes inline would allow me to create tasks and a flow without running them and define the order of execution via wait_for. I have a hard time doing that when using flow or task as a decorator. Is it possible to create a task without running it when using the @task decorator?
n
so just to go over some terminology we typically use:
• a flow / task is the callable, decorated function
from prefect import flow
from prefect.flows import Flow

@flow
def foo():
  pass

assert isinstance(foo, Flow) # foo is the flow object
• a flow / task run is what you get when you call a flow or task (i.e. flow/task + parameters)
from prefect import flow
from prefect.context import get_run_context
from prefect.client.schemas.objects import FlowRun

@flow(log_prints=True)
def foo():
  print(run := get_run_context().flow_run)
  assert isinstance(run, FlowRun)

foo() # i've implicitly created a flow run by calling this flow object
> Is it possible to create a task without running it when using @task decorator?
can you give an example of what you want to do with pseudocode?
a
Thank you for looking into this @Nate. I understand the typical approach of defining tasks as functions, and runs as calls to those functions with parameters. My problem is that I need to call a function N times in a certain order, with parameters defined dynamically by users. Below is code that illustrates what I’m trying to do and the problem:
from prefect import flow, task

# Sample data structure for tasks with dependencies
tasks_data = [
    {"id": "task0", "dependsOn": ["task3"]},
    {"id": "task1", "dependsOn": []},
    {"id": "task2", "dependsOn": ["task1"]},
    {"id": "task3", "dependsOn": ["task1", "task2"]},
]


@task
def perform_task(task_id):
    print(f"Executing {task_id}")


@flow
def task_flow():
    task_objects = {}

    # Create Prefect tasks without executing them using an imaginary create method,
    # save the Prefect tasks into task_objects to later add dependencies
    for task_info in tasks_data:
        task_objects[task_info["id"]] = perform_task.create(task_id=task_info["id"])

    # Submit tasks for execution ensuring correct execution order via wait_for
    for task_info in tasks_data:
        if task_info["dependsOn"]:
            task_objects[task_info["id"]].submit(
                wait_for=[
                    task_objects[dependsOnId] for dependsOnId in task_info["dependsOn"]
                ]
            )
        else:
            task_objects[task_info["id"]].submit()


# Run the flow
task_flow()
So I have an array of data classes / objects defined dynamically by users that need to be executed in a certain order defined by each item’s dependsOn. They all can / should be executed as tasks within the flow in that order. How can I convert this array into a flow of tasks that would be executed in the desired order?
n
how is something like this?
from typing import Dict, List

import networkx as nx

from prefect import flow, task
from prefect.futures import as_completed

tasks_data = [
    {"id": "task0", "dependsOn": ["task3"]},
    {"id": "task1", "dependsOn": []},
    {"id": "task2", "dependsOn": ["task1"]},
    {"id": "task3", "dependsOn": ["task2"]},
]


@task(task_run_name="{task_id}")
def perform_task(task_id: str):
    print(f"Executing {task_id}")
    return f"Completed {task_id}"


def construct_task_graph(tasks_data: List[Dict]) -> nx.DiGraph:
    """
    Constructs a DAG from our list of tasks and their dependencies
    """
    graph = nx.DiGraph()
    for _task in tasks_data:
        graph.add_node(_task["id"])
        for dependency in _task["dependsOn"]:
            graph.add_edge(dependency, _task["id"])
    return graph


@flow
def dynamic_task_flow(tasks_data: List[Dict]):
    graph = construct_task_graph(tasks_data)
    task_futures = {}

    for task_id in nx.topological_sort(graph):
        dependencies = list(graph.predecessors(task_id))
        dependency_futures = [
            task_futures[dep] for dep in dependencies if dep in task_futures
        ]

        task_future = perform_task.submit(task_id, wait_for=dependency_futures)
        task_futures[task_id] = task_future

    # Wait for all tasks to complete and print results
    for future in as_completed(list(task_futures.values())):
        print(future.result())


if __name__ == "__main__":
    dynamic_task_flow(tasks_data)
which is similar to this recipe
a
Topological sorting is definitely an option, but it somewhat defeats the purpose of using Prefect for workflow execution / management versus, for example, dask.delayed or Apache Airflow (it requires manual topological sorting, and the task graph is flattened relative to the input). I definitely like / see value in Prefect, and as far as I understand it’s mostly a syntax problem. What about using the old declarative / inline syntax in Prefect 3 inside task_flow to define tasks and dependencies between them?
n
yeah fair point, the main wrinkle in doing a declarative DAG with prefect > 2.x is that we discover the graph at runtime by design, which opens up a lot of flexibility, but has the side effect of complicating a static DAG definition in some cases. i.e. most of the time these relationships
tasks_data = [
    {"id": "task0", "dependsOn": ["task3"]},
    {"id": "task1", "dependsOn": []},
    {"id": "task2", "dependsOn": ["task1"]},
    {"id": "task3", "dependsOn": ["task2"]},
]
are implied by the control flow of your python code, rather than pulled out like foo >> bar etc
though most of the code I have above feels relatively easy to pack up into a small reusable util
> What about using old declarative / inline syntax in Prefect 3 inside task_flow to define tasks and dependencies between them?
can you expand a bit on what you mean by this?
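The "small reusable util" mentioned above could look roughly like the sketch below. It is not a Prefect API: the function name submit_in_dependency_order and its submit callback are hypothetical, and the ordering comes from the standard library's graphlib instead of networkx. The only assumption carried over from the thread is the shape of tasks_data (an "id" plus a "dependsOn" list per item).

```python
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, List


def submit_in_dependency_order(
    tasks_data: List[Dict[str, Any]],
    submit: Callable[[str, List[Any]], Any],
) -> Dict[str, Any]:
    """Call submit(task_id, upstream_futures) for each task, dependencies first."""
    # Map each task id to the set of ids it depends on,
    # which is the input shape TopologicalSorter expects.
    graph = {item["id"]: set(item["dependsOn"]) for item in tasks_data}

    futures: Dict[str, Any] = {}
    # static_order() yields every id after all of its predecessors and
    # raises graphlib.CycleError if the dependencies are circular.
    for task_id in TopologicalSorter(graph).static_order():
        # Ids that appear only as dependencies have no entry in graph,
        # so they are submitted with no upstream futures.
        upstream = [futures[dep] for dep in sorted(graph.get(task_id, ()))]
        futures[task_id] = submit(task_id, upstream)
    return futures
```

Inside a flow, the callback would presumably be something like lambda task_id, upstream: perform_task.submit(task_id, wait_for=upstream), mirroring the submit call in the networkx example earlier in the thread. The user-supplied list stays declarative, and the sorting is hidden in one helper.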
a
I was thinking of something like the code below. The new line is task_objects[task_info["id"]] = task(perform_task(task_info["id"]))
from prefect import flow, task

# Sample data structure for tasks with dependencies
tasks_data = [
    {"id": "task0", "dependsOn": ["task3"]},
    {"id": "task1", "dependsOn": []},
    {"id": "task2", "dependsOn": ["task1"]},
    {"id": "task3", "dependsOn": ["task1", "task2"]},
]


@task
def perform_task(task_id):
    print(f"Executing {task_id}")


@flow
def task_flow():
    task_objects = {}

    # Create Prefect tasks without executing them using inline task syntax,
    # save the Prefect tasks into task_objects to later add dependencies
    for task_info in tasks_data:
        task_objects[task_info["id"]] = task(perform_task(task_info["id"]))

    # Submit tasks for execution ensuring correct execution order via wait_for
    for task_info in tasks_data:
        if task_info["dependsOn"]:
            task_objects[task_info["id"]].submit(
                wait_for=[
                    task_objects[dependsOnId] for dependsOnId in task_info["dependsOn"]
                ]
            )
        else:
            task_objects[task_info["id"]].submit()


# Run the flow
task_flow()
It assumes that task_objects[task_info["id"]] = task(perform_task(task_info["id"])) would work and would create a new task as a kind of object / template without executing it.
> i.e. most of the time these relationships are implied by the control flow of your python code, rather than pulled out like foo >> bar etc
I can definitely see the value of this approach: just adding a decorator for one-off tasks and having it “just work” felt really nice. But as my case demonstrates, allowing only decorators, and not a declarative way of defining tasks inline as in the code above, limits some less straightforward use cases, for example those involving dynamic flow definitions.
So am I correct that task(perform_task(task_info["id"])) wouldn’t work for creating a task without executing it, or is not recommended?
n
yeah if i understand what you're saying then I think you're right about that: perform_task(task_info["id"]) is invoking the __call__ method on Task, which will execute your task
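The difference between calling something and only describing a future call can be illustrated with plain Python. The sketch below uses an undecorated stand-in for perform_task so it runs without Prefect; functools.partial is a swapped-in standard-library technique, not something proposed in the thread, and the executed list exists only to make the timing visible.

```python
from functools import partial
from typing import List

executed: List[str] = []


def perform_task(task_id: str) -> str:
    # Plain-function stand-in for the decorated task in the thread.
    executed.append(task_id)
    return f"Completed {task_id}"


# Calling the function runs it immediately, like calling a task object.
ran_now = perform_task("task1")

# partial() only stores the callable and its arguments; nothing runs yet.
deferred = partial(perform_task, "task2")
executed_before_call = list(executed)

# The work happens only when the deferred object is finally called.
ran_later = deferred()
```

With a real task, the same idea would presumably apply to partial(perform_task.submit, task_id), with wait_for supplied at call time, though that is an assumption and was not tried in this conversation.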
a
Oh I see I see
Ok yeah I just wanted to verify that I’m not missing anything
Thank you so much for looking into this
n
no problem! feel free to follow up here if you have anything else
a
Would you say it would make sense to create an issue / feature request on GH for consideration by the Prefect team? With the understanding that it might never get picked up, or not in the near future.
n
this is not the first time we've heard an ask like this, since a lot of prefect 1 users were doing things like this back in the day. if you're willing, i think this would make sense as an issue even if we don't immediately address it - it would be useful if other folks have similar needs and end up finding / plus 1'ing the issue
a
Hi. I figured out how to handle the issue mentioned above gracefully (https://gist.github.com/andrii-i/de14651c12d288dd27475313a541b872).