roady
11/25/2022, 10:24 AM
# Prefect 2.6.9
# Python 3.8
from prefect import flow, task, get_run_logger
@task
def add_one(x):
    """Return ``x + 1``, raising for ``x == 1`` to simulate a failed task run.

    Raises:
        Exception: when ``x`` equals 1.
    """
    if x == 1:
        raise Exception("Raised exception")
    return x + 1
@task
def do_something(dummy):
    """Log "Doing something" via the run logger.

    ``dummy`` is accepted but never read by the body; it only gives the task
    an argument to be called or mapped with.
    """
    get_run_logger().info("Doing something")
@flow
def mapped_flow_not_dependent():
    """Minimal example: mapped ``do_something`` that only *waits for* ``c``.

    ``add_one`` is mapped twice over ``[0, 2, 3]``; the first stage yields a 1,
    so one run of the second stage raises. ``do_something`` is then mapped
    over the original inputs ``a`` and linked to ``c`` solely through
    ``wait_for``.

    Returns:
        The string ``"Flow completes"``.
    """
    a = [0, 2, 3]
    b = add_one.map(a, return_state=True)
    c = add_one.map(b, return_state=True)
    # No data dependency on ``c`` here -- only the explicit wait_for link.
    d = do_something.map(a, return_state=True, wait_for=[c])
    print(c)
    print(d)
    return "Flow completes"
# Run the flow only when this file is executed as a script. The dunder names
# need double underscores: ``_name_`` is an undefined name and raises NameError.
if __name__ == "__main__":
    mapped_flow_not_dependent()
One state in c being failed means none of the following do_something tasks run, whereas I would like all of the do_something tasks to run apart from the ones where c is failed. I can get the desired behaviour by linking the tasks explicitly: changing the argument of do_something from a to c (and removing the wait_for kwarg).
Tim-Oliver
11/25/2022, 10:27 AMPekka
11/25/2022, 11:17 AMc
how do_something
should proceed with a
?roady
11/25/2022, 11:22 AMPekka
11/25/2022, 11:25 AMroady
11/25/2022, 11:32 AMd = do_something.map(a, return_state=True, wait_for = [c])
with d = do_something.map(c, return_state=True)
then the desired behaviour is retrieved. But I specifically want to be able to enforce dependence for mapped tasks which would otherwise not be dependent.Khuyen Tran
11/30/2022, 5:15 PMc
as an argument of do_something
.
If you want this to be something that Prefect supports, I encourage you to create a GitHub issue for this on Prefect GitHub pageroady
12/01/2022, 8:35 AMKhuyen Tran
12/01/2022, 3:47 PMtask.map()
, didn't it? I'm not sure if I understood your question
roady
12/01/2022, 4:25 PMwait_for
to not run downstream tasks if an upstream one does not reach a Completed state, even if the downstream tasks do not take the upstream state as an argument.
2. For mapped tasks which do take an upstream state as an argument, only the corresponding downstream tasks do not run if a given upstream task does not enter a Completed state.
3. For mapped tasks which use wait_for
, all of the downstream tasks do not run if any one of the upstream tasks enters a failed state.
Here's an example of the three different cases which I hope will help you understand what I mean:
# Prefect 2.6.9
# Python 3.8
from prefect import flow, task, get_run_logger
@task
def add_one(x):
    """Return ``x + 1``, raising for ``x == 1`` to simulate a failed task run.

    Raises:
        Exception: when ``x`` equals 1.
    """
    if x == 1:
        raise Exception("Raised exception")
    return x + 1
@task
def do_something(dummy):
    """Log "Doing something" via the run logger; ``dummy`` is never read."""
    get_run_logger().info("Doing something")
@flow
def wait_for_mwe():
    """Exercise three ways of linking ``do_something`` to ``add_one``.

    In every case ``add_one`` receives a 1 and therefore raises once.

    Returns:
        A tuple ``(c_1, c_2, c_3)``: the state of the single ``do_something``
        call, then the lists of states from the two mapped variants.
    """
    # No mapping, using wait_for but not argument
    a = 1
    b = add_one(a, return_state=True)
    # wait_for is given as a list, the same form used for the mapped case.
    c_1 = do_something(a, return_state=True, wait_for=[b])

    # Mapping, using argument
    a = [1, 2, 3]
    b = add_one.map(a, return_state=True)
    c_2 = do_something.map(b, return_state=True)

    # Mapping, using wait_for but not argument
    a = [1, 2, 3]
    b = add_one.map(a, return_state=True)
    c_3 = do_something.map(a, return_state=True, wait_for=b)

    return c_1, c_2, c_3
# Script entry point: run the flow and print each group of states next to the
# outcome the author expected for it.
if __name__ == "__main__":
    c_1, c_2, c_3 = wait_for_mwe()

    # State is NotReady
    print("Expecting NotReady state:")
    print(c_1)

    # Two completed states and one NotReady state
    print("Expecting 1 NotReady and 2 Completed:")
    print(c_2)

    # All states are NotReady! :(
    print("Expecting 1 NotReady and 2 Completed:")
    print(c_3)
Anna Geller
12/01/2022, 7:05 PMwait_for = [allow_failure(c)]
example:
from prefect import task, flow, get_run_logger, allow_failure
@task
def ingest_data():
    """Return the constant 42 as stand-in input data."""
    return 42
