Jeff Yun

10/02/2019, 8:40 PM
If a Parameter consists of e.g. a list of length 100k, at what stage and how may it be batched with dask.Bag?

Chris White

10/03/2019, 6:18 PM
Hi Jeff, just realized I never responded to this; you’d need to do this batching within a task which takes in this parameter

Jeff Yun

10/03/2019, 10:11 PM
Tried your suggestion, and I have
import itertools

import dask.bag as db
from prefect import task

@task(name='create_product')
def create_product(params1, params2):
    return list(itertools.product(params1, params2))

@task(name='make_batch')
def make_batch(param, MAX_PARTITIONS=200):
    param = db.from_sequence(param, npartitions=MAX_PARTITIONS)
    return param
In my flow, I have
prod = create_product(a_list, b_list)
sim = stage_0_func.map(unmapped(cfgdir), unmapped(tmpdir), make_batch(prod))
which gives me the error
Traceback (most recent call last):
  File "REPODIR/demos/prefect/engine/task_runner.py", line 260, in run
    executor=executor,
  File "REPODIR/demos/prefect/engine/task_runner.py", line 720, in run_mapped_task
    upstream_state.result[i],
TypeError: 'Bag' object does not support indexing

Chris White

10/03/2019, 10:14 PM
ah, yes we currently make a strong assumption that things which are mapped over are list-like — I’m working on a large mapping refactor that might address this, although I can’t guarantee it --> in general, Prefect assumes that objects which are returned from Tasks are “context-free”, meaning they could potentially be stored somewhere and extracted at a later time. Dask-delayed-like objects don’t fit this model particularly well
Might I ask what your use case is? I’m wondering what advantages the Prefect API is providing you over just using dask directly
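(A possible workaround sketch for the mapping error above, not from the thread: have make_batch return an ordinary, indexable list of chunks instead of a dask.Bag, so Prefect's map can index into the upstream result. The chunking logic here is illustrative.)
from prefect import task, unmapped

@task(name='make_batch')
def make_batch(param, max_partitions=200):
    # Return a plain list of chunks rather than a dask.Bag, so that
    # run_mapped_task can index into the upstream result.
    chunk_size = max(1, len(param) // max_partitions)
    return [param[i:i + chunk_size] for i in range(0, len(param), chunk_size)]

# stage_0_func then receives one chunk (a plain list) per mapped child:
# sim = stage_0_func.map(unmapped(cfgdir), unmapped(tmpdir), make_batch(prod))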

Jeff Yun

10/03/2019, 10:20 PM
My use case is being able to easily construct task DAGs, including replacing tasks, retrying tasks dynamically, and skipping downstream tasks based on upstream task states.
Customizing different task graphs for mathematical/scientific/financial experimentation
Dask doesn't seem to have good dynamic DAG generation (and general easy editing) capabilities
We need to be able to run 100k+ concurrent tasks, so I've been struggling here

Chris White

10/03/2019, 10:27 PM
I see; and do each of these concurrent tasks really need to be represented as an individual Prefect task, or could they be grouped together in some fashion? Generally speaking, while Prefect is built with first-class Dask support, the APIs aren’t entirely exchangeable (which is some of the pain you are running into), as Prefect Tasks are much “heavier” objects than Dask futures

Jeff Yun

10/03/2019, 10:30 PM
For example, a command might be "run this script for this specific stock and this specific date" and a future command might be "run another script with a subset of the above data". And there might be 1000 dates and 500 stocks for the first command --> 500k tasks

Chris White

10/03/2019, 10:31 PM
sure but those don’t have to be represented as 500k Prefect tasks; that could just as easily be a single Prefect task
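(An illustrative sketch of that collapse, with a hypothetical script name and arguments: a single Prefect task loops over the stock/date combinations itself rather than being mapped 500k times.)
import itertools
import subprocess

from prefect import task

@task
def run_stage_0(stocks, dates, script="run_stage_0.py"):
    # One Prefect task covering every (stock, date) combination, instead of
    # 500k individual mapped children.
    for stock, date in itertools.product(stocks, dates):
        subprocess.run(["python", script, stock, date], check=True)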

Jeff Yun

10/03/2019, 10:31 PM
Since #(concurrent tasks) >> #(reasonable number of workers), we'll probably group the tasks in practice. Good point 😅

Chris White

10/03/2019, 10:32 PM
A Prefect Task should be something that requires its own trigger-check, its own ability to independently retry, and should both consume and output “fully hydrated” data objects (dask delayed objects don’t work well as inputs / outputs currently because they are essentially pointers to future computational results, which Prefect won’t know how to handle if e.g., the retry timeline is long or the Flow needs to be resumed in the future)
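(For instance, a task shaped along those lines might look like the following; the function name and retry settings are invented, using Prefect 0.x task keyword arguments.)
from datetime import timedelta

from prefect import task

@task(max_retries=3, retry_delay=timedelta(minutes=10))
def load_prices(csv_path):
    # Consumes and returns "fully hydrated" data (a concrete list of rows),
    # so a retry long after the first attempt can re-run from the same inputs.
    with open(csv_path) as f:
        return f.read().splitlines()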

Jeff Yun

10/03/2019, 10:39 PM
So if I have a graph with
• 100k commands to launch
• 10k commands which use data from above
• 1k commands which use data from above
and so on, what would you suggest to use?
This is my first time doing distributed computing; could I combine Prefect and Dask flows?

Chris White

10/03/2019, 11:02 PM
yea no worries - distributed computing can be incredibly tricky; there’s an art to the design. First thought that comes to my mind: do you need to represent all of these 100k commands as a single Workflow? You might batch them into groups of ~10k. Second thought: you could write the data to disk (or some other permanent storage location) in between commands; you could still use dask primitives within each task to manipulate the data but ultimately write it to disk instead of mixing dask objects with Prefect objects, and Prefect would become much more useful for you

Jeff Yun

10/03/2019, 11:08 PM
To my understanding, each of my tasks is writing to/reading from disk (the files have been small, so they've been cached in memory). Could you clarify what you mean?
By batching them into groups of ~10k, do you mean running e.g. 10 identical flows (with parameters split into 10) instead of 1?

Chris White

10/03/2019, 11:09 PM
Yup, on the batching that’s exactly what I was thinking; in the examples you’ve shared so far it appears as though each task you’ve written actually returns a Dask object (e.g., a Dask Bag). If instead these tasks returned file-locations / URIs, then I think Prefect might start providing you more value
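(A small sketch of that batching idea; the flow, task, and parameter names here are invented. Build the flow once around a Parameter, then kick off one run per ~10k-command chunk.)
from prefect import Flow, Parameter, task

@task
def process_batch(commands):
    # Handle one ~10k-command batch inside a single task/run.
    return [c.upper() for c in commands]

with Flow("batched-commands") as flow:
    commands = Parameter("commands")
    process_batch(commands)

all_commands = [f"cmd-{i}" for i in range(100000)]
chunk = 10000
for i in range(0, len(all_commands), chunk):
    # Ten smaller flow runs instead of one 100k-wide run.
    flow.run(parameters={"commands": all_commands[i:i + chunk]})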

Jeff Yun

10/03/2019, 11:11 PM
To my understanding, the tasks return their States (I assign them to variables for convenience and don't actually pass any of the return values as function args)

Chris White

10/03/2019, 11:12 PM
what i mean is:
from prefect import task

@task
def my_task():
    return "my_value" # <-- this is the return value I'm referring to

Jeff Yun

10/03/2019, 11:12 PM
Oh, you're suggesting I save the Dask bag to disk instead of passing it. Interesting!

Chris White

10/03/2019, 11:12 PM
yea! exactly
or whatever intermediate datasets you might need; you can still use dask within the logic of your task, I’d just recommend not exchanging dask objects between your tasks at this time (unless you know how to put the objects inside Dask Queues or some other shared object container, which would allow you to share futures between Prefect Tasks)
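(A hedged sketch of that pattern; the paths, values, and function names are made up. Do the dask work inside a task, write the result to disk, and hand downstream tasks only the file location.)
import os

import dask.bag as db
from prefect import task

@task
def compute_and_persist(param, out_dir="/tmp/intermediate"):
    os.makedirs(out_dir, exist_ok=True)
    # Use dask freely *inside* the task...
    result = db.from_sequence(param, npartitions=200).map(lambda x: x * 2).compute()
    # ...but return only a file location, never a live Dask object.
    path = os.path.join(out_dir, "stage_0.txt")
    with open(path, "w") as f:
        f.write("\n".join(str(x) for x in result))
    return path

@task
def downstream(path):
    # Downstream tasks rehydrate the data from the persisted location.
    with open(path) as f:
        return [int(line) for line in f]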

Jeff Yun

10/03/2019, 11:13 PM
For a mapped task, how many times are its arguments serialized and deserialized?

Chris White

10/03/2019, 11:14 PM
in the current design, each argument is serialized once but as you’ve experienced this has a tendency to bottleneck the dask scheduler significantly. I’m actively working on trying to fix this

Jeff Yun

10/03/2019, 11:16 PM
For mapping a task across 100k arguments, the arguments themselves might be small (e.g. integers 0 to 100k) and easily deserializable, but do the workers have to deserialize 100k entire states?
map_states = executor.map(
            run_fn, initial_states, range(len(map_upstream_states)), map_upstream_states
        )
Since in task_runner.py, initial_states and map_upstream_states are as long as the mapped argument, even though they differ only in their arguments.

Chris White

10/03/2019, 11:20 PM
yea they currently do have to deserialize all of those states; however, my new refactor might be able to avoid this