Taylor Brown
12/02/2024, 7:15 PM
.visualize on my flow is smart enough to detect dependencies even if they are primitives.
Example:
import random

from prefect import flow, task

# viz_return_value supplies a stand-in return value for .visualize()
@task(log_prints=True, viz_return_value=3)
def get_random_number() -> int:
    return random.randint(0, 100)

@task(log_prints=True, viz_return_value={"random_number": 3})
def get_random_number_object() -> dict[str, int]:
    return {"random_number": random.randint(0, 100)}

@task(log_prints=True)
def print_random_number(random_number: int | dict[str, int]) -> None:
    if isinstance(random_number, dict):
        random_number = random_number["random_number"]
    print(f"The random number is: {random_number}")

@flow(log_prints=True)
def random_number_flow():
    print("Invoking directly (dependency not detected):")
    print_random_number(get_random_number())
    print("Calling object-returning task (dependency auto-detected)")
    print_random_number(get_random_number_object())
    print("Invoking via submit (dependency auto-detected):")
    random_number_result = get_random_number.submit()
    print_random_number.submit(random_number_result).wait()
    print("Invoking via submit.result() (dependency not detected):")
    random_number_result = get_random_number.submit()
Bianca Hoch
12/02/2024, 11:15 PM
from prefect import flow, task

@task
def task_a():
    return "Hello"

@task
def task_b(input_string):
    return f"{input_string} World"

@flow
def my_flow():
    # Passing task_a's result into task_b is what links the two tasks
    a_result = task_a()
    b_result = task_b(a_result)
or with .submit() -
@task
def say_hello(name):
    return f"Hello {name}!"

@task
def print_result(result):
    print(type(result))
    print(result)

@flow(name="hello-flow")
def hello_world():
    future = say_hello.submit("Marvin")
    print_result.submit(future).wait()
It's worth noting that you can also define dependencies using wait_for, even if there's no data being passed from one task to another. wait_for makes the task wait for the listed upstream tasks to finish before it executes, i.e.:
@flow
def my_flow():
    a_future = task_a.submit()
    b_future = task_b.submit(wait_for=[a_future])
Taylor Brown
12/02/2024, 11:18 PM
return 5, Prefect no longer knows that task_b depends on task_a.
(It wasn't documented anywhere, and I used an integer result to test dependency resolution, so I almost thought Prefect couldn't detect dependencies.)
Bianca Hoch
12/04/2024, 11:33 PM
int may be too small to track effectively. It has to do with how Python uses memory for small data.
Bianca Hoch
12/04/2024, 11:34 PM
from prefect import flow, task

@task
def task_a():
    return 500

@task(log_prints=True)
def task_b(my_number):
    print(f"The number is: {my_number} ")

@flow
def my_flow():
    a_result = task_a()
    b_result = task_b(a_result)

if __name__ == "__main__":
    my_flow()
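
The difference between `return 5` and `return 500` discussed above can be illustrated with a small standard-library sketch. It assumes that results are matched to their producing task by object identity (`id()`), which would explain the behavior seen in this thread. CPython keeps a single shared object for each integer from -5 to 256, so the identity of such a value cannot point back to one particular task. `ResultTracker`, `record`, and `upstream_of` are hypothetical names made up for this illustration; this is a toy model, not Prefect's actual code.

```python
from typing import Optional

# Toy illustration only: NOT Prefect's implementation. ResultTracker and its
# methods are hypothetical names invented for this sketch.

# Values of these types are shared singletons within a Python process.
UNTRACKABLE_TYPES = (bool, type(None), type(...), type(NotImplemented))


class ResultTracker:
    """Remembers which task produced an object, keyed by object identity."""

    def __init__(self) -> None:
        self._producer_by_id = {}  # id(result) -> task name
        self._keepalive = []       # hold references so ids are not reused

    def record(self, task_name: str, result: object) -> bool:
        """Register a task result; return False if it cannot be tracked."""
        if type(result) in UNTRACKABLE_TYPES:
            return False
        # CPython caches the ints in [-5, 256], so id() of such a value
        # says nothing about which task returned it.
        if isinstance(result, int) and -5 <= result <= 256:
            return False
        self._producer_by_id[id(result)] = task_name
        self._keepalive.append(result)
        return True

    def upstream_of(self, argument: object) -> Optional[str]:
        """Return the task that produced this exact object, if known."""
        return self._producer_by_id.get(id(argument))


tracker = ResultTracker()

small = int("5")                # cached small int, like `return 5`
large = int("500")              # outside the cache, like `return 500`
wrapped = {"random_number": 3}  # a dict is always its own object

tracker.record("task_a", small)
tracker.record("task_a", large)
tracker.record("get_random_number_object", wrapped)

print(tracker.upstream_of(small))    # None
print(tracker.upstream_of(large))    # task_a
print(tracker.upstream_of(wrapped))  # get_random_number_object
```

Under this assumption, wrapping a small value in a dict, or passing the future from `.submit()` instead of the raw value, gives the tracker a distinct object to follow, which matches what the examples earlier in the thread report.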
Bianca Hoch
12/04/2024, 11:35 PM
Taylor Brown
12/04/2024, 11:35 PM
Taylor Brown
12/04/2024, 11:36 PM