Rob Fowler

10/15/2020, 2:24 AM
I have an issue with LocalDaskExecutor and have reduced my code down to a simple example. In this example, if I run the tasks, each sleeping for 3 seconds, the flow does not map some of them. I suspect it's my use of a mapped task. The range is simply a list of numbers. What happens is that it does not run the 'slow_task' for every item in the list.
I get 'Flow run FAILED: some reference tasks failed' because it never schedules one of the slow_task workers. It fails if there is any load on the machine at all; if the machine is at 1% CPU it works.
# python slow.py --range=10 --sleep_time=3
from time import sleep
import argparse

from prefect import Flow, Parameter, unmapped, task, context
from prefect.engine.executors import LocalDaskExecutor


@task(timeout=9)
def slow_task(opts, item, scripts):
    logger = context.get('logger')
    logger.info(f"==== IN TASK {item} Sleeping {opts.sleep_time}")
    sleep(opts.sleep_time)
    logger.info(f"## Awake {item}")
    return item


@task
def produce_range(opts):
    return range(opts.range)


with Flow("PS Version") as flow:
    scripts = Parameter('scripts')
    opts = Parameter('opts')

    nrange = produce_range(opts)
    results = slow_task.map(item=nrange,
                            scripts=unmapped(scripts),
                            opts=unmapped(opts))


if __name__ == '__main__':

    parser = argparse.ArgumentParser(description='test pywinrm')
    parser.add_argument('--workers', type=int, default=10)
    parser.add_argument('--sleep_time', type=int, default=2)
    parser.add_argument('--range', type=int, default=10)

    opts = parser.parse_args()

    executor = LocalDaskExecutor(num_workers=opts.workers)
    flow.run(executor=executor,
             scripts="hello",
             opts=opts)

Chris White

10/15/2020, 2:38 AM
Hi Rob, would you mind editing your comment and pasting the large code block within this thread? That helps keep the main channel clean and easily readable
Otherwise, could you share the evidence you have for the slow task not mapping across all items? Does the flow still finish? In what state? etc.

Rob Fowler

10/15/2020, 2:46 AM
I am trying to enter in a code block so it collapses. Sorry. I have been using that damned teams thing for a year.
😂 1
python slow.py --range=10 --sleep_time=3
[2020-10-15 02:47:51] INFO - prefect.FlowRunner | Beginning Flow run for 'PS Version'
[2020-10-15 02:47:51] INFO - prefect.TaskRunner | Task 'scripts': Starting task run...
[2020-10-15 02:47:51] INFO - prefect.TaskRunner | Task 'opts': Starting task run...
[2020-10-15 02:47:51] INFO - prefect.TaskRunner | Task 'scripts': finished task run for task with final state: 'Success'
[2020-10-15 02:47:51] INFO - prefect.TaskRunner | Task 'opts': finished task run for task with final state: 'Success'
[2020-10-15 02:47:51] INFO - prefect.TaskRunner | Task 'produce_range': Starting task run...
[2020-10-15 02:47:51] INFO - prefect.TaskRunner | Task 'produce_range': finished task run for task with final state: 'Success'
[2020-10-15 02:47:51] INFO - prefect.TaskRunner | Task 'slow_task': Starting task run...
[2020-10-15 02:47:51] INFO - prefect.TaskRunner | Task 'slow_task': finished task run for task with final state: 'Mapped'
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[0]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.slow_task[0] | ==== IN TASK 0 Sleeping 3
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[3]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.slow_task[3] | ==== IN TASK 3 Sleeping 3
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[5]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[7]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[1]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.slow_task[5] | ==== IN TASK 5 Sleeping 3
[2020-10-15 02:47:52] INFO - prefect.slow_task[7] | ==== IN TASK 7 Sleeping 3
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[9]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.slow_task[1] | ==== IN TASK 1 Sleeping 3
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[2]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[4]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[8]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.slow_task[2] | ==== IN TASK 2 Sleeping 3
[2020-10-15 02:47:52] INFO - prefect.TaskRunner | Task 'slow_task[6]': Starting task run...
[2020-10-15 02:47:52] INFO - prefect.slow_task[8] | ==== IN TASK 8 Sleeping 3
[2020-10-15 02:47:52] INFO - prefect.slow_task[6] | ==== IN TASK 6 Sleeping 3
[2020-10-15 02:47:55] INFO - prefect.slow_task[0] | ## Awake 0
[2020-10-15 02:47:55] INFO - prefect.slow_task[3] | ## Awake 3
[2020-10-15 02:47:55] INFO - prefect.TaskRunner | Task 'slow_task[0]': finished task run for task with final state: 'Success'
[2020-10-15 02:47:55] INFO - prefect.slow_task[5] | ## Awake 5
[2020-10-15 02:47:55] INFO - prefect.slow_task[7] | ## Awake 7
[2020-10-15 02:47:55] INFO - prefect.slow_task[1] | ## Awake 1
[2020-10-15 02:47:55] INFO - prefect.slow_task[2] | ## Awake 2
[2020-10-15 02:47:55] INFO - prefect.slow_task[8] | ## Awake 8
[2020-10-15 02:47:55] INFO - prefect.TaskRunner | Task 'slow_task[3]': finished task run for task with final state: 'Success'
[2020-10-15 02:47:55] INFO - prefect.TaskRunner | Task 'slow_task[1]': finished task run for task with final state: 'Success'
[2020-10-15 02:47:55] INFO - prefect.TaskRunner | Task 'slow_task[7]': finished task run for task with final state: 'Success'
[2020-10-15 02:47:55] INFO - prefect.slow_task[6] | ## Awake 6
[2020-10-15 02:47:55] INFO - prefect.TaskRunner | Task 'slow_task[5]': finished task run for task with final state: 'Success'
[2020-10-15 02:47:55] INFO - prefect.TaskRunner | Task 'slow_task[2]': finished task run for task with final state: 'Success'
[2020-10-15 02:47:56] INFO - prefect.TaskRunner | Task 'slow_task[6]': finished task run for task with final state: 'Success'
[2020-10-15 02:47:56] INFO - prefect.TaskRunner | Task 'slow_task[8]': finished task run for task with final state: 'Success'
[2020-10-15 02:48:01] INFO - prefect.TaskRunner | Task 'slow_task[9]': finished task run for task with final state: 'TimedOut'
[2020-10-15 02:48:01] INFO - prefect.TaskRunner | Task 'slow_task[4]': finished task run for task with final state: 'TimedOut'
[2020-10-15 02:48:02] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
it does not run the slow_task for each item in the list produced by 'produce_range'
Actually, I just extended the timeout (which should not have hit 9 seconds according to the logs). Now I made the timeout 30 seconds and it's still timing out tasks.
python slow.py --range=10 --sleep_time=3
[2020-10-15 02:52:47] INFO - prefect.FlowRunner | Beginning Flow run for 'PS Version'
[2020-10-15 02:52:47] INFO - prefect.TaskRunner | Task 'scripts': Starting task run...
[2020-10-15 02:52:47] INFO - prefect.TaskRunner | Task 'opts': Starting task run...
[2020-10-15 02:52:47] INFO - prefect.TaskRunner | Task 'scripts': finished task run for task with final state: 'Success'
[2020-10-15 02:52:47] INFO - prefect.TaskRunner | Task 'opts': finished task run for task with final state: 'Success'
[2020-10-15 02:52:48] INFO - prefect.TaskRunner | Task 'produce_range': Starting task run...
[2020-10-15 02:52:48] INFO - prefect.TaskRunner | Task 'produce_range': finished task run for task with final state: 'Success'
[2020-10-15 02:52:48] INFO - prefect.TaskRunner | Task 'slow_task': Starting task run...
[2020-10-15 02:52:48] INFO - prefect.TaskRunner | Task 'slow_task': finished task run for task with final state: 'Mapped'
[2020-10-15 02:52:48] INFO - prefect.TaskRunner | Task 'slow_task[6]': Starting task run...
[2020-10-15 02:52:48] INFO - prefect.TaskRunner | Task 'slow_task[4]': Starting task run...
[2020-10-15 02:52:49] INFO - prefect.TaskRunner | Task 'slow_task[1]': Starting task run...
[2020-10-15 02:52:49] INFO - prefect.TaskRunner | Task 'slow_task[3]': Starting task run...
[2020-10-15 02:52:49] INFO - prefect.slow_task[6] | ==== IN TASK 6 Sleeping 3
[2020-10-15 02:52:49] INFO - prefect.slow_task[1] | ==== IN TASK 1 Sleeping 3
[2020-10-15 02:52:49] INFO - prefect.TaskRunner | Task 'slow_task[0]': Starting task run...
[2020-10-15 02:52:49] INFO - prefect.slow_task[3] | ==== IN TASK 3 Sleeping 3
[2020-10-15 02:52:49] INFO - prefect.TaskRunner | Task 'slow_task[2]': Starting task run...
[2020-10-15 02:52:49] INFO - prefect.slow_task[0] | ==== IN TASK 0 Sleeping 3
[2020-10-15 02:52:49] INFO - prefect.TaskRunner | Task 'slow_task[5]': Starting task run...
[2020-10-15 02:52:49] INFO - prefect.TaskRunner | Task 'slow_task[8]': Starting task run...
[2020-10-15 02:52:49] INFO - prefect.slow_task[4] | ==== IN TASK 4 Sleeping 3
[2020-10-15 02:52:49] INFO - prefect.slow_task[2] | ==== IN TASK 2 Sleeping 3
[2020-10-15 02:52:49] INFO - prefect.TaskRunner | Task 'slow_task[7]': Starting task run...
[2020-10-15 02:52:49] INFO - prefect.TaskRunner | Task 'slow_task[9]': Starting task run...
[2020-10-15 02:52:49] INFO - prefect.slow_task[8] | ==== IN TASK 8 Sleeping 3
[2020-10-15 02:52:49] INFO - prefect.slow_task[5] | ==== IN TASK 5 Sleeping 3
[2020-10-15 02:52:49] INFO - prefect.slow_task[9] | ==== IN TASK 9 Sleeping 3
[2020-10-15 02:52:52] INFO - prefect.slow_task[6] | ## Awake 6
[2020-10-15 02:52:52] INFO - prefect.slow_task[1] | ## Awake 1
[2020-10-15 02:52:52] INFO - prefect.slow_task[3] | ## Awake 3
[2020-10-15 02:52:52] INFO - prefect.TaskRunner | Task 'slow_task[6]': finished task run for task with final state: 'Success'
[2020-10-15 02:52:52] INFO - prefect.TaskRunner | Task 'slow_task[1]': finished task run for task with final state: 'Success'
[2020-10-15 02:52:52] INFO - prefect.slow_task[0] | ## Awake 0
[2020-10-15 02:52:52] INFO - prefect.TaskRunner | Task 'slow_task[0]': finished task run for task with final state: 'Success'
[2020-10-15 02:52:52] INFO - prefect.slow_task[4] | ## Awake 4
[2020-10-15 02:52:52] INFO - prefect.slow_task[2] | ## Awake 2
[2020-10-15 02:52:52] INFO - prefect.slow_task[8] | ## Awake 8
[2020-10-15 02:52:52] INFO - prefect.slow_task[5] | ## Awake 5
[2020-10-15 02:52:52] INFO - prefect.TaskRunner | Task 'slow_task[3]': finished task run for task with final state: 'Success'
[2020-10-15 02:52:52] INFO - prefect.slow_task[9] | ## Awake 9
[2020-10-15 02:52:52] INFO - prefect.TaskRunner | Task 'slow_task[8]': finished task run for task with final state: 'Success'
[2020-10-15 02:52:52] INFO - prefect.TaskRunner | Task 'slow_task[2]': finished task run for task with final state: 'Success'
[2020-10-15 02:52:52] INFO - prefect.TaskRunner | Task 'slow_task[9]': finished task run for task with final state: 'Success'
[2020-10-15 02:52:52] INFO - prefect.TaskRunner | Task 'slow_task[5]': finished task run for task with final state: 'Success'
[2020-10-15 02:52:52] INFO - prefect.TaskRunner | Task 'slow_task[4]': finished task run for task with final state: 'Success'
[2020-10-15 02:53:19] INFO - prefect.TaskRunner | Task 'slow_task[7]': finished task run for task with final state: 'TimedOut'
[2020-10-15 02:53:19] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
so now I set the timeout to be 30 seconds,
the interesting thing is, I can see prefect scheduling task 7, but it never gets into the function.

Chris White

10/15/2020, 3:01 AM
I think this is just a consequence of timeouts - depending on where dask runs the task, prefect alters how the timeout is enforced. Sometimes prefect will run the task in a subprocess, which means the stdout / stderr (including logs) won’t show up in your main process. The fact that the flow is still ending tells me it’s executing as you expect, you’re just noticing a lack of logs (if running against a backend, I expect that you’d see them). Perhaps the best way to see this for yourself would be to capture and inspect the flow run state:
flow_state = flow.run(executor=executor,
                      scripts="hello",
                      opts=opts)
# list of states, one per mapped `slow_task` run
flow_state.result[results].map_states

Rob Fowler

10/15/2020, 3:14 AM
Why would a 30 second timeout catch a 2 second sleep when the machine is not loaded that much? In any case, I'll check the results before anything. You are right that the stdout can go missing.
no, it is timing out, there is no result, for example:
0
Execution timed out.
2
3
4
5
6
7
8
9
it never ran task '1' here:
[2020-10-15 03:27:36] INFO - prefect.TaskRunner | Task 'slow_task[3]': Starting task run...
[2020-10-15 03:27:37] INFO - prefect.slow_task[3] | ==== IN TASK 3 Sleeping 3
[2020-10-15 03:27:37] INFO - prefect.slow_task[5] | ==== IN TASK 5 Sleeping 3
[2020-10-15 03:27:37] INFO - prefect.TaskRunner | Task 'slow_task[6]': Starting task run...
[2020-10-15 03:27:37] INFO - prefect.slow_task[6] | ==== IN TASK 6 Sleeping 3
[2020-10-15 03:27:37] INFO - prefect.TaskRunner | Task 'slow_task[4]': Starting task run...
[2020-10-15 03:27:37] INFO - prefect.TaskRunner | Task 'slow_task[1]': Starting task run...
[2020-10-15 03:27:37] INFO - prefect.TaskRunner | Task 'slow_task[2]': Starting task run...
[2020-10-15 03:27:37] INFO - prefect.TaskRunner | Task 'slow_task[0]': Starting task run...
[2020-10-15 03:27:37] INFO - prefect.slow_task[4] | ==== IN TASK 4 Sleeping 3
[2020-10-15 03:27:37] INFO - prefect.slow_task[0] | ==== IN TASK 0 Sleeping 3
[2020-10-15 03:27:37] INFO - prefect.TaskRunner | Task 'slow_task[9]': Starting task run...
[2020-10-15 03:27:37] INFO - prefect.TaskRunner | Task 'slow_task[7]': Starting task run...
[2020-10-15 03:27:37] INFO - prefect.slow_task[7] | ==== IN TASK 7 Sleeping 3
[2020-10-15 03:27:37] INFO - prefect.TaskRunner | Task 'slow_task[8]': Starting task run...
[2020-10-15 03:27:37] INFO - prefect.slow_task[2] | ==== IN TASK 2 Sleeping 3
[2020-10-15 03:27:37] INFO - prefect.slow_task[9] | ==== IN TASK 9 Sleeping 3
[2020-10-15 03:27:37] INFO - prefect.slow_task[8] | ==== IN TASK 8 Sleeping 3
[2020-10-15 03:27:40] INFO - prefect.slow_task[3] | ## Awake 3
[2020-10-15 03:27:40] INFO - prefect.TaskRunner | Task 'slow_task[3]': finished task run for task with final state: 'Success'
[2020-10-15 03:27:40] INFO - prefect.slow_task[5] | ## Awake 5
[2020-10-15 03:27:40] INFO - prefect.TaskRunner | Task 'slow_task[5]': finished task run for task with final state: 'Success'
[2020-10-15 03:27:40] INFO - prefect.slow_task[6] | ## Awake 6
[2020-10-15 03:27:40] INFO - prefect.TaskRunner | Task 'slow_task[6]': finished task run for task with final state: 'Success'
[2020-10-15 03:27:40] INFO - prefect.slow_task[4] | ## Awake 4
[2020-10-15 03:27:40] INFO - prefect.slow_task[0] | ## Awake 0
[2020-10-15 03:27:40] INFO - prefect.TaskRunner | Task 'slow_task[4]': finished task run for task with final state: 'Success'
[2020-10-15 03:27:40] INFO - prefect.TaskRunner | Task 'slow_task[0]': finished task run for task with final state: 'Success'
[2020-10-15 03:27:40] INFO - prefect.slow_task[7] | ## Awake 7
[2020-10-15 03:27:40] INFO - prefect.slow_task[2] | ## Awake 2
[2020-10-15 03:27:40] INFO - prefect.TaskRunner | Task 'slow_task[7]': finished task run for task with final state: 'Success'
[2020-10-15 03:27:40] INFO - prefect.slow_task[9] | ## Awake 9
[2020-10-15 03:27:40] INFO - prefect.slow_task[8] | ## Awake 8
[2020-10-15 03:27:40] INFO - prefect.TaskRunner | Task 'slow_task[2]': finished task run for task with final state: 'Success'
[2020-10-15 03:27:40] INFO - prefect.TaskRunner | Task 'slow_task[9]': finished task run for task with final state: 'Success'
[2020-10-15 03:27:40] INFO - prefect.TaskRunner | Task 'slow_task[8]': finished task run for task with final state: 'Success'
[2020-10-15 03:28:07] INFO - prefect.TaskRunner | Task 'slow_task[1]': finished task run for task with final state: 'TimedOut'
I have put the example in https://github.com/mianos/pbug.git so I can try it on more machines. My work machine is a Red Hat one in a large cluster. Running it on my Ubuntu 20.04 machine at home, with 8 cores, it is worse.

Chris White

10/15/2020, 3:43 AM
This looks like a resource constraint on the machine you’re running with, in which the thread / process responsible for running the task fails to ever start and the timer runs out (we’ve seen this frequently with python 3.9 FWIW). I think identifying the issue further will require more info about your OS / your environment / available CPU / etc. so I suggest opening an issue on GitHub for us to triage further
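A minimal standard-library sketch of that failure mode, not Prefect's or Dask's actual timeout code: the pool is deliberately limited to one worker (an assumption made only to force the condition), so one task is still waiting for a worker when the deadline passes. `run_with_deadline` and the outcome strings are hypothetical names used for illustration.

```python
import concurrent.futures
import time


def slow_task(item, sleep_time):
    """Stand-in for the mapped task: sleep, then return the item."""
    time.sleep(sleep_time)
    return item


def run_with_deadline(items, workers, sleep_time, timeout):
    """Submit every item at once and give the whole batch `timeout` seconds.

    The clock starts at submission, not when a worker picks the task up,
    so a task that never gets a worker still times out.
    """
    outcomes = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
        deadline = time.monotonic() + timeout
        futures = {i: pool.submit(slow_task, i, sleep_time) for i in items}
        for item, future in futures.items():
            remaining = max(0.0, deadline - time.monotonic())
            try:
                outcomes[item] = future.result(timeout=remaining)
            except concurrent.futures.TimeoutError:
                # cancel() only succeeds if no worker ever started the task.
                if future.cancel():
                    outcomes[item] = "timed out, never started"
                else:
                    outcomes[item] = "timed out while running"
    return outcomes


# One worker, three tasks of 0.5 s each, 0.75 s budget for the batch.
outcomes = run_with_deadline(range(3), workers=1, sleep_time=0.5, timeout=0.75)
for item, outcome in outcomes.items():
    print(item, outcome)
```

The last task here matches the shape of what the logs show: its timer expires although its body never ran. The cause in this sketch is an undersized pool, which is not necessarily the cause in the thread above.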

Rob Fowler

10/15/2020, 3:43 AM
This new test is on the new machine, which is a monster machine with nothing else running on it.

Chris White

10/15/2020, 3:44 AM
ah interesting, yea the fact that it changes across machines is further evidence of an inability to spawn new threads / processes for some reason; it’s possible it’s an upstream dask bug but let’s move to an issue and triage from there
is the run on a much bigger machine
python Python 3.8.5
OK, I'll open an issue and put this repo and gists into it
👍 1
I reckon it's Dask, I can't see prefect doing anything fancy here
I have never used the dask low level api myself, so I'll have a quick look if I can make a similarly functional example

Chris White

10/15/2020, 3:51 AM
Yea that sounds great; one of the prefect engineers is a core dask maintainer so he usually knows the scoop too but either way if you can pinpoint it to dask yourself that’d be very useful
I just tried it on a few more machines. The faster machines failed more.
Open to feedback on that issue. I have included a run and dumps from the pip environment

Zanie

10/15/2020, 9:25 PM
@Rob Fowler just a heads up that you may be able to fix this by using processes=True as described at https://github.com/PrefectHQ/prefect/issues/3506#issuecomment-709493914. I’ll continue to try to sort out what’s happening in the threaded case though!

Rob Fowler

10/15/2020, 9:25 PM
yes, I see that
Thanks, I can use that at work. I'd really like to get my concurrent futures executor working today as well.

Zanie

10/15/2020, 9:27 PM
Are you referring to your second thread?

Jim Crist-Harif

10/15/2020, 10:10 PM
@Rob Fowler, the way prefect is currently implemented, writing your own executor (that can run things in parallel) isn't the easiest thing to do. Prefect was originally written based fully on Dask, and implicitly relies on a lot of dask's internal task chaining to work. It's on my todo list to better define the executor interface, but that's not a high priority. In the absence of bugs (which are very rare in dask's local scheduler, given how stable that code has been for the last 3 years), the LocalDaskExecutor should perform on par (or better) perf wise with any of the stdlib concurrent.futures executors. Most users have been satisfied to use LocalDaskExecutor with either threads or processes without issue. Dask (not distributed, just dask) has no dependencies, and is a small wrapper around a thread or process pool, much like concurrent.futures. What's your goal here with using a concurrent.futures based executor? The Executor interface isn't exactly public, so if you did write your own we'd make no guarantees that it wouldn't break in the next release.
I see Michael is already looking at your issue, I'll take a look tomorrow myself as well if it's not resolved.
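For readers unfamiliar with the comparison, here is a rough standard-library sketch of what mapping a task over a pool looks like with concurrent.futures. It only illustrates the thread-pool pattern mentioned above; it is not a Prefect executor and does not reproduce dask's task chaining. The 0.2 second sleep and the worker count of 10 are arbitrary values chosen for the example.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(item, sleep_time=0.2):
    """Stand-in for the mapped task from the thread."""
    time.sleep(sleep_time)
    return item


start = time.monotonic()
# Ten workers, ten items: every item gets its own thread,
# so the batch takes roughly one sleep rather than ten.
with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(slow_task, range(10)))
elapsed = time.monotonic() - start

print(results)
print(f"elapsed: {elapsed:.2f}s")
```

`pool.map` returns results in input order regardless of which thread finishes first, which is the same ordering guarantee a mapped task's results need.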

Rob Fowler

10/15/2020, 10:49 PM
My only goal was to work around the issue I have. Looking at the Executor implementation, doco and the unit tests it looked quite simple. I ran out of time last night but looking at it this morning I think all I would have to do is write something that converts a Dask 'futures-like' future into a concurrent.futures object and something to map it back to populate the result. I was not aware of the 'chaining' of tasks so, once I finished the first bit I would have probably hit a more serious block. Thanks for telling me. I'll drop it. My only suggestion would be to add some notes saying 'this is not a stable interface'. The unit tests don't cover the more complex cases.
Updated my code to use procs and not threads and rolled it out to production (don't tell anyone, it's Friday afternoon and no rollouts time 🙂 ). I would have to say, it's much more stable with tasks as processes. I am not sure if I was hitting this bug or it's just the fact that procs are more stable in that when something exits everything is cleaned up. That said, I had 100 workers as the default, and when the 100 procs got fired and used all the memory up, k8s made a real mess of it. A thread pool of 100 workers does not really need any more resources. Now I am down to 10 procs and, just running a few thousand tasks, it's going better than ever.
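A small sketch of why the worker count is the knob that matters here: a fixed-size pool caps how many tasks are in flight at once, however many tasks are queued. It uses a thread pool from the standard library so it runs anywhere; the same cap applies to a process pool, where each extra worker also costs a full interpreter's worth of memory. `ConcurrencyMeter` and `tiny_task` are hypothetical helpers written for this example.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ConcurrencyMeter:
    """Context manager that records the peak number of simultaneous entries."""

    def __init__(self):
        self._lock = threading.Lock()
        self._active = 0
        self.peak = 0

    def __enter__(self):
        with self._lock:
            self._active += 1
            self.peak = max(self.peak, self._active)

    def __exit__(self, *exc_info):
        with self._lock:
            self._active -= 1


meter = ConcurrencyMeter()


def tiny_task(item):
    """Do a trivial amount of work while being counted by the meter."""
    with meter:
        time.sleep(0.001)
        return item


# 500 queued tasks, but never more than 10 running at the same time.
with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(tiny_task, range(500)))

print(f"tasks completed: {len(results)}, peak concurrency: {meter.peak}")
```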

Zanie

10/16/2020, 1:54 PM
Glad to hear it’s working! I’ll respond in the Github issue (or Jim will) once we figure out what’s going on with the thread pool.
This issue looks isolated to Linux and timeout-enforced tasks.