Sen

    7 months ago
    Hello everyone! This is Sen from South Africa. I have been struggling with a problem for some time now. I googled a bit but couldn't find anything that points me towards a solution. I am running my own Prefect Server in an Azure VM and an agent on my local machine. The Prefect version I am running is 0.13.18 everywhere. I started the agent on my machine with the command below:
    prefect agent local start -l 'On_Prem_MapTest' -a "http://MY-SERVER-IP:4200"
    Please find the code for the flow below:
    # Basic Imports
    import psutil
    import os
    import requests
    
    # Extracting the Prefect Server URL
    os.environ["PREFECT__SERVER__ENDPOINT"] = "http://MY-SERVER-IP:4200/graphql"
    
    import prefect
    from prefect import Flow, Parameter, task
    from prefect.core.task import Task
    from prefect.engine import signals
    from prefect.engine.state import State
    from prefect.environments import LocalEnvironment
    from prefect.environments.storage import Docker, Azure
    from prefect.engine.executors import LocalDaskExecutor, DaskExecutor
    
    
    @task
    def create_url_list():
        """
        Creates the list of Ubuntu old-release URLs to fetch
        """
    
        url_ids = [21.10, 21.04, 20.10, 20.04, 19.10, 19.04, 18.10, 18.04, 17.10, 17.04, 16.10, 16.04, 
                   15.10, 15.04, 14.10, 14.04, 13.10, 13.04, 12.10, 12.04, 11.10, 11.04, 10.10, 10.04, 09.10, 09.04, 
                   08.10, 08.04, 07.10, 07.04, 06.10, 06.06, 05.10, 05.04, 04.10, 06.10, 06.06, 05.10, 05.04, 04.10]
        
        urls = []
        for url_id in url_ids:
            # Format with two decimals: str(20.10) would give "20.1" and break the URL
            urls.append('http://old-releases.ubuntu.com/releases/' + f'{url_id:.2f}')
    
        return urls
    
    
    @task
    def retrieve_url(url):
        print(url)
        html = requests.get(url)
        if html.ok:
            return str(len(html.content))
        else:
            return None
    
    
    def main():
        """Main Function"""
        
        with Flow(
            "On_Prem_MapTest",
        ) as flow:
    
            get_urls = create_url_list()
            url_results = retrieve_url.map(get_urls)
    
        # Configure the environment and executor, then register the flow
        flow.environment = LocalEnvironment()
        # flow.storage = Local()
        
        flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=16)
        flow.register("SampleFlows", labels=["On_Prem_MapTest"])
        
        # The code below works fine when uncommented
        # final_state = flow.run()
        # final_state_2 = final_state.result[url_results]
        # print('\n'.join([f'{s.result}: {s}' for s in final_state_2.map_states[:5]]))
    
    if __name__ == "__main__":
        main()
    The problem is that when I register the flow and then start a run from the Prefect UI, so that the agent executes it, the run fails with the message below:
    Failed to retrieve task state with error: ClientError('400 Client Error: Bad Request for url: http://MY-SERVER-IP:4200/graphql\n\nThe following error messages were provided by the GraphQL server:\n\n    GRAPHQL_VALIDATION_FAILED: Cannot query field "get_or_create_task_run_info" on\n        type "Mutation". Did you mean "get_or_create_task_run" or\n        "get_or_create_mapped_task_run_children"?\n\nThe GraphQL query was:\n\n    mutation {\n            get_or_create_task_run_info(input: { flow_run_id: "9281d541-333a-4c1f-ad60-dd2d22edfa9b", task_id: "31ff1abe-1f1c-4409-a56f-6723bb8482f7", map_index: 0 }) {\n                id\n                version\n                serialized_state\n        }\n    }\n\nThe passed variables were:\n\n    null\n')
    Traceback (most recent call last):
      File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 360, in _send_request
        response.raise_for_status()
      File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/requests/models.py", line 953, in raise_for_status
        raise HTTPError(http_error_msg, response=self)
    requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: http://MY-SERVER-IP:4200/graphql
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 193, in initialize_run
        map_index=map_index,
      File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 1331, in get_task_run_info
        result = self.graphql(mutation)  # type: Any
      File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 303, in graphql
        retry_on_api_error=retry_on_api_error,
      File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 219, in post
        retry_on_api_error=retry_on_api_error,
      File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 445, in _request
        session=session, method=method, url=url, params=params, headers=headers
      File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 373, in _send_request
        raise ClientError(f"{exc}\n{graphql_msg}") from exc
    prefect.utilities.exceptions.ClientError: 400 Client Error: Bad Request for url: http://MY-SERVER-IP:4200/graphql
    
    The following error messages were provided by the GraphQL server:
    
        GRAPHQL_VALIDATION_FAILED: Cannot query field "get_or_create_task_run_info" on
            type "Mutation". Did you mean "get_or_create_task_run" or
            "get_or_create_mapped_task_run_children"?
    
    The GraphQL query was:
    
        mutation {
                get_or_create_task_run_info(input: { flow_run_id: "9281d541-333a-4c1f-ad60-dd2d22edfa9b", task_id: "31ff1abe-1f1c-4409-a56f-6723bb8482f7", map_index: 0 }) {
                    id
                    version
                    serialized_state
            }
        }
    
    The passed variables were:
    
        null
    However, it works when I run the flow directly on my machine with flow.run() (the commented-out lines at the end of the code above), without the agent. I am not sure why this happens, and it would be great if someone could point me in the right direction. I am also not sure how to iterate through the results of url_results once all the mapped tasks are done. Thanks in advance,
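For the last question, here is a plain-Python illustration of the map pattern, using concurrent.futures.ThreadPoolExecutor as a stand-in for Prefect's mapping (it is not Prefect's API). Each URL gets its own call, the results come back as a list in input order, and you iterate over them once every call has finished. fake_retrieve_url and map_with_threads are hypothetical names; the fake function skips network access so the sketch runs anywhere.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional


def fake_retrieve_url(url: str) -> Optional[str]:
    """Hypothetical stand-in for retrieve_url that makes no network request.

    Returns the length of the URL as a string, or None for URLs ending in
    "/missing" (simulating a failed request).
    """
    if url.endswith("/missing"):
        return None
    return str(len(url))


def map_with_threads(
    fn: Callable[[str], Optional[str]], items: List[str], num_workers: int = 16
) -> List[Optional[str]]:
    """Call fn once per item on a thread pool; results keep the input order."""
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        return list(pool.map(fn, items))


if __name__ == "__main__":
    urls = [
        "http://old-releases.ubuntu.com/releases/20.04",
        "http://old-releases.ubuntu.com/releases/missing",
    ]
    results = map_with_threads(fake_retrieve_url, urls)
    # All calls have finished here; pair each input with its result.
    for url, result in zip(urls, results):
        if result is None:
            print(f"{url}: request failed")
        else:
            print(f"{url}: {result}")
```

Inside a Prefect flow, the usual way to get all mapped results at once is to pass url_results to a downstream task that is not mapped; that task receives the results as a single list.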
    Anna Geller

    7 months ago
    Hi and welcome to the community! Can you explain the problem a bit more? 🙂
    Stéphan Taljaard

    7 months ago
    Hi Navaneeth. Great seeing another South African here 🇿🇦
    Sen

    7 months ago
    Oops, I typed the whole thing and when I clicked send, it just disappeared. 😱
    OK, it is updated now. 😥
    To give you an overview of what I am trying to accomplish: I am trying to figure out how to use "map" and "LocalDaskExecutor" in a Prefect Docker Agent setup to parallelise one of my slow-running tasks.
    @Anna Geller I have added more details to the post. It would be really helpful if you could point me in the right direction. Thanks, Sen
    Anna Geller

    7 months ago
    @Sen I looked at your issue, and it looks like your agent uses a newer Prefect version than your Server. Your Server is deployed with quite an old Prefect version; for example, environments are deprecated by now. I'd recommend upgrading both the Server and the agents to more recent Prefect versions, which should fix many issues for you. A general rule of thumb: your Server must be on a Prefect version >= your agent's Prefect version (and >= the version in the environment from which you register your flows, if you use pickle storage).
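The rule of thumb above can be written as a tiny check. This is a hypothetical helper, not part of Prefect; it assumes plain numeric versions such as the 0.13.16 / 0.13.18 pair in this thread (the installed version is available as prefect.__version__ on each machine).

```python
from typing import Tuple


def parse_version(version: str, width: int = 3) -> Tuple[int, ...]:
    """Turn a numeric release string such as '0.13.18' into (0, 13, 18).

    Missing components are padded with zeros so '1.0' compares as (1, 0, 0).
    Pre-release tags such as '1.0rc1' are not handled (int() would fail).
    """
    parts = [int(part) for part in version.split(".")]
    parts += [0] * (width - len(parts))
    return tuple(parts)


def server_is_compatible(server_version: str, client_version: str) -> bool:
    """Rule of thumb: Server version must be >= the agent/registration version."""
    return parse_version(server_version) >= parse_version(client_version)


if __name__ == "__main__":
    # The versions reported in this thread: Server 0.13.16, agent 0.13.18
    print(server_is_compatible("0.13.16", "0.13.18"))  # False: Server is older
    print(server_is_compatible("0.13.18", "0.13.18"))  # True
```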
    Sen

    7 months ago
    Hey @Anna Geller, you are right: my Prefect Server is at 0.13.16, and my agents and the machine I register flows from are at 0.13.18. Just out of curiosity, how did you figure this out from the logs I sent?
    On that note, is it safe to update the Server to the latest version and keep the flow at 0.13.18? I ask because I see the import paths are different in the new versions.
    Anna Geller

    7 months ago
    I would recommend upgrading both. I figured it out because your flow was calling API endpoints that didn't work the same way in your Server, which indicates a version mismatch.
    The reason I recommend upgrading both: we are working on releasing Prefect 1.0, and this version no longer has features such as environments. So it might be better for you to make that change now rather than later.
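The version-mismatch diagnosis hinges on one line of the error: the client asked for a mutation field, get_or_create_task_run_info, that the Server's GraphQL schema rejected. A minimal sketch for pulling that field name out of a log message when checking for this kind of mismatch; UNKNOWN_FIELD and unknown_api_field are hypothetical helpers, not part of Prefect.

```python
import re
from typing import Optional

# Matches the GraphQL validation message from the agent log, e.g.
#   Cannot query field "get_or_create_task_run_info" on type "Mutation"
# The gap between "on" and "type" may be real whitespace/newlines or a
# literal backslash-n sequence, as in the repr()-style first line of the log.
UNKNOWN_FIELD = re.compile(r'Cannot query field "(\w+)"\s*on(?:\s|\\n)+type "\w+"')


def unknown_api_field(error_text: str) -> Optional[str]:
    """Return the field name the Server's schema rejected, or None."""
    match = UNKNOWN_FIELD.search(error_text)
    return match.group(1) if match else None


if __name__ == "__main__":
    log_line = (
        'GRAPHQL_VALIDATION_FAILED: Cannot query field "get_or_create_task_run_info" on\n'
        '        type "Mutation". Did you mean "get_or_create_task_run"?'
    )
    print(unknown_api_field(log_line))  # get_or_create_task_run_info
```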