PrabhatGupta
09/15/2023, 11:36 AM
from prefect import task, flow
@task
def task1():
    try:
        ## do some transformation here
        return "1"
    except Exception as e:
        return "0", e
@task(log_prints=True)
def task2():
    try:
        ## do more transformation here
        return "1"
    except Exception as e:
        return "0", e
@flow
def main():
    out1 = task1(return_state=True)
    print(out1)
    out2 = task2(wait_for=out1)
if __name__ == "__main__":
    main()
Marvin
09/15/2023, 11:37 AM
python
from prefect import task, Flow
@task
def task1():
    try:
        ## do some transformation here
        return "1"
    except Exception as e:
        return "0", e
@task
def task2(task1_output):
    if task1_output == "1":
        try:
            ## do more transformation here
            return "1"
        except Exception as e:
            return "0", e
    else:
        return "Task1 did not succeed"
with Flow("main") as flow:
    out1 = task1()
    out2 = task2(out1)
flow.run()
In this version, task2 takes the output of task1 as an input. If the output of task1 is "1", it proceeds with its transformation. If not, it immediately returns a message saying that task1 did not succeed.
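The branching described here can be checked without Prefect at all. The following is a minimal plain-Python sketch, not Prefect code: the `fail` switch is a hypothetical parameter added only to force the error path. It shows that the `("0", e)` tuple returned from the `except` branch is not equal to `"1"`, so task2 falls through to its `else` branch.

```python
def task1(fail=False):
    """Stand-in for the first task; `fail` is a hypothetical demo switch."""
    try:
        if fail:
            raise ValueError("transformation failed")
        return "1"
    except Exception as e:
        # Same convention as the snippet above: a ("0", exception) tuple
        # signals failure instead of raising.
        return "0", e


def task2(task1_output):
    """Does its own work only when task1 reported "1"."""
    if task1_output == "1":
        return "1"
    return "Task1 did not succeed"


ok_result = task2(task1())
failed_result = task2(task1(fail=True))
print(ok_result)
print(failed_result)
```

One consequence of this convention is that the task itself never raises, so anything that decides success or failure by whether an exception escaped will see both outcomes as a normal return.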
I hope that helps. Or at least, I hope it doesn't make things worse. I'm never quite sure with you humans.
PrabhatGupta
09/15/2023, 11:41 AM
Marvin
09/15/2023, 11:42 AM
python
from prefect import task, Flow
@task
def task1():
    try:
        ## do some transformation here
        return "1"
    except Exception as e:
        print(f"An error occurred in task1: {e}")
        raise
@task
def task2(task1_output):
    if task1_output == "1":
        try:
            ## do more transformation here
            return "1"
        except Exception as e:
            print(f"An error occurred in task2: {e}")
            raise
    else:
        return "Task1 did not succeed"
with Flow("main") as flow:
    out1 = task1()
    out2 = task2(out1)
flow.run()
In this version, if an error occurs in task1 or task2, the error message is printed and the exception is re-raised. Prefect will then mark the task as failed and the flow will stop running.
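To see why re-raising changes what the caller observes, here is a small plain-Python sketch of the print-then-`raise` pattern. It does not use Prefect; `run_pipeline` is a hypothetical driver standing in for whatever calls the tasks, and the `fail` switch exists only to trigger the error path.

```python
def task1(fail=False):
    try:
        if fail:
            raise ValueError("bad input")
        return "1"
    except Exception as e:
        print(f"An error occurred in task1: {e}")
        raise  # bare raise re-raises the same exception, traceback intact


def task2(task1_output):
    return "1" if task1_output == "1" else "Task1 did not succeed"


def run_pipeline(fail=False):
    """Hypothetical driver: stops at the first task that raises."""
    try:
        out1 = task1(fail=fail)
    except ValueError as exc:
        # The exception escaped task1, so task2 is never called.
        return f"stopped: {exc}"
    return task2(out1)


print(run_pipeline())
print(run_pipeline(fail=True))
```

Because the exception propagates, the caller can tell a failed task from a successful one without inspecting the return value.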
I hope that helps. Or at least, I hope it doesn't make things worse. I'm never quite sure with you humans.
PrabhatGupta
09/15/2023, 11:49 AM
Marvin
09/15/2023, 11:49 AM
State objects and Task methods to create a similar effect. You can use the is_failed method to check if a task has failed, and then use that information to decide whether to run another task.
Here's a rough example:
python
from prefect import task, Flow
from prefect.engine.state import Failed
@task
def task1():
    try:
        ## do some transformation here
        return "1"
    except Exception as e:
        print(f"An error occurred in task1: {e}")
        raise
@task
def task2(task1_state):
    if isinstance(task1_state, Failed):
        try:
            ## do more transformation here
            return "1"
        except Exception as e:
            print(f"An error occurred in task2: {e}")
            raise
    else:
        return "Task1 did not fail"
with Flow("main") as flow:
    out1 = task1()
    out2 = task2(out1)
flow.run()
In this version, task2 takes the state of task1 as an input. If task1 failed, task2 proceeds with its transformation. If not, it immediately returns a message saying that task1 did not fail.
Remember, this is a rough example. You'll need to adapt it to your specific use case. But I hope it gives you a starting point. Or at least, I hope it doesn't make things worse. I'm never quite sure with you humans.
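The idea of branching on a state object rather than on a return value can be sketched in plain Python. In the code from the original question, `task1(return_state=True)` hands back a state object instead of the bare result, and `is_failed` is the check named above. Everything below is a stand-in written for illustration: `DemoState` and `run_with_state` are hypothetical names, not Prefect classes or functions.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class DemoState:
    """Hypothetical stand-in for a task state; not Prefect's State class."""
    result: Any = None
    error: Optional[BaseException] = None

    def is_failed(self) -> bool:
        return self.error is not None


def run_with_state(fn: Callable[[], Any]) -> DemoState:
    """Call fn and capture the outcome as a DemoState instead of raising."""
    try:
        return DemoState(result=fn())
    except Exception as exc:
        return DemoState(error=exc)


def task1_ok():
    return "1"


def task1_broken():
    raise RuntimeError("transformation failed")


def task2(task1_state: DemoState) -> str:
    if task1_state.is_failed():
        return "1"  # the follow-up transformation would run here
    return "Task1 did not fail"


print(task2(run_with_state(task1_ok)))
print(task2(run_with_state(task1_broken)))
```

The key point of the design is that the failure is captured as data, so the code that decides whether to run task2 sees an object it can query instead of an exception it has to catch.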