
Narasimhan Ramaswamy

10/25/2020, 10:04 PM
from prefect import task, Flow
import prefect
from prefect.tasks.shell import ShellTask

def test2(name):
    logger = prefect.context.get("logger")
    logger.info("Inside Task 2")
    a = ShellTask(name=name, command="ls", return_all=True, log_stderr=True)
    return a

@task(log_stdout=True)
def inc_if_even(x):
    logger = prefect.context.get("logger")

    logger.info("----Start----")
    name = r'Shell' + str(x)
    b = test2(name).run()
    logger.info(f"{b}")
    logger.info("----End----")
    return 1

@task
def reduce(x):
    # True when more than one mapped task returned 1
    return sum(x) > 1

@task
def return_num(x):
    return x + 10

@task
def logger_st(status):
    logger = prefect.context.get("logger")
    logger.info(f"Task Failed - {status}")
    return 1

from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment

with Flow("test-flow",environment=LocalEnvironment(executor=DaskExecutor())) as flow:
    logger = prefect.context.get("logger")
    return_nums = return_num.map([1,2,3,4,5,6,7])
    result = inc_if_even.map(return_nums)
    reduced = reduce(result)
Hi All, just a quick question on DaskExecutor() - when running with flow.run(), the mapped tasks run in parallel, but when running on Cloud they run in sequence. My setup is an agent running on Kubernetes. Can you please help with what I should be doing?

Jimmy Le

11/08/2020, 4:57 PM
@Narasimhan Ramaswamy I ran into a similar problem. I was able to run the flow in parallel locally, but when running on the cloud it ran in serial. By setting the executor on my environment, I was able to get parallel runs on the cloud too. Now my computer is a space heater 🔥
with Flow("flow name", schedule=schedule) as f:
    # mapped flow

# set flow label
f.environment = LocalEnvironment(labels=["flow-label"], executor=LocalDaskExecutor())

# set flow storage
f.storage = Local(
    path="path/to/flow.py",
    stored_as_script=True,
)
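For Cloud to pick up the new environment and storage, the flow also has to be re-registered, and the agent must run with a matching label. A minimal sketch of that last step, assuming a Prefect Cloud project named "examples" already exists:

# re-register so Cloud runs use the updated environment/storage
# (assumes a project named "examples" exists and an agent carries the "flow-label" label)
f.register(project_name="examples")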

Narasimhan Ramaswamy

11/16/2020, 11:24 AM
Thanks @Jimmy Le - I was able to make it work on Cloud as well. What worked best was DaskKubernetesEnvironment - it was faster than the LocalDaskExecutor. DaskKubernetesEnvironment creates its own workers with separate CPU and memory, which helped the jobs run quite a bit faster.
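A minimal sketch of that setup, reusing the flow object from the first snippet; the worker counts and Docker registry details are assumptions, not from the thread:

from prefect.environments import DaskKubernetesEnvironment
from prefect.environments.storage import Docker

# the Dask scheduler and workers run as their own Kubernetes pods,
# each with separate CPU/memory, instead of sharing the agent's machine
flow.environment = DaskKubernetesEnvironment(min_workers=1, max_workers=5)
flow.storage = Docker(registry_url="...", image_name="...")  # hypothetical registry details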