    Sen

    6 months ago
    Hi, good afternoon. I am trying to use the mapping functionality with LocalDaskExecutor to run a set of tasks in parallel (in my case, 16 of them at the same time) in a simple flow. I tried both threads and processes as the scheduler, and neither ran the tasks in parallel. When I check the mapped task in the Schematic, I only see jobs starting and running one at a time. Please see the images below:

    Pending One Run Image

    Running One image

    Could you please let me know how to get parallel task execution with a LocalDaskExecutor? Thanks, Sen
    Anna Geller

    6 months ago
    Hi @Sen, we've already talked about it before, environments are deprecated - check out the announcement about Prefect 1.0 here. To see how you can run tasks in parallel, check this guide.
    Here is how you can refactor your flow to make it compatible with Prefect 1.0 and to run your tasks in parallel:
    import requests
    from prefect import Flow, task
    from prefect.executors import LocalDaskExecutor
    
    
    @task
    def create_url_list():
        """
        Builds a list of URLs under old-releases.ubuntu.com, one per release ID
        """
        url_ids = [
            "21.10", "21.04", "20.10", "20.04", "19.10", "19.04", "18.10", "18.04",
            "17.10", "17.04", "16.10", "16.04", "15.10", "15.04", "14.10", "14.04",
            "13.10", "13.04", "12.10", "12.04", "11.10", "11.04", "10.10", "10.04",
            "09.10", "09.04", "08.10", "08.04", "07.10", "07.04", "06.10", "06.06",
            "05.10", "05.04", "04.10", "06.10", "06.06", "05.10", "05.04", "04.10"
        ]
    
        urls = []
        for url_id in url_ids:
            urls.append('http://old-releases.ubuntu.com/releases/' + url_id)
    
        return urls
    
    
    @task
    def retrieve_url(url):
        print(url)
        html = requests.get(url)
        if html.ok:
            return str(len(html.content))
        else:
            return None
    
    
    with Flow(
            "On_Prem_MapTest", executor=LocalDaskExecutor()
    ) as flow:
        urls = create_url_list()
        url_results = retrieve_url.map(urls)
    
    
    if __name__ == "__main__":
        flow.register("SampleFlows", labels=["On_Prem_MapTest"])
    btw @Sen could you move the code block to the thread to keep the main channel a bit cleaner?
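
As a rough illustration of what mapping over a thread-based scheduler with several workers implies, here is a minimal standard-library sketch. It does not use Prefect and is not how `LocalDaskExecutor` is implemented; `fake_retrieve`, `run_mapped`, and `NUM_WORKERS` are hypothetical names introduced only for this example. A barrier stands in for the network call: the mapped calls can only finish if they are all in flight at the same time.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

NUM_WORKERS = 4  # hypothetical value, plays the role of num_workers

# The barrier releases only once NUM_WORKERS calls are waiting at the same
# time, so the map below can complete only if the calls really overlap.
barrier = threading.Barrier(NUM_WORKERS, timeout=5)


def fake_retrieve(url_id):
    """Stand-in for retrieve_url: no network access, just a rendezvous."""
    barrier.wait()
    return "release-" + url_id


def run_mapped(url_ids, num_workers):
    """Apply fake_retrieve to every item using a pool of worker threads."""
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # pool.map keeps the results in the same order as the inputs
        return list(pool.map(fake_retrieve, url_ids))


results = run_mapped(["21.10", "21.04", "20.10", "20.04"], NUM_WORKERS)
print(results)
```

With a single worker the barrier would time out instead, which is roughly the "one task at a time" behaviour described in the question.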
    Sen

    6 months ago
    Thanks @Anna Geller for your help. Sorry, I didn't fully understand what you meant by "move the code block to the thread to keep the main channel a bit cleaner". Do I need to create a Thread? Once again sorry if this is kind of a stupid question.. 🤔
    Anna Geller

    6 months ago
    It means copying the code from your question and moving it here into the Slack thread 🙂 See an example of how nicely Luis did it 😄 with all code blocks within the thread instead of in the main channel: https://prefect-community.slack.com/archives/CL09KU1K7/p1645796525759529
    Sen

    6 months ago
    The original Flow which was having issues is shown below:
    # Basic Imports
    import os
    import requests
    
    # Point Prefect at the Server's GraphQL endpoint (set before importing prefect)
    os.environ["PREFECT__SERVER__ENDPOINT"] = "http://MY_SERVER_IP:4200/graphql"
    
    from prefect import Flow, task
    from prefect.environments import LocalEnvironment
    from prefect.engine.executors import LocalDaskExecutor
    
    
    @task
    def create_url_list():
        """
        Builds a list of URLs under old-releases.ubuntu.com, one per release ID
        """
    
        url_ids = [
            "21.10", "21.04", "20.10", "20.04", "19.10", "19.04", "18.10", "18.04", 
            "17.10", "17.04", "16.10", "16.04", "15.10", "15.04", "14.10", "14.04", 
            "13.10", "13.04", "12.10", "12.04", "11.10", "11.04", "10.10", "10.04", 
            "09.10", "09.04", "08.10", "08.04", "07.10", "07.04", "06.10", "06.06", 
            "05.10", "05.04", "04.10", "06.10", "06.06", "05.10", "05.04", "04.10"
            ]
        
        urls = []
        for url_id in url_ids:
            urls.append('http://old-releases.ubuntu.com/releases/' + url_id)
    
        return urls
    
    
    @task
    def retrieve_url(url):
        print(url)
        html = requests.get(url)
        if html.ok:
            return str(len(html.content))
        else:
            return None
    
    
    def main():
        """Main Function"""
        
        with Flow(
            "On_Prem_MapTest",
        ) as flow:
    
            get_urls = create_url_list()
            url_results = retrieve_url.map(get_urls)
    
        flow.environment = LocalEnvironment()
        
        # flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=16)
        flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=16)
    
        # Registering the Flow
        flow.register("SampleFlows", labels=["On_Prem_MapTest"])
    
    if __name__ == "__main__":
        main()
    So @Anna Geller, what I found out is that this flow was using Prefect 0.13.18, the agent was also on 0.13.18, and the Prefect Server was on 0.15.13. With that combination the tasks didn't run in parallel. After upgrading my local Prefect to 0.15.13 and trying the code you shared, I can see the parallel runs.
    Is it OK to upgrade all of my Prefect components, including the server and agents, to the latest version, which is 1.0.0? Are there any breaking changes I need to be aware of, or can I just upgrade the version, update the imports, and expect things to work?
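
The version mismatch described above can be spotted with a small check like the following sketch. It is plain Python, not a Prefect feature; `parse_version` and `find_mismatches` are hypothetical helpers, and comparing only the major and minor numbers is an assumed rule of thumb rather than Prefect's documented compatibility policy.

```python
def parse_version(text):
    """Turn '0.15.13' or 'prefect==0.15.13' into a tuple of ints such as (0, 15, 13)."""
    number = text.split("==")[-1]
    return tuple(int(part) for part in number.split("."))


def find_mismatches(versions):
    """Return the components whose (major, minor) differs from the newest one."""
    parsed = {name: parse_version(value)[:2] for name, value in versions.items()}
    newest = max(parsed.values())
    return sorted(name for name, value in parsed.items() if value != newest)


# Versions as reported in the message above
reported = {
    "flow": "0.13.18",
    "agent": "0.13.18",
    "server": "prefect==0.15.13",
}
print(find_mismatches(reported))
```

An empty list would mean every component is on the same major and minor release.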
    Anna Geller

    6 months ago
    You can upgrade but then the environments won't work because they are deprecated. Check out the 1.0 announcement I shared before 😃
    Sen

    6 months ago
    sure. I will go through it.. thanks again for all the pointers.