Faheem Khan
07/03/2022, 11:40 PMAnna Geller
07/04/2022, 11:33 AMprefect version
2. Some code to reproduce the issue
3. The exception traceback?from prefect import flow, task
from prefect import get_run_logger
from prefect.task_runners import DaskTaskRunner
from prefect.flow_runners import DockerFlowRunner
from prefect.deployments import DeploymentSpec
import time
@task
def compute_something(x):
logger = get_run_logger()
<http://logger.info|logger.info>("Computing: %d x 2 = %d", x, x * 2)
time.sleep(2)
@flow(task_runner=DaskTaskRunner())
def dask_flow():
for i in range(50):
compute_something(i)
DeploymentSpec(name="staging", flow=dask_flow, flow_runner=DockerFlowRunner())
Faheem Khan
07/05/2022, 1:20 AM@task
def compute_something(x):
logger = get_run_logger()
<http://logger.info|logger.info>("Computing: %d x 2 = %d", x, x * 2)
time.sleep(2)
@flow(task_runner=DaskTaskRunner())
def dask_flow():
list(map(compute_something, range(50))) #worked
if __name__ == '__main__':
#list(map(dask_flow, range(50))) #didn't work
dask_flow()
DeploymentSpec(name="staging", flow=dask_flow, flow_runner=DockerFlowRunner())
Anna Geller
07/05/2022, 11:11 AM