Rob Fowler
10/15/2020, 2:24 AM
I get `Flow run FAILED: some reference tasks failed` because it never schedules one of the slow_task workers. It fails whenever there is any load on the machine; if the machine is at about 1% CPU it works.
# python slow.py --range=10 --sleep_time=3
from time import sleep
import argparse

from prefect import Flow, Parameter, unmapped, task, context
from prefect.engine.executors import LocalDaskExecutor


@task(timeout=9)
def slow_task(opts, item, scripts):
    logger = context.get('logger')
    logger.info(f"==== IN TASK {item} Sleeping {opts.sleep_time}")
    sleep(opts.sleep_time)
    logger.info(f"## Awake {item}")
    return item


@task
def produce_range(opts):
    return range(opts.range)


with Flow("PS Version") as flow:
    scripts = Parameter('scripts')
    opts = Parameter('opts')
    nrange = produce_range(opts)
    results = slow_task.map(item=nrange,
                            scripts=unmapped(scripts),
                            opts=unmapped(opts))

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='test pywinrm')
    parser.add_argument('--workers', type=int, default=10)
    parser.add_argument('--sleep_time', type=int, default=2)
    parser.add_argument('--range', type=int, default=10)
    opts = parser.parse_args()
    executor = LocalDaskExecutor(num_workers=opts.workers)
    flow.run(executor=executor,
             scripts="hello",
             opts=opts)
Chris White
Rob Fowler
10/15/2020, 2:46 AM
python slow.py --range=10 --sleep_time=3
[2020-10-15 02:47:51] INFO - prefect.FlowRunner | Beginning Flow run for 'PS Version'
[2020-10-15 02:47:51] INFO - prefect.TaskRunner | Task 'scripts': Starting task run...
[2020-10-15 02:47:51] INFO - prefect.TaskRunner | Task 'opts': Starting task run...
[2020-10-15 02:47:51] INFO - prefect.TaskRunner | Task 'scripts': finished task run for task with final state: 'Success'
[2020-10-15 02:47:51] INFO - prefect.TaskRunner | Task 'opts': finished task run for task with final state: 'Success'
[2020-10-15 02:47:51] INFO - prefect.TaskRunner | Task 'produce_range': Starting task run...
[2020-10-15 02:47:51] INFO - prefect.TaskRunner | Task 'produce_range': finished task run for task with final state: 'Success'
[2020-10-15 02:47:51] INFO - prefect.TaskRunner | Task 'slow_task': Starting task run...
[2020-10-15 02:47:51] INFO - prefect.TaskRunner | Task 'slow_task': finished task run for task with final state: 'Mapped'
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[0]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.slow_task[0] | ==== IN TASK 0 Sleeping 3
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[3]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.slow_task[3] | ==== IN TASK 3 Sleeping 3
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[5]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[7]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[1]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.slow_task[5] | ==== IN TASK 5 Sleeping 3
[2020-10-15 02:47:52] INFO - prefect.slow_task[7] | ==== IN TASK 7 Sleeping 3
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[9]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.slow_task[1] | ==== IN TASK 1 Sleeping 3
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[2]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[4]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[8]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.slow_task[2] | ==== IN TASK 2 Sleeping 3
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[6]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.slow_task[8] | ==== IN TASK 8 Sleeping 3
[2020-10-15 02:47:52] INFO - prefect.slow_task[6] | ==== IN TASK 6 Sleeping 3
[2020-10-15 02:47:55] INFO - prefect.slow_task[0] | ## Awake 0
[2020-10-15 02:47:55] INFO - prefect.slow_task[3] | ## Awake 3
[2020-10-15 02:47:55] INFO - prefect.TaskRunner | Task 'slow_task[0]': finished task run for task with final state: 'Success'
[2020-10-15 02:47:55] INFO - prefect.slow_task[5] | ## Awake 5
[2020-10-15 02:47:55] INFO - prefect.slow_task[7] | ## Awake 7
[2020-10-15 02:47:55] INFO - prefect.slow_task[1] | ## Awake 1
[2020-10-15 02:47:55] INFO - prefect.slow_task[2] | ## Awake 2
[2020-10-15 02:47:55] INFO - prefect.slow_task[8] | ## Awake 8
[2020-10-15 02:47:55] INFO - prefect.TaskRunner | Task 'slow_task[3]': finished task run for task with final state: 'Success'
[2020-10-15 02:47:55] INFO - prefect.TaskRunner | Task 'slow_task[1]': finished task run for task with final state: 'Success'
[2020-10-15 02:47:55] INFO - prefect.TaskRunner | Task 'slow_task[7]': finished task run for task with final state: 'Success'
[2020-10-15 02:47:55] INFO - prefect.slow_task[6] | ## Awake 6
[2020-10-15 02:47:55] INFO - prefect.TaskRunner | Task 'slow_task[5]': finished task run for task with final state: 'Success'
[2020-10-15 02:47:55] INFO - prefect.TaskRunner | Task 'slow_task[2]': finished task run for task with final state: 'Success'
[2020-10-15 02:47:56] INFO - prefect.TaskRunner | Task 'slow_task[6]': finished task run for task with final state: 'Success'
[2020-10-15 02:47:56] INFO - prefect.TaskRunner | Task 'slow_task[8]': finished task run for task with final state: 'Success'
[2020-10-15 02:48:01] INFO - prefect.TaskRunner | Task 'slow_task[9]': finished task run for task with final state: 'TimedOut'
[2020-10-15 02:48:01] INFO - prefect.TaskRunner | Task 'slow_task[4]': finished task run for task with final state: 'TimedOut'
[2020-10-15 02:48:02] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
python slow.py --range=10 --sleep_time=3
[2020-10-15 02:52:47] INFO - prefect.FlowRunner | Beginning Flow run for 'PS Version'
[2020-10-15 02:52:47] INFO - prefect.TaskRunner | Task 'scripts': Starting task run...
[2020-10-15 02:52:47] INFO - prefect.TaskRunner | Task 'opts': Starting task run...
[2020-10-15 02:52:47] INFO - prefect.TaskRunner | Task 'scripts': finished task run for task with final state: 'Success'
[2020-10-15 02:52:47] INFO - prefect.TaskRunner | Task 'opts': finished task run for task with final state: 'Success'
[2020-10-15 02:52:48] INFO - prefect.TaskRunner | Task 'produce_range': Starting task run...
[2020-10-15 02:52:48] INFO - prefect.TaskRunner | Task 'produce_range': finished task run for task with final state: 'Success'
[2020-10-15 02:52:48] INFO - prefect.TaskRunner | Task 'slow_task': Starting task run...
[2020-10-15 02:52:48] INFO - prefect.TaskRunner | Task 'slow_task': finished task run for task with final state: 'Mapped'
[2020-10-15 02:52:48] INFO - prefect.TaskRunner | Task 'slow_task[6]': Starting task run...
[2020-10-15 02:52:48] INFO - prefect.TaskRunner | Task 'slow_task[4]': Starting task run...
[2020-10-15 02:52:49] INFO - prefect.TaskRunner | Task 'slow_task[1]': Starting task run...
[2020-10-15 02:52:49] INFO - prefect.TaskRunner | Task 'slow_task[3]': Starting task run...
[2020-10-15 02:52:49] INFO - prefect.slow_task[6] | ==== IN TASK 6 Sleeping 3
[2020-10-15 02:52:49] INFO - prefect.slow_task[1] | ==== IN TASK 1 Sleeping 3
[2020-10-15 02:52:49] INFO - prefect.TaskRunner | Task 'slow_task[0]': Starting task run...
[2020-10-15 02:52:49] INFO - prefect.slow_task[3] | ==== IN TASK 3 Sleeping 3
[2020-10-15 02:52:49] INFO - prefect.TaskRunner | Task 'slow_task[2]': Starting task run...
[2020-10-15 02:52:49] INFO - prefect.slow_task[0] | ==== IN TASK 0 Sleeping 3
[2020-10-15 02:52:49] INFO - prefect.TaskRunner | Task 'slow_task[5]': Starting task run...
[2020-10-15 02:52:49] INFO - prefect.TaskRunner | Task 'slow_task[8]': Starting task run...
[2020-10-15 02:52:49] INFO - prefect.slow_task[4] | ==== IN TASK 4 Sleeping 3
[2020-10-15 02:52:49] INFO - prefect.slow_task[2] | ==== IN TASK 2 Sleeping 3
[2020-10-15 02:52:49] INFO - prefect.TaskRunner | Task 'slow_task[7]': Starting task run...
[2020-10-15 02:52:49] INFO - prefect.TaskRunner | Task 'slow_task[9]': Starting task run...
[2020-10-15 02:52:49] INFO - prefect.slow_task[8] | ==== IN TASK 8 Sleeping 3
[2020-10-15 02:52:49] INFO - prefect.slow_task[5] | ==== IN TASK 5 Sleeping 3
[2020-10-15 02:52:49] INFO - prefect.slow_task[9] | ==== IN TASK 9 Sleeping 3
[2020-10-15 02:52:52] INFO - prefect.slow_task[6] | ## Awake 6
[2020-10-15 02:52:52] INFO - prefect.slow_task[1] | ## Awake 1
[2020-10-15 02:52:52] INFO - prefect.slow_task[3] | ## Awake 3
[2020-10-15 02:52:52] INFO - prefect.TaskRunner | Task 'slow_task[6]': finished task run for task with final state: 'Success'
[2020-10-15 02:52:52] INFO - prefect.TaskRunner | Task 'slow_task[1]': finished task run for task with final state: 'Success'
[2020-10-15 02:52:52] INFO - prefect.slow_task[0] | ## Awake 0
[2020-10-15 02:52:52] INFO - prefect.TaskRunner | Task 'slow_task[0]': finished task run for task with final state: 'Success'
[2020-10-15 02:52:52] INFO - prefect.slow_task[4] | ## Awake 4
[2020-10-15 02:52:52] INFO - prefect.slow_task[2] | ## Awake 2
[2020-10-15 02:52:52] INFO - prefect.slow_task[8] | ## Awake 8
[2020-10-15 02:52:52] INFO - prefect.slow_task[5] | ## Awake 5
[2020-10-15 02:52:52] INFO - prefect.TaskRunner | Task 'slow_task[3]': finished task run for task with final state: 'Success'
[2020-10-15 02:52:52] INFO - prefect.slow_task[9] | ## Awake 9
[2020-10-15 02:52:52] INFO - prefect.TaskRunner | Task 'slow_task[8]': finished task run for task with final state: 'Success'
[2020-10-15 02:52:52] INFO - prefect.TaskRunner | Task 'slow_task[2]': finished task run for task with final state: 'Success'
[2020-10-15 02:52:52] INFO - prefect.TaskRunner | Task 'slow_task[9]': finished task run for task with final state: 'Success'
[2020-10-15 02:52:52] INFO - prefect.TaskRunner | Task 'slow_task[5]': finished task run for task with final state: 'Success'
[2020-10-15 02:52:52] INFO - prefect.TaskRunner | Task 'slow_task[4]': finished task run for task with final state: 'Success'
[2020-10-15 02:53:19] INFO - prefect.TaskRunner | Task 'slow_task[7]': finished task run for task with final state: 'TimedOut'
[2020-10-15 02:53:19] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
Chris White
flow_state = flow.run(executor=executor,
                      scripts="hello",
                      opts=opts)
flow_state.result[results].map_states  # will be a list of states corresponding to the mapped `slow_task`
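One way the list of mapped states could be summarized, as a minimal sketch: the helper below assumes only that each state exposes `is_successful()` and `message`, as Prefect state objects do. `FakeState`, `failed_children`, and the demo data are hypothetical stand-ins so the snippet runs without Prefect installed; the message text is copied from the output posted later in this thread.

```python
from dataclasses import dataclass
from typing import Any, List, Tuple


@dataclass
class FakeState:
    """Hypothetical stand-in for a Prefect state; not part of Prefect."""
    ok: bool
    result: Any
    message: str = ""

    def is_successful(self) -> bool:
        return self.ok


def failed_children(map_states: List[Any]) -> List[Tuple[int, str]]:
    """Return (map index, message) for every child state that did not succeed."""
    return [
        (index, state.message)
        for index, state in enumerate(map_states)
        if not state.is_successful()
    ]


# Demo data imitating a run in which only child 1 timed out.
demo_states = [FakeState(True, i) for i in range(10)]
demo_states[1] = FakeState(False, None, "Execution timed out.")

for index, message in failed_children(demo_states):
    print(f"slow_task[{index}] did not succeed: {message}")
```

Against a real run, the same helper would presumably be called as `failed_children(flow_state.result[results].map_states)`.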
Rob Fowler
10/15/2020, 3:14 AM
0
Execution timed out.
2
3
4
5
6
7
8
9
[2020-10-15 03:27:36] INFO - prefect.TaskRunner | Task 'slow_task[3]': Starting task run...
[2020-10-15 03:27:37] INFO - prefect.slow_task[3] | ==== IN TASK 3 Sleeping 3
[2020-10-15 03:27:37] INFO - prefect.slow_task[5] | ==== IN TASK 5 Sleeping 3
[2020-10-15 03:27:37] INFO - prefect.TaskRunner | Task 'slow_task[6]': Starting task run...
[2020-10-15 03:27:37] INFO - prefect.slow_task[6] | ==== IN TASK 6 Sleeping 3
[2020-10-15 03:27:37] INFO - prefect.TaskRunner | Task 'slow_task[4]': Starting task run...
[2020-10-15 03:27:37] INFO - prefect.TaskRunner | Task 'slow_task[1]': Starting task run...
[2020-10-15 03:27:37] INFO - prefect.TaskRunner | Task 'slow_task[2]': Starting task run...
[2020-10-15 03:27:37] INFO - prefect.TaskRunner | Task 'slow_task[0]': Starting task run...
[2020-10-15 03:27:37] INFO - prefect.slow_task[4] | ==== IN TASK 4 Sleeping 3
[2020-10-15 03:27:37] INFO - prefect.slow_task[0] | ==== IN TASK 0 Sleeping 3
[2020-10-15 03:27:37] INFO - prefect.TaskRunner | Task 'slow_task[9]': Starting task run...
[2020-10-15 03:27:37] INFO - prefect.TaskRunner | Task 'slow_task[7]': Starting task run...
[2020-10-15 03:27:37] INFO - prefect.slow_task[7] | ==== IN TASK 7 Sleeping 3
[2020-10-15 03:27:37] INFO - prefect.TaskRunner | Task 'slow_task[8]': Starting task run...
[2020-10-15 03:27:37] INFO - prefect.slow_task[2] | ==== IN TASK 2 Sleeping 3
[2020-10-15 03:27:37] INFO - prefect.slow_task[9] | ==== IN TASK 9 Sleeping 3
[2020-10-15 03:27:37] INFO - prefect.slow_task[8] | ==== IN TASK 8 Sleeping 3
[2020-10-15 03:27:40] INFO - prefect.slow_task[3] | ## Awake 3
[2020-10-15 03:27:40] INFO - prefect.TaskRunner | Task 'slow_task[3]': finished task run for task with final state: 'Success'
[2020-10-15 03:27:40] INFO - prefect.slow_task[5] | ## Awake 5
[2020-10-15 03:27:40] INFO - prefect.TaskRunner | Task 'slow_task[5]': finished task run for task with final state: 'Success'
[2020-10-15 03:27:40] INFO - prefect.slow_task[6] | ## Awake 6
[2020-10-15 03:27:40] INFO - prefect.TaskRunner | Task 'slow_task[6]': finished task run for task with final state: 'Success'
[2020-10-15 03:27:40] INFO - prefect.slow_task[4] | ## Awake 4
[2020-10-15 03:27:40] INFO - prefect.slow_task[0] | ## Awake 0
[2020-10-15 03:27:40] INFO - prefect.TaskRunner | Task 'slow_task[4]': finished task run for task with final state: 'Success'
[2020-10-15 03:27:40] INFO - prefect.TaskRunner | Task 'slow_task[0]': finished task run for task with final state: 'Success'
[2020-10-15 03:27:40] INFO - prefect.slow_task[7] | ## Awake 7
[2020-10-15 03:27:40] INFO - prefect.slow_task[2] | ## Awake 2
[2020-10-15 03:27:40] INFO - prefect.TaskRunner | Task 'slow_task[7]': finished task run for task with final state: 'Success'
[2020-10-15 03:27:40] INFO - prefect.slow_task[9] | ## Awake 9
[2020-10-15 03:27:40] INFO - prefect.slow_task[8] | ## Awake 8
[2020-10-15 03:27:40] INFO - prefect.TaskRunner | Task 'slow_task[2]': finished task run for task with final state: 'Success'
[2020-10-15 03:27:40] INFO - prefect.TaskRunner | Task 'slow_task[9]': finished task run for task with final state: 'Success'
[2020-10-15 03:27:40] INFO - prefect.TaskRunner | Task 'slow_task[8]': finished task run for task with final state: 'Success'
[2020-10-15 03:28:07] INFO - prefect.TaskRunner | Task 'slow_task[1]': finished task run for task with final state: 'TimedOut'
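In all three logs above, the children that end in `TimedOut` are the ones that logged `Starting task run...` but never logged the `==== IN TASK` line from the task body. A rough sketch of how that check could be automated on a saved log follows; the function name `never_entered` and the regular expressions are illustrative assumptions fitted to the log format shown here, not part of Prefect.

```python
import re
from typing import Iterable, List

# Patterns fitted to the log lines pasted in this thread (an assumption).
STARTED = re.compile(r"Task 'slow_task\[(\d+)\]': Starting task run")
ENTERED = re.compile(r"prefect\.slow_task\[(\d+)\] \| ==== IN TASK")


def never_entered(lines: Iterable[str]) -> List[int]:
    """Map indices whose task runner started but whose body never logged."""
    started, entered = set(), set()
    for line in lines:
        match = STARTED.search(line)
        if match:
            started.add(int(match.group(1)))
        match = ENTERED.search(line)
        if match:
            entered.add(int(match.group(1)))
    return sorted(started - entered)


# A few lines excerpted from the first run above.
sample = """\
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[0]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.slow_task[0] | ==== IN TASK 0 Sleeping 3
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[9]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[4]': Starting task run...
""".splitlines()

print(never_entered(sample))
```

Run over the full first log, this should report children 4 and 9, the same two that finish as `TimedOut`.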
Chris White
Rob Fowler
10/15/2020, 3:43 AM
Chris White
Rob Fowler
10/15/2020, 3:44 AM
Chris White
Rob Fowler
10/15/2020, 4:23 AM
Zanie
`processes=True` as described at https://github.com/PrefectHQ/prefect/issues/3506#issuecomment-709493914. I’ll continue to try to sort out what’s happening in the threaded case though!
Rob Fowler
10/15/2020, 9:25 PM
Zanie
Jim Crist-Harif
10/15/2020, 10:10 PM
`LocalDaskExecutor` should perform on par (or better) perf-wise with any of the stdlib `concurrent.futures` executors. Most users have been satisfied to use `LocalDaskExecutor` with either threads or processes without issue. Dask (not distributed, just dask) has no dependencies, and is a small wrapper around a thread or process pool, much like `concurrent.futures`.
What's your goal here with using a `concurrent.futures`-based executor? The `Executor` interface isn't exactly public, so if you did write your own we'd make no guarantees that it wouldn't break in the next release.
Rob Fowler
10/15/2020, 10:49 PM
Zanie
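For comparison with the `concurrent.futures` approach discussed above, here is a minimal stdlib-only sketch of the same sleeping workload on a thread pool. It is not a Prefect `Executor` and does not use Prefect's timeout mechanism; `run_all` and its parameters are hypothetical names chosen for illustration, and the sleep is shortened so the example finishes quickly.

```python
import concurrent.futures
import time
from typing import List


def slow_task(item: int, sleep_time: float) -> int:
    """Sleep, then return the item, like the mapped task in the repro script."""
    time.sleep(sleep_time)
    return item


def run_all(n: int, sleep_time: float, timeout: float, workers: int) -> List[object]:
    """Run n sleeping tasks on a thread pool and collect results in order.

    The timeout applies to each wait on `result`, not to the task's own
    runtime, and a timed-out thread is not cancelled; it keeps running
    until the pool shuts down.
    """
    outcomes: List[object] = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(slow_task, i, sleep_time) for i in range(n)]
        for future in futures:
            try:
                outcomes.append(future.result(timeout=timeout))
            except concurrent.futures.TimeoutError:
                outcomes.append("Execution timed out.")
    return outcomes


print(run_all(n=10, sleep_time=0.05, timeout=2.0, workers=10))
```

Because the wait-based timeout cannot interrupt a running thread, this sketch only reports a timeout; it does not stop the work, which is one difference to keep in mind when comparing it with a process-based scheduler.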