Timo
06/16/2021, 7:21 AM
Kevin Kho
Timo
06/16/2021, 2:17 PM
from prefect import task, Flow
from prefect.engine.signals import FAIL, ENDRUN
from prefect.engine.state import State, Success

def stateh(obj, old, new: State):
    if new.is_failed():
        raise ENDRUN(new)
    return new

@task
def say_hello(name):
    print(f"Hello {name}")

@task(state_handlers=[stateh])
def forerror(para):
    if para == 2:
        raise FAIL("it's 2")
    else:
        print(para)
        return para + 1

with Flow("hello_flow") as flow:
    ls = [1, 2, 3]
    sh = say_hello("John")
    e = forerror.map(ls)

if __name__ == "__main__":
    flow.run()
Kevin Kho
ENDRUN instead of fail when para == 2
Timo
06/16/2021, 2:21 PM
from prefect import task, Flow
from prefect.engine.signals import FAIL, ENDRUN
from prefect.engine.state import Failed, State

def stateh(obj, old, new: State):
    if new.is_failed():
        raise ENDRUN(new)
    return new

@task
def say_hello(name):
    print(f"Hello {name}")

# @task(state_handlers=[stateh])
@task
def forerror(para):
    if para == 2:
        # raise FAIL("it's 2")
        state = Failed("it's 2")
        raise ENDRUN(state)
    else:
        print(para)
        return para + 1

with Flow("hello_flow") as flow:
    ls = [1, 2, 3]
    sh = say_hello("John")
    e = forerror.map(ls)

if __name__ == "__main__":
    flow.run()
Timo
06/16/2021, 2:22 PM
[2021-06-16 16:20:11+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'hello_flow'
[2021-06-16 16:20:11+0200] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-06-16 16:20:11+0200] DEBUG - prefect.FlowRunner | Flow 'hello_flow': Handling state change from Scheduled to Running
[2021-06-16 16:20:11+0200] INFO - prefect.TaskRunner | Task 'say_hello': Starting task run...
[2021-06-16 16:20:11+0200] DEBUG - prefect.TaskRunner | Task 'say_hello': Handling state change from Pending to Running
[2021-06-16 16:20:11+0200] DEBUG - prefect.TaskRunner | Task 'say_hello': Calling task.run() method...
Hello John
[2021-06-16 16:20:11+0200] DEBUG - prefect.TaskRunner | Task 'say_hello': Handling state change from Running to Success
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror': Starting task run...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror': Handling state change from Pending to Mapped
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror': Finished task run for task with final state: 'Mapped'
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[0]': Starting task run...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[0]': Handling state change from Pending to Running
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[0]': Calling task.run() method...
1
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[0]': Handling state change from Running to Success
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[0]': Finished task run for task with final state: 'Success'
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[1]': Starting task run...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[1]': Handling state change from Pending to Running
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[1]': Calling task.run() method...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[1]': Handling state change from Running to Failed
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[1]': Finished task run for task with final state: 'Failed'
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[2]': Starting task run...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[2]': Handling state change from Pending to Running
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[2]': Calling task.run() method...
3
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[2]': Handling state change from Running to Success
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[2]': Finished task run for task with final state: 'Success'
[2021-06-16 16:20:12+0200] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
[2021-06-16 16:20:12+0200] DEBUG - prefect.FlowRunner | Flow 'hello_flow': Handling state change from Running to Failed
Kevin Kho
Kevin Kho
Timo
06/17/2021, 5:39 AM
Timo
06/17/2021, 10:43 AM
(ls = get_list() instead of ls = [1,2,3]). I receive a "Task is not iterable" error if I use the output of the get_list() task.
from prefect import task, Flow
from prefect.engine.signals import FAIL
import prefect

LOGGER = prefect.context.get("logger")

@task
def say_hello(name):
    LOGGER.info(f"Hello {name}")
    return name

@task
def forerror(para):
    if para == 2:
        raise FAIL("it's 2")
    else:
        print(para)
        return para + 1

@task
def get_list():
    return [1, 2, 3]

with Flow("hello_flow") as flow:
    # ls = get_list()
    ls = [1, 2, 3]
    h_tasks = [say_hello("John") for x in ls]
    e_tasks = [forerror(para=x) for x in ls]
    for i in range(0, len(ls)):
        e_tasks[i].set_upstream(h_tasks[i])
        if i > 0:
            e_tasks[i].set_upstream(e_tasks[i - 1])

if __name__ == "__main__":
    flow.run()
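Note on the "Task is not iterable" error: inside the Flow block, calling get_list() only registers the task and hands back a Task object; the actual list exists only once the flow runs, so a list comprehension over it has nothing to iterate. The toy sketch below illustrates that build-time versus run-time split in plain Python. The Deferred class is a hypothetical stand-in for illustration, not Prefect's implementation.

```python
class Deferred:
    """Hypothetical stand-in for a task whose result is not computed yet."""

    def __init__(self, fn):
        self.fn = fn

    def run(self):
        # Only here does the real value come into existence.
        return self.fn()


ls = Deferred(lambda: [1, 2, 3])

# "Build time": the placeholder is not a list, so iterating over it fails.
try:
    doubled = [x * 2 for x in ls]
except TypeError as exc:
    build_time_error = str(exc)

# "Run time": once the value has been produced, iterating works as usual.
run_time_values = [x * 2 for x in ls.run()]
```

This is why the version with the hard-coded ls = [1, 2, 3] builds fine while the one using the task output does not.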
Kevin Kho
Kevin Kho
from prefect import task, Flow
from prefect.engine.signals import FAIL
import prefect

LOGGER = prefect.context.get("logger")

@task
def say_hello(name):
    LOGGER.info(f"Hello {name}")
    return name

@task
def forerror(para):
    if para == 2:
        raise FAIL("it's 2")
    else:
        print(para)
        return para + 1

@task
def get_list():
    return [1, 2, 3]

@task
def helper(ls):
    for x in ls:
        say_hello.run("John")
        y = forerror.run(para=x)
    return y

with Flow("hello_flow") as flow:
    ls = get_list()
    helper(ls)

flow.run()
Kevin Kho
Timo
06/18/2021, 5:56 AM
run() (which totally makes sense because it's all Python)... The downside of this approach is that I can't monitor each "sub" task, as you said.
Therefore implementing Task Looping would be great. Could I reuse existing "official" Prefect tasks within a Task Looping construct? The example in the docs shows only custom tasks (with the @task decorator) raising the LOOP signal.
Another question: could I use the map function with either of these constructs? E.g. I have a list of files that is split by day ([[file1-day1.zip,file2-day1.zip,....,fileN-day1.zip], [file1-day2.zip,file2-day2.zip,....,fileN-day2.zip]]). Now I'd like to iterate over the sequence, but within each inner list I'd like to use map to extract all zip files with the Unzip task. Currently I get ValueError: Could not infer an active Flow context.
As I discovered, I could use StartFlowRun to start another flow which implements the mapping routine. But this cannot be tested locally, as StartFlowRun only works with Cloud or Server.
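One possible workaround for the nested list, sketched in plain Python without any Prefect calls: flatten the per-day lists into (day, file) pairs before mapping, so a single flat map can process every file and no map call is needed inside a running task. The function name flatten_by_day and the file names are hypothetical, and this trades strict day-by-day sequencing for one flat map, so it only fits if the days do not have to finish in order.

```python
from itertools import chain


def flatten_by_day(files_by_day):
    """Turn [[a, b], [c]] into [(0, a), (0, b), (1, c)], keeping the day index."""
    return list(
        chain.from_iterable(
            ((day, name) for name in names)
            for day, names in enumerate(files_by_day)
        )
    )


# Hypothetical file names, for illustration only.
files_by_day = [
    ["file1-day1.zip", "file2-day1.zip"],
    ["file1-day2.zip", "file2-day2.zip"],
]

# Each (day, file) pair could then be handed to one mapped extraction task.
flat = flatten_by_day(files_by_day)
```

Keeping the day index in each pair means the results can still be regrouped by day afterwards if needed.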