# prefect-server

Sen

02/22/2022, 2:07 PM
Hello everyone! This is Sen from South Africa. I have been struggling with a problem for some time now. I Googled a bit but couldn't find any direction on how to solve it. I am running my own Prefect Server in an Azure VM and an agent on my local machine. The Prefect version I am running is 0.13.18 everywhere. The Prefect agent was started on my machine using the command below:
prefect agent local start -l 'On_Prem_MapTest' -a "http://MY-SERVER-IP:4200"
Please find the code for the flow below:
# Basic Imports
import psutil
import os
import requests

# Setting the Prefect Server URL (must happen before prefect is imported)
os.environ["PREFECT__SERVER__ENDPOINT"] = "http://MY-SERVER-IP:4200/graphql"

import prefect
from prefect import Flow, Parameter, task
from prefect.core.task import Task
from prefect.engine import signals
from prefect.engine.state import State
from prefect.environments import LocalEnvironment
from prefect.environments.storage import Docker, Azure
from prefect.engine.executors import LocalDaskExecutor, DaskExecutor


@task
def create_url_list():
    """
    Builds the list of Ubuntu release URLs to fetch.
    """

    # Release IDs are strings: as floats, str(21.10) gives "21.1" and breaks the URL
    url_ids = ["21.10", "21.04", "20.10", "20.04", "19.10", "19.04", "18.10", "18.04", "17.10", "17.04", "16.10", "16.04",
               "15.10", "15.04", "14.10", "14.04", "13.10", "13.04", "12.10", "12.04", "11.10", "11.04", "10.10", "10.04", "9.10", "9.04",
               "8.10", "8.04", "7.10", "7.04", "6.10", "6.06", "5.10", "5.04", "4.10", "6.10", "6.06", "5.10", "5.04", "4.10"]

    urls = []
    for url_id in url_ids:
        urls.append('http://old-releases.ubuntu.com/releases/' + url_id)

    return urls


@task
def retrieve_url(url):
    print(url)
    html = requests.get(url)
    if html.ok:
        return str(len(html.content))
    else:
        return None


def main():
    """Main Function"""
    
    with Flow(
        "On_Prem_MapTest",
    ) as flow:

        get_urls = create_url_list()
        url_results = retrieve_url.map(get_urls)

    # Using a local environment for the flow (no Docker storage is configured here)
    flow.environment = LocalEnvironment()
    # flow.storage = Local()
    
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=16)
    flow.register("SampleFlows", labels=["On_Prem_MapTest"])
    
    # The code below when uncommented works fine 
    # final_state = flow.run()
    # final_state_2 = final_state.result[url_results]
    # print('\n'.join([f'{s.result}: {s}' for s in final_state_2.map_states[:5]]))

if __name__ == "__main__":
    main()
The problem is that when I register this flow and then trigger a run from the Prefect UI so that the agent picks it up, it fails with the message below:
Failed to retrieve task state with error: ClientError('400 Client Error: Bad Request for url: http://MY-SERVER-IP:4200/graphql\n\nThe following error messages were provided by the GraphQL server:\n\n    GRAPHQL_VALIDATION_FAILED: Cannot query field "get_or_create_task_run_info" on\n        type "Mutation". Did you mean "get_or_create_task_run" or\n        "get_or_create_mapped_task_run_children"?\n\nThe GraphQL query was:\n\n    mutation {\n            get_or_create_task_run_info(input: { flow_run_id: "9281d541-333a-4c1f-ad60-dd2d22edfa9b", task_id: "31ff1abe-1f1c-4409-a56f-6723bb8482f7", map_index: 0 }) {\n                id\n                version\n                serialized_state\n        }\n    }\n\nThe passed variables were:\n\n    null\n')
Traceback (most recent call last):
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 360, in _send_request
    response.raise_for_status()
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/requests/models.py", line 953, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: http://MY-SERVER-IP:4200/graphql

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 193, in initialize_run
    map_index=map_index,
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 1331, in get_task_run_info
    result = self.graphql(mutation)  # type: Any
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 303, in graphql
    retry_on_api_error=retry_on_api_error,
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 219, in post
    retry_on_api_error=retry_on_api_error,
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 445, in _request
    session=session, method=method, url=url, params=params, headers=headers
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 373, in _send_request
    raise ClientError(f"{exc}\n{graphql_msg}") from exc
prefect.utilities.exceptions.ClientError: 400 Client Error: Bad Request for url: http://MY-SERVER-IP:4200/graphql

The following error messages were provided by the GraphQL server:

    GRAPHQL_VALIDATION_FAILED: Cannot query field "get_or_create_task_run_info" on
        type "Mutation". Did you mean "get_or_create_task_run" or
        "get_or_create_mapped_task_run_children"?

The GraphQL query was:

    mutation {
            get_or_create_task_run_info(input: { flow_run_id: "9281d541-333a-4c1f-ad60-dd2d22edfa9b", task_id: "31ff1abe-1f1c-4409-a56f-6723bb8482f7", map_index: 0 }) {
                id
                version
                serialized_state
        }
    }

The passed variables were:

    null
But this works when I run it directly on my machine with flow.run(), as in the commented-out lines in the code above, without using the agent. I am not sure why this happens. It would be great if someone could point me in the right direction on how to solve this. I am also not sure how to iterate through the results of url_results once all mapped tasks are done. Thanks in advance.

Anna Geller

02/22/2022, 2:16 PM
Hi and welcome to the community! Can you explain the problem a bit more? 🙂

Stéphan Taljaard

02/22/2022, 2:32 PM
Hi Navaneeth. Great seeing another South African here 🇿🇦

Sen

02/22/2022, 2:57 PM
oops.. I typed the whole thing and then when I clicked send, it just disappeared.. 😱
ok.. it is updated now..😥
Just to give you an overview of the task I am trying to accomplish: I am trying to figure out how to use "map" and "LocalDaskExecutor" in a Prefect Docker Agent setup to parallelise one of my slow-running tasks.
@Anna Geller I have added more details to the post. It would be really helpful if you can point me in the right direction. Thanks, Sen

Anna Geller

02/22/2022, 6:19 PM
@Sen I looked at your issue now, and it looks like your agent uses a higher Prefect version than your Server. Your Server is also deployed with quite an old Prefect version; for example, environments are deprecated by now. I'd recommend upgrading both Server and agents to more recent Prefect versions, which should fix many issues for you. As a general rule of thumb, your Server must be on a Prefect version >= your agent's Prefect version (and that of the environment from which you register your flows, if you use pickle storage).
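The rule of thumb above can be sketched as a small check. This is only a minimal sketch and not part of Prefect: `parse_version` and `server_can_serve` are hypothetical helper names, and it assumes plain dotted numeric version strings such as the ones reported by `prefect.__version__`.

```python
from typing import Tuple


def parse_version(version: str) -> Tuple[int, ...]:
    """Turn a dotted version string such as '0.13.18' into (0, 13, 18)."""
    return tuple(int(part) for part in version.strip().split("."))


def server_can_serve(server_version: str, client_version: str) -> bool:
    """Return True when the Server version is >= the agent/registration version."""
    return parse_version(server_version) >= parse_version(client_version)


# Example values: a Server at 0.13.16 with an agent at 0.13.18
print(server_can_serve("0.13.16", "0.13.18"))  # False: the Server is older than the agent
print(server_can_serve("0.13.18", "0.13.18"))  # True: versions match
```

Comparing tuples of integers avoids the trap of comparing version strings directly, where "0.13.9" would sort after "0.13.18".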

Sen

02/23/2022, 6:35 AM
Hey @Anna Geller, you are right: my Prefect Server is at 0.13.16, and my agents and flow registration machine are at 0.13.18. Just out of curiosity, how did you figure this out from the logs I sent?
On that note, is it safe to update the Server to the latest version and keep the flow at 0.13.18? I see the import paths are different in the new versions.

Anna Geller

02/23/2022, 9:23 AM
I would recommend upgrading both. I figured it out by seeing that your flow run was calling an API endpoint that the Server didn't recognise, which indicates a version mismatch.
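One way to confirm this kind of mismatch is to ask the Server which mutations its GraphQL schema exposes and check for the one named in the error. This is a minimal sketch, assuming the endpoint allows standard GraphQL introspection; `extract_mutation_names`, `list_mutations` and `has_mutation` are hypothetical helper names, and the sample payload only contains the two names suggested by the error message above.

```python
import json
import urllib.request
from typing import List

# Standard GraphQL introspection query for the names of all mutation fields
INTROSPECTION_QUERY = "{ __schema { mutationType { fields { name } } } }"


def extract_mutation_names(payload: dict) -> List[str]:
    """Pull the mutation field names out of an introspection response body."""
    fields = payload["data"]["__schema"]["mutationType"]["fields"]
    return sorted(field["name"] for field in fields)


def list_mutations(endpoint: str) -> List[str]:
    """POST the introspection query to the endpoint and return mutation names."""
    body = json.dumps({"query": INTROSPECTION_QUERY}).encode("utf-8")
    request = urllib.request.Request(
        endpoint, data=body, headers={"Content-Type": "application/json"}
    )
    with urllib.request.urlopen(request) as response:
        return extract_mutation_names(json.load(response))


def has_mutation(names: List[str], wanted: str) -> bool:
    """Return True if the Server schema exposes the mutation the client calls."""
    return wanted in names


# Illustrative response shape, not real Server output
sample = {"data": {"__schema": {"mutationType": {"fields": [
    {"name": "get_or_create_task_run"},
    {"name": "get_or_create_mapped_task_run_children"},
]}}}}
names = extract_mutation_names(sample)
print(has_mutation(names, "get_or_create_task_run_info"))  # False
```

Against a real Server, one would call `list_mutations("http://MY-SERVER-IP:4200/graphql")` instead of using the sample payload; if the mutation the client sends is missing from the list, the client is newer than the Server.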
To explain the reason why I recommend upgrading both: we are working on releasing Prefect 1.0 and this version no longer has features such as environments. So it might be better for you to change that now rather than later