# ask-community
j
If a Parameter consists of e.g. a list of length 100k, at what stage and how may it be batched with dask.Bag?
c
Hi Jeff, just realized I never responded to this; you’d need to do this batching within a task which takes in this parameter
j
Tried your suggestion, and I have
Copy code
import itertools

import dask.bag as db
from prefect import task

@task(name='create_product')
def create_product(params1, params2):
    # cartesian product of the two parameter lists
    return list(itertools.product(params1, params2))

@task(name='make_batch')
def make_batch(param, MAX_PARTITIONS=200):
    # wrap the list in a dask.Bag with a bounded number of partitions
    param = db.from_sequence(param, npartitions=MAX_PARTITIONS)
    return param
In my flow, I have
Copy code
prod = create_product(a_list, b_list)
sim = stage_0_func.map(unmapped(cfgdir), unmapped(tmpdir), make_batch(prod))
which gives me the error
Copy code
Traceback (most recent call last):
  File "REPODIR/demos/prefect/engine/task_runner.py", line 260, in run
    executor=executor,
  File "REPODIR/demos/prefect/engine/task_runner.py", line 720, in run_mapped_task
    upstream_state.result[i],
TypeError: 'Bag' object does not support indexing
c
ah, yes, we currently make a strong assumption that things which are mapped over are list-like. I’m working on a large mapping refactor that might address this, although I can’t guarantee it. In general, Prefect assumes that objects returned from Tasks are “context-free”, meaning they could potentially be stored somewhere and extracted at a later time; Dask-delayed-like objects don’t fit this model particularly well
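For example, returning a plain list of chunks would satisfy that assumption; here’s a rough, untested sketch (reusing your make_batch name, with the chunk sizing made up):
Copy code
from prefect import task

@task(name='make_batch')
def make_batch(param, n_batches=200):
    # return a plain list of chunks rather than a dask.Bag,
    # so that Prefect's .map can index into the result;
    # each mapped run then receives one chunk (a sub-list) to process
    size = max(1, len(param) // n_batches)
    return [param[i:i + size] for i in range(0, len(param), size)]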
Might I ask what your use case is? I’m wondering what advantages the Prefect API is providing you over just using dask directly
j
My use case is being able to easily construct task DAGs, including replacing tasks, retrying tasks dynamically, and skipping downstream tasks based on upstream task states.
Customizing different task graphs for mathematical/scientific/financial experimentation
Dask doesn't seem to have good dynamic DAG generation (and general easy editing) capabilities
We need to be able to run 100k+ concurrent tasks, so I've been struggling here
c
I see; and does each of these concurrent tasks really need to be represented as an individual Prefect task, or could they be grouped together in some fashion? Generally speaking, while Prefect is built with first-class Dask support, the APIs aren’t entirely interchangeable (which is some of the pain you are running into), as Prefect Tasks are much “heavier” objects than Dask futures
j
For example, a command might be "run this script for this specific stock and this specific date" and a future command might be "run another script with a subset of the above data". And there might be 1000 dates and 500 stocks for the first command --> 500k tasks
c
sure but those don’t have to be represented as 500k Prefect tasks; that could just as easily be a single Prefect task
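For instance, something along these lines (a sketch; run_script here is just a stand-in for whatever your per-(date, stock) command actually is):
Copy code
import itertools

from prefect import task

def run_script(date, stock):
    # placeholder for the real "run this script for this stock and date" command
    return f"{stock}-{date}"

@task
def run_first_command(dates, stocks):
    # a single Prefect task can cover the whole 1000 x 500 grid internally
    return {(d, s): run_script(d, s) for d, s in itertools.product(dates, stocks)}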
j
Since #(concurrent tasks) >> #(reasonable number of workers), we'll probably group the tasks in practice. Good point 😅
👍 1
c
A Prefect Task should be something that requires its own trigger-check, its own ability to independently retry, and should both consume and output “fully hydrated” data objects (dask delayed objects don’t work well as inputs / outputs currently because they are essentially pointers to future computational results, which Prefect won’t know how to handle if, e.g., the retry timeline is long or the Flow needs to be resumed in the future)
j
So if I have a graph with
• 100k commands to launch
• 10k commands which use data from above
• 1k commands which use data from above
and so on, what would you suggest to use?
This is my first time doing distributed computing; could I combine Prefect and Dask flows?
c
yea no worries - distributed computing can be incredibly tricky; there’s an art to the design.
First thought that comes to my mind: do you need to represent all of these 100k commands as a single Workflow? You might batch them into groups of ~10k.
Second thought: you could write the data to disk (or some other permanent storage location) in between commands. You could still use dask primitives within each task to manipulate the data, but ultimately write it to disk instead of mixing dask objects with Prefect objects, and Prefect would become much more useful for you
👍 1
j
To my understanding, each of my tasks are writing to/reading from disk (the files have been small, so they've been cached in memory). Could you clarify what you mean?
By batching them into groups of ~10k, do you mean running e.g. 10 identical flows (with parameters split into 10) instead of 1?
c
Yup, on the batching that’s exactly what I was thinking; in the examples you’ve shared so far it appears as though each task you’ve written actually returns a Dask object (e.g., a Dask Bag). If instead these tasks returned file-locations / URIs, then I think Prefect might start providing you more value
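On the batching side, the split could look something like this (a sketch; it assumes a flow with a single Parameter named "param" and a placeholder task):
Copy code
from prefect import Flow, Parameter, task

@task
def process_batch(batch):
    # placeholder for the real per-batch work
    return len(batch)

with Flow("batched-run") as flow:
    param = Parameter("param")
    process_batch(param)

# run the same flow 10 times, each over one slice of the full parameter list
all_params = list(range(100_000))
n_groups = 10
size = len(all_params) // n_groups

for i in range(n_groups):
    batch = all_params[i * size:(i + 1) * size]
    flow.run(parameters={"param": batch})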
j
To my understanding, the tasks return their States (I assign them to variables for convenience and don't actually pass any of the return values as function args)
c
what i mean is:
Copy code
from prefect import task

@task
def my_task():
    return "my_value"  # <-- this is the return value I'm referring to
j
Oh, you're suggesting I save the Dask bag to disk instead of passing it. Interesting!
c
yea! exactly
or whatever intermediate datasets you might need; you can still use dask within the logic of your task, but I’d recommend not exchanging dask objects between your tasks at this time (unless you know how to put the objects inside Dask Queues or some other shared object container, which would allow you to share futures between Prefect Tasks)
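Roughly, that pattern might look like this (an untested sketch; the file format and names are just illustrative):
Copy code
import json

import dask.bag as db
from prefect import task

def run_simulation(x):
    # placeholder for the real per-item computation
    return x * 2

@task
def stage_0(params, outdir):
    # dask can be used freely *inside* the task...
    bag = db.from_sequence(params, npartitions=200)
    results = bag.map(run_simulation).compute()

    # ...but only a file location (a plain string) is returned,
    # so no dask objects are exchanged between Prefect tasks
    path = f"{outdir}/stage_0.json"
    with open(path, "w") as f:
        json.dump(results, f)
    return path

@task
def stage_1(path):
    # the downstream task re-hydrates the data from disk
    with open(path) as f:
        results = json.load(f)
    return sum(results)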
j
For a mapped task, how many times are its arguments serialized and deserialized?
c
in the current design, each argument is serialized once but as you’ve experienced this has a tendency to bottleneck the dask scheduler significantly. I’m actively working on trying to fix this
j
For mapping a task across 100k arguments, the arguments themselves might be small (e.g. integers 0 to 100k) and easily deserializable, but do the workers have to deserialize 100k entire states?
Copy code
map_states = executor.map(
    run_fn, initial_states, range(len(map_upstream_states)), map_upstream_states
)
Since in task_runner.py, initial_states and map_upstream_states have the same length as the mapped argument, even though their only difference is the arguments they carry
c
yea they currently do have to deserialize all of those states; however, my new refactor might be able to avoid this