Andrii Ieroshenko
08/09/2024, 10:16 PM
Nate
08/10/2024, 1:13 AM
Andrii Ieroshenko
08/10/2024, 1:23 AM
Nate
08/10/2024, 4:15 PM
```python
from prefect import flow
from prefect.flows import Flow

@flow
def foo():
    pass

assert isinstance(foo, Flow)  # foo is the flow object
```
• a flow / task run is what you get when you call a flow or task (i.e. flow/task + parameters)
```python
from prefect import flow
from prefect.context import get_run_context
from prefect.client.schemas.objects import FlowRun

@flow(log_prints=True)
def foo():
    print(run := get_run_context().flow_run)
    assert isinstance(run, FlowRun)

foo()  # I've implicitly created a flow run by calling this flow object
```
can you give an example of what you want to do with pseudocode?
> Is it possible to create a task without running it when using the `@task` decorator?
Andrii Ieroshenko
08/12/2024, 5:04 PM
```python
from prefect import flow, task

# Sample data structure for tasks with dependencies
tasks_data = [
    {"id": "task0", "dependsOn": ["task3"]},
    {"id": "task1", "dependsOn": []},
    {"id": "task2", "dependsOn": ["task1"]},
    {"id": "task3", "dependsOn": ["task1", "task2"]},
]

@task
def perform_task(task_id):
    print(f"Executing {task_id}")

@flow
def task_flow():
    task_objects = {}
    # Create Prefect tasks without executing them using an imaginary create method,
    # save Prefect tasks into task_objects to later add dependencies
    for task_info in tasks_data:
        task_objects[task_info["id"]] = perform_task.create(task_id=task_info["id"])
    # Submit tasks for execution ensuring correct execution order via wait_for
    for task_info in tasks_data:
        if task_info["dependsOn"]:
            task_objects[task_info["id"]].submit(
                wait_for=[
                    task_objects[dependsOnId] for dependsOnId in task_info["dependsOn"]
                ]
            )
        else:
            task_objects[task_info["id"]].submit()

# Run the flow
task_flow()
```
Andrii Ieroshenko
08/12/2024, 5:11 PM
… `dependsOn`. They all can / should be executed as a task within the flow in that order. How can I convert this array into a flow of tasks that would be executed in the desired order?
Nate
08/12/2024, 5:15 PM
```python
from typing import Dict, List

import networkx as nx
from prefect import flow, task
from prefect.futures import as_completed

tasks_data = [
    {"id": "task0", "dependsOn": ["task3"]},
    {"id": "task1", "dependsOn": []},
    {"id": "task2", "dependsOn": ["task1"]},
    {"id": "task3", "dependsOn": ["task2"]},
]

@task(task_run_name="{task_id}")
def perform_task(task_id: str):
    print(f"Executing {task_id}")
    return f"Completed {task_id}"

def construct_task_graph(tasks_data: List[Dict]) -> nx.DiGraph:
    """
    Constructs a DAG from our list of tasks and their dependencies
    """
    graph = nx.DiGraph()
    for _task in tasks_data:
        graph.add_node(_task["id"])
        for dependency in _task["dependsOn"]:
            graph.add_edge(dependency, _task["id"])
    return graph

@flow
def dynamic_task_flow(tasks_data: List[Dict]):
    graph = construct_task_graph(tasks_data)
    task_futures = {}
    for task_id in nx.topological_sort(graph):
        dependencies = list(graph.predecessors(task_id))
        dependency_futures = [
            task_futures[dep] for dep in dependencies if dep in task_futures
        ]
        task_future = perform_task.submit(task_id, wait_for=dependency_futures)
        task_futures[task_id] = task_future
    # Wait for all tasks to complete and print results
    for future in as_completed(list(task_futures.values())):
        print(future.result())

if __name__ == "__main__":
    dynamic_task_flow(tasks_data)
```
which is similar to this recipe
Andrii Ieroshenko
08/12/2024, 5:39 PM
What about using old declarative / inline syntax in Prefect 3 inside `task_flow` to define tasks and dependencies between them?
Nate
08/12/2024, 5:43 PM
```python
tasks_data = [
    {"id": "task0", "dependsOn": ["task3"]},
    {"id": "task1", "dependsOn": []},
    {"id": "task2", "dependsOn": ["task1"]},
    {"id": "task3", "dependsOn": ["task2"]},
]
```
i.e. most of the time these relationships are implied by the control flow of your Python code, rather than pulled out like `foo >> bar` etc.
though most of the code I have above feels relatively easy to pack up into a small reusable util
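A minimal sketch of what such a util could look like, assuming Python 3.9+ so the standard library's `graphlib.TopologicalSorter` can stand in for networkx. The helper name `submit_in_dependency_order` and its `submit` callback are hypothetical, not part of Prefect; the callback is where a call like `perform_task.submit(...)` would go.

```python
from graphlib import TopologicalSorter
from typing import Callable, Dict, List, TypeVar

F = TypeVar("F")  # whatever submit() returns, e.g. a future


def submit_in_dependency_order(
    tasks_data: List[Dict],
    submit: Callable[[str, List[F]], F],
) -> Dict[str, F]:
    """Call submit(task_id, upstream_futures) once per task, upstream tasks first.

    Hypothetical helper, not part of Prefect. Raises graphlib.CycleError if
    the dependsOn relationships contain a cycle.
    """
    # TopologicalSorter expects {node: predecessors}, which is exactly the
    # {"id": ..., "dependsOn": [...]} shape of each entry.
    graph = {t["id"]: t["dependsOn"] for t in tasks_data}
    futures: Dict[str, F] = {}
    for task_id in TopologicalSorter(graph).static_order():
        # ids that only appear inside some dependsOn list have no predecessors
        upstream = [futures[dep] for dep in graph.get(task_id, [])]
        futures[task_id] = submit(task_id, upstream)
    return futures
```

Inside the flow this could replace the networkx loop with something like `submit_in_dependency_order(tasks_data, lambda tid, deps: perform_task.submit(tid, wait_for=deps))`. Keeping Prefect out of the helper makes the ordering logic easy to test on its own.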
> What about using old declarative / inline syntax in Prefect 3 inside `task_flow` to define tasks and dependencies between them?
can you expand a bit on what you mean by this?
Andrii Ieroshenko
08/12/2024, 5:58 PM
`task_objects[task_info["id"]] = task(perform_task(task_info["id"]))`
```python
from prefect import flow, task

# Sample data structure for tasks with dependencies
tasks_data = [
    {"id": "task0", "dependsOn": ["task3"]},
    {"id": "task1", "dependsOn": []},
    {"id": "task2", "dependsOn": ["task1"]},
    {"id": "task3", "dependsOn": ["task1", "task2"]},
]

@task
def perform_task(task_id):
    print(f"Executing {task_id}")

@flow
def task_flow():
    task_objects = {}
    # Create Prefect tasks without executing them using inline task syntax,
    # save Prefect tasks into task_objects to later add dependencies
    for task_info in tasks_data:
        task_objects[task_info["id"]] = task(perform_task(task_info["id"]))
    # Submit tasks for execution ensuring correct execution order via wait_for
    for task_info in tasks_data:
        if task_info["dependsOn"]:
            task_objects[task_info["id"]].submit(
                wait_for=[
                    task_objects[dependsOnId] for dependsOnId in task_info["dependsOn"]
                ]
            )
        else:
            task_objects[task_info["id"]].submit()

# Run the flow
task_flow()
```
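A plain-Python illustration of what happens with the `task(perform_task(...))` line above. `ToyTask` and `toy_wrap` are hypothetical stand-ins, not Prefect's real `Task` or `task`; the point is only that Python evaluates the inner call before the outer wrapper ever sees it, so "create without running" requires holding on to the callable and its arguments, for example with `functools.partial`.

```python
from functools import partial


class ToyTask:
    """Hypothetical stand-in for a decorated task; NOT Prefect's real Task class.

    Like any callable object, calling it goes through __call__, which runs
    the wrapped function right away.
    """

    def __init__(self, fn):
        self.fn = fn
        self.runs = []  # records every time the task actually executes

    def __call__(self, *args, **kwargs):
        self.runs.append(args)
        return self.fn(*args, **kwargs)


def toy_wrap(obj):
    """Hypothetical stand-in for an outer task(...) wrapper."""
    return obj


@ToyTask
def perform_task(task_id):
    return f"Completed {task_id}"


# Python evaluates the inner call first, so the task has already run and the
# wrapper only receives its return value (a str), not a task "template".
received = toy_wrap(perform_task("task1"))

# partial stores the callable plus its arguments without calling anything.
deferred = partial(perform_task, "task2")
```

In Prefect terms, the analogous idea would be storing something like `partial(perform_task.submit, task_info["id"])` in `task_objects` and calling it later with `wait_for=[...]` inside the flow. That is a general Python pattern layered on `Task.submit`, not a Prefect feature for building unrun tasks, and the calls would still have to happen in dependency order so the upstream futures exist.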
Andrii Ieroshenko
08/12/2024, 6:00 PM
… `task_objects[task_info["id"]] = task(perform_task(task_info["id"]))` would work and would create a new task as kind of an object / template without executing it
Andrii Ieroshenko
08/12/2024, 6:18 PM
> i.e. most of the time these relationships are implied by the control flow of your python code, rather than pulled out like `foo >> bar` etc
I can definitely see the value of this approach: just adding a decorator for one-off tasks and having it “just work” felt really nice. But as my case demonstrates, allowing only decorators and not a declarative way of defining tasks inline (as in the code above) limits some less straightforward use cases, for example ones involving dynamic flow definitions.
Andrii Ieroshenko
08/12/2024, 6:20 PM
… `task(perform_task(task_info["id"]))` wouldn't work for creating a task without executing it, or is it just not recommended?
Nate
08/12/2024, 6:21 PM
`perform_task(task_info["id"])` is invoking the `__call__` method on `Task`, which will execute your task
Andrii Ieroshenko
08/12/2024, 6:22 PM
Andrii Ieroshenko
08/12/2024, 6:22 PM
Andrii Ieroshenko
08/12/2024, 6:22 PM
Nate
08/12/2024, 6:22 PM
Andrii Ieroshenko
08/12/2024, 6:23 PM
Nate
08/12/2024, 6:26 PM
Andrii Ieroshenko
09/05/2024, 1:27 AM