Hi Good afternoon, I am trying to use the Mapping...
# prefect-server
s
Hi, good afternoon. I am trying to use the mapping functionality with LocalDaskExecutor to run a set of tasks in parallel (in my case, 16 of them at the same time) using a simple flow. I tried both threads and processes as the scheduler, and neither ran the tasks in parallel: when I check the mapped function via the Schematic, I only see jobs starting and running one at a time. Please see the images below:

Pending One Run Image

Running One image

Could you please let me know how to get parallel task execution with a LocalDaskExecutor? Thanks, Sen
👋 2
a
Hi @Sen, we've already talked about it before, environments are deprecated - check out the announcement about Prefect 1.0 here. To see how you can run tasks in parallel, check this guide.
👍 1
Here is how you can refactor your flow to make it compatible with Prefect 1.0 and to run your tasks in parallel:
import requests
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor


@task
def create_url_list():
    """
    Builds a list of Ubuntu old-releases URLs from hard-coded release IDs
    """
    url_ids = [
        "21.10", "21.04", "20.10", "20.04", "19.10", "19.04", "18.10", "18.04",
        "17.10", "17.04", "16.10", "16.04", "15.10", "15.04", "14.10", "14.04",
        "13.10", "13.04", "12.10", "12.04", "11.10", "11.04", "10.10", "10.04",
        "09.10", "09.04", "08.10", "08.04", "07.10", "07.04", "06.10", "06.06",
        "05.10", "05.04", "04.10", "06.10", "06.06", "05.10", "05.04", "04.10"
    ]

    urls = []
    for url_id in url_ids:
        urls.append('http://old-releases.ubuntu.com/releases/' + url_id)

    return urls


@task
def retrieve_url(url):
    print(url)
    html = requests.get(url)
    if html.ok:
        return str(len(html.content))
    else:
        return None


with Flow(
        "On_Prem_MapTest", executor=LocalDaskExecutor()
) as flow:
    urls = create_url_list()
    url_results = retrieve_url.map(urls)


if __name__ == "__main__":
    flow.register("SampleFlows", labels=["On_Prem_MapTest"])
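For readers who want to see what a thread-based scheduler with several workers does to a mapped task, here is a minimal sketch that uses only the Python standard library. It is not Prefect code and not how LocalDaskExecutor is implemented: `fake_retrieve`, `run_mapped`, and the 0.2 s sleep are hypothetical stand-ins for `retrieve_url` and a real HTTP request, and the worker count of 16 is taken from the question.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

NUM_WORKERS = 16  # same worker count as in the question (assumption for this sketch)

lock = threading.Lock()
running = 0       # number of tasks currently in flight
peak_running = 0  # highest number of tasks observed in flight at once


def fake_retrieve(url):
    """Hypothetical stand-in for retrieve_url: sleeps instead of calling requests.get."""
    global running, peak_running
    with lock:
        running += 1
        peak_running = max(peak_running, running)
    time.sleep(0.2)  # simulates waiting on the network
    with lock:
        running -= 1
    return url


def run_mapped(urls, num_workers=NUM_WORKERS):
    """Apply fake_retrieve to every URL using a pool of worker threads."""
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        return list(pool.map(fake_retrieve, urls))


urls = [
    "http://old-releases.ubuntu.com/releases/" + release
    for release in ("21.10", "21.04", "20.10", "20.04")
]
start = time.perf_counter()
results = run_mapped(urls)
elapsed = time.perf_counter() - start
print(f"{len(results)} tasks, peak concurrency {peak_running}, {elapsed:.2f}s")
```

If the mapped calls really overlap, the peak concurrency is greater than 1 and the total time stays close to a single sleep rather than the sum of all of them. The same kind of overlap is what you would expect to see in the Schematic when the mapped Prefect tasks run in parallel.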
btw @Sen could you move the code block to the thread to keep the main channel a bit cleaner?
s
Thanks @Anna Geller for your help. Sorry, I didn't fully understand what you meant by "*move the code block to the thread to keep the main channel a bit cleaner*". Do I need to create a Thread? Once again sorry if this is kind of a stupid question.. 🤔
a
it means copy the code from your question and move it here to the Slack thread 🙂 see an example of how nice Luis did it 😄 all code blocks within the thread instead of having them in the main channel https://prefect-community.slack.com/archives/CL09KU1K7/p1645796525759529
s
The original Flow which was having issues is shown below:
# Basic Imports
import os
import requests

# Setting the Prefect Server URL
os.environ["PREFECT__SERVER__ENDPOINT"] = "http://MY_SERVER_IP:4200/graphql"

from prefect import Flow, task
from prefect.environments import LocalEnvironment
from prefect.engine.executors import LocalDaskExecutor


@task
def create_url_list():
    """
    Builds a list of Ubuntu old-releases URLs from hard-coded release IDs
    """

    url_ids = [
        "21.10", "21.04", "20.10", "20.04", "19.10", "19.04", "18.10", "18.04", 
        "17.10", "17.04", "16.10", "16.04", "15.10", "15.04", "14.10", "14.04", 
        "13.10", "13.04", "12.10", "12.04", "11.10", "11.04", "10.10", "10.04", 
        "09.10", "09.04", "08.10", "08.04", "07.10", "07.04", "06.10", "06.06", 
        "05.10", "05.04", "04.10", "06.10", "06.06", "05.10", "05.04", "04.10"
        ]
    
    urls = []
    for url_id in url_ids:
        urls.append('http://old-releases.ubuntu.com/releases/' + url_id)

    return urls


@task
def retrieve_url(url):
    print(url)
    html = requests.get(url)
    if html.ok:
        return str(len(html.content))
    else:
        return None


def main():
    """Main Function"""
    
    with Flow(
        "On_Prem_MapTest",
    ) as flow:

        get_urls = create_url_list()
        url_results = retrieve_url.map(get_urls)

    flow.environment = LocalEnvironment()
    
    # flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=16)
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=16)

    # Registering the Flow
    flow.register("SampleFlows", labels=["On_Prem_MapTest"])

if __name__ == "__main__":
    main()
So @Anna Geller, what I found out is that this flow was using Prefect version 0.13.18, the agent was also on 0.13.18, and the Prefect server was on 0.15.13. In this particular case the parallel tasks didn't work. But now I upgraded the local Prefect version to 0.15.13 and tried the same code which you shared, and I can see the parallel runs.
Is it OK to upgrade all my Prefect components, including the server and agents, to the latest version, which is 1.0.0? Will there be any breaking changes I need to be aware of? Or can I just upgrade the version and update the imports, and things should just work?
a
You can upgrade but then the environments won't work because they are deprecated. Check out the 1.0 announcement I shared before :)
s
sure. I will go through it.. thanks again for all the pointers.
👍 1
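Since the problem in this thread went away once the local Prefect version matched the server, a quick consistency check before registering flows may save some debugging. The sketch below is a hypothetical helper, not part of Prefect: `major_minor` and `find_mismatches` are made-up names, and treating "same major.minor as the server" as the rule is an assumption for illustration, not an official compatibility guarantee. The version strings are the ones reported above.

```python
def major_minor(version):
    """Return the (major, minor) part of a version string such as '0.15.13'."""
    parts = version.split(".")
    return int(parts[0]), int(parts[1])


def find_mismatches(versions):
    """Return the components whose major.minor differs from the server's.

    Comparing on major.minor is an assumption made for this sketch.
    """
    server = major_minor(versions["server"])
    return sorted(
        name for name, version in versions.items()
        if major_minor(version) != server
    )


# Versions reported in the thread before the local upgrade.
reported = {"flow": "0.13.18", "agent": "0.13.18", "server": "0.15.13"}
mismatched = find_mismatches(reported)
print(mismatched)
```

In a real setup the version strings would come from each machine, for example from `prefect.__version__` wherever the flow and agent run.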