Timo
06/16/2021, 7:21 AMKevin Kho
Timo
06/16/2021, 2:17 PMfrom prefect import task, Flow
from prefect.engine.signals import FAIL, ENDRUN
from prefect.engine.state import State, Success
def stateh(obj, old, new: State):
if new.is_failed():
raise ENDRUN(new)
return new
@task
def say_hello(name):
print(f"Hello {name}")
@task(state_handlers=[stateh])
def forerror(para):
if para == 2:
raise FAIL("it's 2")
else:
print(para)
return para + 1
with Flow("hello_flow") as flow:
ls = [1, 2, 3]
sh = say_hello("John")
e = forerror.map(ls)
if __name__ == "__main__":
flow.run()
Kevin Kho
ENDRUN
instead of fail when para == 2
Timo
06/16/2021, 2:21 PMfrom prefect import task, Flow
from prefect.engine.signals import FAIL, ENDRUN
from prefect.engine.state import Failed, State
def stateh(obj, old, new: State):
if new.is_failed():
raise ENDRUN(new)
return new
@task
def say_hello(name):
print(f"Hello {name}")
# @task(state_handlers=[stateh])
@task
def forerror(para):
if para == 2:
# raise FAIL("it's 2")
state = Failed("it's 2")
raise ENDRUN(state)
else:
print(para)
return para + 1
with Flow("hello_flow") as flow:
ls = [1, 2, 3]
sh = say_hello("John")
e = forerror.map(ls)
if __name__ == "__main__":
flow.run()
Timo
06/16/2021, 2:22 PM[2021-06-16 16:20:11+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'hello_flow'
[2021-06-16 16:20:11+0200] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-06-16 16:20:11+0200] DEBUG - prefect.FlowRunner | Flow 'hello_flow': Handling state change from Scheduled to Running
[2021-06-16 16:20:11+0200] INFO - prefect.TaskRunner | Task 'say_hello': Starting task run...
[2021-06-16 16:20:11+0200] DEBUG - prefect.TaskRunner | Task 'say_hello': Handling state change from Pending to Running
[2021-06-16 16:20:11+0200] DEBUG - prefect.TaskRunner | Task 'say_hello': Calling task.run() method...
Hello John
[2021-06-16 16:20:11+0200] DEBUG - prefect.TaskRunner | Task 'say_hello': Handling state change from Running to Success
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror': Starting task run...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror': Handling state change from Pending to Mapped
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror': Finished task run for task with final state: 'Mapped'
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[0]': Starting task run...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[0]': Handling state change from Pending to Running
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[0]': Calling task.run() method...
1
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[0]': Handling state change from Running to Success
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[0]': Finished task run for task with final state: 'Success'
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[1]': Starting task run...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[1]': Handling state change from Pending to Running
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[1]': Calling task.run() method...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[1]': Handling state change from Running to Failed
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[1]': Finished task run for task with final state: 'Failed'
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[2]': Starting task run...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[2]': Handling state change from Pending to Running
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[2]': Calling task.run() method...
3
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[2]': Handling state change from Running to Success
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[2]': Finished task run for task with final state: 'Success'
[2021-06-16 16:20:12+0200] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
[2021-06-16 16:20:12+0200] DEBUG - prefect.FlowRunner | Flow 'hello_flow': Handling state change from Running to Failed
Kevin Kho
Kevin Kho
Timo
06/17/2021, 5:39 AMTimo
06/17/2021, 10:43 AMls = get_list()
instead of ls = [1,2,3]
). I receive a Task is not iterable
error if I use the output of the get_list() task
from prefect import task, Flow
from prefect.engine.signals import FAIL
import prefect
LOGGER = prefect.context.get("logger")
@task
def say_hello(name):
<http://LOGGER.info|LOGGER.info>(f"Hello {name}")
return name
@task
def forerror(para):
if para == 2:
raise FAIL("it's 2")
else:
print(para)
return para + 1
@task
def get_list():
return [1, 2, 3]
with Flow("hello_flow") as flow:
# ls = get_list()
ls = [1, 2, 3]
h_tasks = [say_hello("John") for x in ls]
e_tasks = [forerror(para=x) for x in ls]
for i in range(0, len(ls)):
e_tasks[i].set_upstream(h_tasks[i])
if i > 0:
e_tasks[i].set_upstream(e_tasks[i - 1])
if __name__ == "__main__":
flow.run()
Kevin Kho
Kevin Kho
from prefect import task, Flow
from prefect.engine.signals import FAIL
import prefect
LOGGER = prefect.context.get("logger")
@task
def say_hello(name):
<http://LOGGER.info|LOGGER.info>(f"Hello {name}")
return name
@task
def forerror(para):
if para == 2:
raise FAIL("it's 2")
else:
print(para)
return para + 1
@task
def get_list():
return [1, 2, 3]
@task
def helper(ls):
for x in ls:
say_hello.run("John")
y = forerror.run(para=x)
return y
with Flow("hello_flow") as flow:
ls = get_list()
helper(ls)
flow.run()
Kevin Kho
Timo
06/18/2021, 5:56 AMrun()
(which totally makes sense because it's all python)... Downside of this approach is, that I can't monitor each "sub" task as you said.
Therefore implementing Task Looping would be great. Could I reuse existing "offical" Prefect tasks within a Task Looping constuct? The example at the docs shows only custom tasks (with the @task decorator) (raising the LOOP signal).
A another question I have: Could I use the map function with each or one of the constructs? E.g. I have a list of files which is splitted by days ([[file1-day1.zip,file2-day1.zip,....,fileN-day1.zip], [file1-day2.zip,file2-day2.zip,....,fileN-day2.zip]]
. Now I like to iterate over the sequence but within the sequence I like to use map to extract all zip files with the Unzip-task. Currently I got ValueError: Could not infer an active Flow context.
.
As I discovered I could use StartFlowRun
to start another flow which implements the mapping routine. But this could be not tested locally as StartFlowRun
only works with cloud or server.Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by