Seth Goodman

09/02/2022, 2:25 PM
Hi All - I am migrating from Prefect 1.0 to 2.0 and am specifically dealing with a mapped task parallelized using Dask. From the 2.0 docs it sounds like you need to call Task.submit() when using a task runner like Dask, but I am unclear how that is applied when using a map. Any guidance or an example would be appreciated. Thanks!

Khuyen Tran

09/02/2022, 2:42 PM
Do you mean using Dask's map function?

Seth Goodman

09/02/2022, 2:46 PM
here's some pseudo code to illustrate:
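from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def my_task(arg):
    return arg

# dask_address is a placeholder for the address of a running Dask scheduler
ActiveTaskRunner = DaskTaskRunner(address=dask_address)

@flow(task_runner=ActiveTaskRunner)
def flow_test():
    # task_list is a placeholder for the iterable of inputs to map over
    task_futures = my_task.map(task_list)
    for future in task_futures:
        future.wait()
    # do more stuff with results of map?


flow_test()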

Khuyen Tran

09/02/2022, 2:47 PM
Ah, I see. With map, you don't need to call .submit to use the task runner.

Seth Goodman

09/02/2022, 2:48 PM
oh perfect. Are there any additional tweaks needed to use the results from a map call? Running my code based on structure above I keep getting
TypeError: submit() got an unexpected keyword argument 'key'
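Here's a minimal example that reproduces it:
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def map_print(x):
    print(x)

@flow(task_runner=DaskTaskRunner())
def my_flow():
    futures = map_print.map([1, 2, 3, 4, 5])
    for future in futures:
        future.wait()
    # print(futures)

# Guard the entry point, since DaskTaskRunner() starts a temporary local cluster
if __name__ == "__main__":
    my_flow()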

Khuyen Tran

09/02/2022, 2:57 PM
Can you confirm if you have the TypeError without DaskTaskRunner?

Seth Goodman

09/02/2022, 2:58 PM
Using ConcurrentTaskRunner, it works.
It also seems like more basic examples using DaskTaskRunner (from the docs) fail with the same error.
Perhaps this is related to my Dask setup
(prefect-dask is installed, and Dask is confirmed to be up and running)

Khuyen Tran

09/02/2022, 3:04 PM
Can you upgrade to the latest version of Prefect and see if the error still persists?

Seth Goodman

09/02/2022, 3:06 PM
i'm on 2.3.0 but I can try 2.3.1
same error

Khuyen Tran

09/02/2022, 3:09 PM
@Ryan Peden Do you have any insights into this?

Seth Goodman

09/02/2022, 3:17 PM
rebuilding a minimal environment to isolate this a bit further
Untitled.txt
attaching full error output

Ryan Peden

09/02/2022, 3:25 PM
Thanks - I'm running a few quick tests on my end as well

Seth Goodman

09/02/2022, 3:27 PM
Here's full version info too:
Version: 2.3.1
API version: 0.8.0
Python version: 3.10.6
Git commit: 1d485b1d
Built: Thu, Sep 1, 2022 3:53 PM
OS/Arch: linux/x86_64
Profile: default
Server type: ephemeral
Server:
  Database: sqlite
  SQLite version: 3.39.2
I was testing with Python 3.8 / 3.9 earlier as well, same issue

Ryan Peden

09/02/2022, 3:37 PM
I get it too. I think it is a problem when using a DaskTaskRunner with Prefect 2.3.x. It seems to work fine with 2.2.x. I see we've got a high-priority issue open for this in GitHub, so I expect we will have a fix published very soon. In the meantime, if you don't need any of the features or fixes added to 2.3.x, I'd recommend using Prefect 2.2.1.
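
A quick way to flag the affected series before choosing DaskTaskRunner could look like the sketch below. It is based only on what this thread reports (2.3.0 and 2.3.1 fail, 2.2.x works); the function name is hypothetical, and a later 2.3.x patch may well contain the fix, so treat a True result as "check the issue tracker" rather than a definitive answer.

```python
def is_reported_affected(version: str) -> bool:
    """Return True if `version` belongs to the 2.3.x series reported as failing here."""
    parts = version.split(".")
    major, minor = int(parts[0]), int(parts[1])
    # Assumption taken from the thread: every 2.3 release is suspect, 2.2.x is fine.
    return (major, minor) == (2, 3)
```

The installed version string can be obtained with importlib.metadata.version("prefect") from the standard library and passed to this helper.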

Seth Goodman

09/02/2022, 3:48 PM
Using 2.2.0 (I did not find a 2.2.1), there is now an issue with the alembic package:
alembic.util.exc.CommandError: Can't locate revision identified by 'e757138e954a'
I created a fresh environment to confirm this wasn't an issue with downgrading the version
(prefect2_test2) userx@computerx:~/Desktop/tuff_osm$ prefect version
Version: 2.2.0
API version: 0.8.0
Python version: 3.9.13
Git commit: e3651362
Built: Tue, Aug 23, 2022 2:18 PM
OS/Arch: linux/x86_64
Profile: default
Server type: <client error>

Ryan Peden

09/02/2022, 3:53 PM
Ah, I think that is the Prefect database. 2.3 probably adds some non-backwards-compatible changes. If you don't mind getting rid of any old flow runs you've recorded in it, running rm -rf ~/.prefect will take care of the issue you are seeing.
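
If losing the recorded flow runs is a concern, a more cautious variant of the step above is to move the directory aside instead of deleting it. The sketch below is a hypothetical helper, not something from the thread or from Prefect itself; it assumes the data lives in ~/.prefect as stated above and that Prefect recreates the directory on its next run, as it evidently did after the rm -rf here.

```python
import shutil
import time
from pathlib import Path
from typing import Optional


def move_aside(directory: Path) -> Optional[Path]:
    """Rename `directory` to a timestamped sibling; return the new path, or None if absent."""
    if not directory.exists():
        return None
    stamp = time.strftime("%Y%m%d-%H%M%S")
    backup = directory.with_name(f"{directory.name}.bak-{stamp}")
    if backup.exists():
        # Refuse to merge into an earlier backup made in the same second.
        raise FileExistsError(f"backup already exists: {backup}")
    shutil.move(str(directory), str(backup))
    return backup
```

For the case in this thread, the call would be move_aside(Path.home() / ".prefect"); the old database then stays available in the backup directory if it is needed later.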

Seth Goodman

09/02/2022, 3:55 PM
that worked, thank you for all the help!