Narasimhan Ramaswamy
10/25/2020, 10:04 PM
from prefect import task, case, apply_map,Flow,unmapped
import prefect
from prefect.tasks.control_flow import merge
from prefect.tasks.shell import ShellTask
from prefect.engine.results import LocalResult
from prefect.tasks.control_flow import case
from prefect.environments.storage import Docker
import docker
def test2(name):
    """Build a ``ShellTask`` that runs ``ls``.

    Logs "Inside Task 2" through the logger taken from ``prefect.context``
    and returns the task object; the task is not executed here.

    Args:
        name: Name passed to the created ``ShellTask``.

    Returns:
        A ``ShellTask`` configured with ``command="ls"``,
        ``return_all=True`` and ``log_stderr=True``.
    """
    logger = prefect.context.get("logger")
    logger.info("Inside Task 2")
    return ShellTask(name=name, command="ls", return_all=True, log_stderr=True)
@task(log_stdout=True)
def inc_if_even(x):
    """Run an ``ls`` shell task named after ``x`` and log what it returns.

    Despite its name, this task neither inspects the parity of ``x`` nor
    increments anything: ``x`` is only used to build the shell task's name.

    Args:
        x: Value appended to ``"Shell"`` to form the shell task name.

    Returns:
        Always ``1``.
    """
    logger = prefect.context.get("logger")
    logger.info("----Start----")
    shell_name = "Shell" + str(x)
    # The ShellTask is executed inline through ``.run()`` from inside this
    # task rather than being added to the flow as its own task.
    output = test2(shell_name).run()
    logger.info(f"{output}")
    logger.info("----End----")
    return 1
@task
def reduce(x):
    """Report whether the sum of ``x`` is greater than 1.

    Args:
        x: Iterable of values accepted by ``sum``.

    Returns:
        ``True`` when the total exceeds 1, otherwise ``False``.
    """
    total = sum(x)
    return bool(total > 1)
@task
def return_num(x):
    """Return ``x`` increased by ten.

    Args:
        x: Value to which 10 is added.

    Returns:
        The result of ``x + 10``.
    """
    shifted = x + 10
    return shifted
@task
def logger_st(status):
    """Log a "Task Failed" message that includes ``status``.

    Args:
        status: Value interpolated into the log message.

    Returns:
        Always ``1``.
    """
    logger = prefect.context.get("logger")
    logger.info(f"Task Failed - {status}")
    return 1
from prefect.engine.executors import DaskExecutor,LocalDaskExecutor
from prefect.environments import LocalEnvironment
# Assemble "test-flow"; its environment is a LocalEnvironment whose executor
# is a DaskExecutor.
with Flow(
    "test-flow",
    environment=LocalEnvironment(executor=DaskExecutor()),
) as flow:
    logger = prefect.context.get("logger")
    # Map return_num over the seven input values.
    return_nums = return_num.map([1, 2, 3, 4, 5, 6, 7])
    # Map inc_if_even over the outputs of the previous mapped task.
    result = inc_if_even.map(return_nums)
    # Feed the mapped results into the (unmapped) reduce task.
    reduced = reduce(result)
Hi all, just a quick question on DaskExecutor(): when running with flow.run, the mapped tasks run in parallel, but when running on Cloud they run in sequence. My setup is an agent running on Kubernetes. Can you please help with what I should be doing?
Jimmy Le
11/08/2020, 4:57 PM
with Flow("flow name", schedule=schedule) as f:
# mapped flow
# Give the flow a LocalEnvironment carrying the "flow-label" label and a
# LocalDaskExecutor.
f.environment = LocalEnvironment(
    labels=["flow-label"],
    executor=LocalDaskExecutor(),
)
# Point the flow's storage at the flow's script file on disk.
f.storage = Local(path="path/to/flow.py", stored_as_script=True)
Narasimhan Ramaswamy
11/16/2020, 11:24 AM