@task
def transform_data(x: int) -> int:
    """Always raise ``ValueError`` to stand in for a failing transformation.

    Raises:
        ValueError: on every call, because the condition is hard-coded.
    """
    # The condition is fixed to True so this demo task fails on every run;
    # the else branch only shows what a successful run would return.
    if True:
        raise ValueError("Non-deterministic error has occured.")
    else:
        return x * 42
@task
def clean_up_task():
    """Log a clean-up message via the run logger."""
    logger = get_run_logger()
    # The chat export had turned ``logger.info`` into a Slack hyperlink and
    # mis-decoded the broom emoji; both are restored here.
    logger.info("Cleaning up 🧹")
@flow
def allow_flaky_transformation_to_pass():
    """Submit ingest -> transform -> clean-up, with clean-up tolerating failure.

    ``transform_data`` always raises in this example. The clean-up task is
    linked to it only through ``wait_for`` wrapped in ``allow_failure``.
    """
    data = ingest_data.submit()
    result = transform_data.submit(data)
    clean_up_task.submit(wait_for=[allow_failure(result)])
# Run the example flow only when executed as a script.
if __name__ == "__main__":
    allow_flaky_transformation_to_pass()
roady
12/02/2022, 11:07 AMallow_failure
in my mwe, but it means that all of the mapped downstream tasks run, even if there was a failure in a corresponding upstream task. ๐Peyton Runyan
12/02/2022, 1:40 PMKhuyen Tran
12/02/2022, 4:12 PM
from prefect import flow, task, get_run_logger
@task
def add_one(x):
    """Return ``x + 1``, raising for ``x == 2`` to simulate a failed task run.

    Raises:
        Exception: when ``x`` equals 2.
    """
    if x == 2:
        raise Exception("Raised exception")
    return x + 1
@task
def add_two(x):
    """Return ``x + 2``, raising for ``x == 2`` to simulate a failed task run.

    Raises:
        Exception: when ``x`` equals 2.
    """
    if x == 2:
        raise Exception("Raised exception")
    return x + 2
@task
def do_something(dummy):
    """Log "Doing something" via the run logger; ``dummy`` is never read."""
    get_run_logger().info("Doing something")
@flow
def mapped_flow_not_dependent(a=[1, 2, 3]):
    """Submit ``do_something`` only for items whose ``add_two`` run completed.

    Args:
        a: Items to process. The default list is never mutated in this body.

    Returns:
        The string ``"Flow completes"``.
    """
    b = add_one.map(a)
    c = add_two.map(b)
    # Pair each upstream future with its original item; block on the future
    # and submit the downstream task only when its state type is COMPLETED.
    for future, item in zip(c, a):
        if future.wait().type == "COMPLETED":
            do_something.submit(item)
    return "Flow completes"
# NOTE: this reuses the name of the flow defined immediately before it in the
# original message; if both live in one module, this definition wins.
@flow
def mapped_flow_not_dependent(a=[1, 2, 3]):
    """Submit ``do_something`` per item, each waiting on its own upstream run.

    Unlike a single mapped call with a shared ``wait_for``, every submission
    names only the ``add_two`` future at the same position as its item.

    Args:
        a: Items to process. The default list is never mutated in this body.

    Returns:
        The string ``"Flow completes"``.
    """
    b = add_one.map(a)
    c = add_two.map(b)
    d = [
        # wait_for is passed as a list of upstream futures.
        do_something.submit(item, return_state=True, wait_for=[future])
        for item, future in zip(a, c)
    ]
    return "Flow completes"
Anna Geller
12/02/2022, 4:16 PM
from prefect import task, flow
@task
def upstream_task(item):
    """Return ``str(item) + "+1"``, raising for the item ``"c"``.

    Raises:
        Exception: when ``item`` equals ``"c"``.
    """
    if item == "c":
        raise Exception("this upstream task failed")
    return str(item) + "+1"
@task
def downstream_task(item):
    """Return ``str(item) + "+2"``."""
    return str(item) + "+2"
@flow
def demo():
    """Map ``upstream_task`` over four items and feed the results downstream."""
    items = ["a", "b", "c", "d"]
    first = upstream_task.map(items)
    downstream_task.map(first)  # runs only for a, b, and d. c is in NotReady state
# Run the example flow only when executed as a script.
if __name__ == "__main__":
    demo()
from prefect import flow, task, get_run_logger, allow_failure
@task
def extract():
    """Return the fixed list ``[1, 2, 3]`` as stand-in input data."""
    return [1, 2, 3]
@task
def add_one(x):
    """Return ``x + 1``, raising for ``x == 2`` to simulate a failed task run.

    Raises:
        Exception: when ``x`` equals 2.
    """
    if x == 2:
        raise Exception("Something is not right")
    return x + 1
@task
def add_two(x):
    """Return ``x + 2``."""
    return x + 2
@task
def cleanup_task():
    """Log a clean-up message via the run logger."""
    get_run_logger().info("Cleaning up e.g. removing temp Ray cluster")
@flow
def map_with_cleanup_task():
    """Run two mapped stages, then a single clean-up that tolerates failures.

    ``add_one`` raises for the item 2. The clean-up task is submitted once and
    linked to the second mapped stage through ``wait_for`` wrapped in
    ``allow_failure``.
    """
    a = extract()
    b = add_one.map(a)
    c = add_two.map(b)
    cleanup_task.submit(wait_for=[allow_failure(c)])
# Run the example flow only when executed as a script.
if __name__ == "__main__":
    map_with_cleanup_task()
roady
12/05/2022, 10:03 AM