Sen
02/22/2022, 2:07 PM
I start the local agent with:
```
prefect agent local start -l 'On_Prem_MapTest' -a "http://MY-SERVER-IP:4200"
```
Please find the code for the flow below:
```python
# Basic imports
import psutil
import os
import requests

# Point the Prefect client at the server's GraphQL endpoint.
# This must be set before `import prefect`, because Prefect reads its config at import time.
os.environ["PREFECT__SERVER__ENDPOINT"] = "http://MY-SERVER-IP:4200/graphql"

import prefect
from prefect import Flow, Parameter, task
from prefect.core.task import Task
from prefect.engine import signals
from prefect.engine.state import State
from prefect.environments import LocalEnvironment
from prefect.environments.storage import Docker, Azure
from prefect.engine.executors import LocalDaskExecutor, DaskExecutor


@task
def create_url_list():
    """
    Build the list of Ubuntu old-release URLs to fetch.
    """
    # Release IDs are strings on purpose: as floats, str(09.10) gives "9.1"
    # and str(21.10) gives "21.1", which are not valid release directories.
    url_ids = [
        "21.10", "21.04", "20.10", "20.04", "19.10", "19.04", "18.10", "18.04",
        "17.10", "17.04", "16.10", "16.04", "15.10", "15.04", "14.10", "14.04",
        "13.10", "13.04", "12.10", "12.04", "11.10", "11.04", "10.10", "10.04",
        "09.10", "09.04", "08.10", "08.04", "07.10", "07.04", "06.10", "06.06",
        "05.10", "05.04", "04.10",
    ]
    urls = []
    for url_id in url_ids:
        urls.append("http://old-releases.ubuntu.com/releases/" + url_id)
    return urls


@task
def retrieve_url(url):
    print(url)
    response = requests.get(url, timeout=30)
    if response.ok:
        # Page size in bytes, returned as a string
        return str(len(response.content))
    return None


def main():
    """Build the flow and register it with the Prefect Server."""
    with Flow("On_Prem_MapTest") as flow:
        get_urls = create_url_list()
        # One mapped child task run per URL
        url_results = retrieve_url.map(get_urls)

    # Environment and executor used when the agent runs the flow
    flow.environment = LocalEnvironment()
    # flow.storage = Local()
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=16)
    flow.register("SampleFlows", labels=["On_Prem_MapTest"])

    # The code below works fine when uncommented (local run, no agent):
    # final_state = flow.run()
    # final_state_2 = final_state.result[url_results]
    # print('\n'.join([f'{s.result}: {s}' for s in final_state_2.map_states[:5]]))


if __name__ == "__main__":
    main()
```
The problem: when I register the flow and start a run from the Prefect UI, so that the agent executes it, the run fails with the message below:
```
Failed to retrieve task state with error: ClientError('400 Client Error: Bad Request for url: http://MY-SERVER-IP:4200/graphql\n\nThe following error messages were provided by the GraphQL server:\n\n GRAPHQL_VALIDATION_FAILED: Cannot query field "get_or_create_task_run_info" on\n type "Mutation". Did you mean "get_or_create_task_run" or\n "get_or_create_mapped_task_run_children"?\n\nThe GraphQL query was:\n\n mutation {\n get_or_create_task_run_info(input: { flow_run_id: "9281d541-333a-4c1f-ad60-dd2d22edfa9b", task_id: "31ff1abe-1f1c-4409-a56f-6723bb8482f7", map_index: 0 }) {\n id\n version\n serialized_state\n }\n }\n\nThe passed variables were:\n\n null\n')
Traceback (most recent call last):
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 360, in _send_request
    response.raise_for_status()
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/requests/models.py", line 953, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: http://MY-SERVER-IP:4200/graphql

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 193, in initialize_run
    map_index=map_index,
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 1331, in get_task_run_info
    result = self.graphql(mutation)  # type: Any
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 303, in graphql
    retry_on_api_error=retry_on_api_error,
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 219, in post
    retry_on_api_error=retry_on_api_error,
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 445, in _request
    session=session, method=method, url=url, params=params, headers=headers
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 373, in _send_request
    raise ClientError(f"{exc}\n{graphql_msg}") from exc
prefect.utilities.exceptions.ClientError: 400 Client Error: Bad Request for url: http://MY-SERVER-IP:4200/graphql

The following error messages were provided by the GraphQL server:

    GRAPHQL_VALIDATION_FAILED: Cannot query field "get_or_create_task_run_info" on
        type "Mutation". Did you mean "get_or_create_task_run" or
        "get_or_create_mapped_task_run_children"?

The GraphQL query was:

    mutation {
        get_or_create_task_run_info(input: { flow_run_id: "9281d541-333a-4c1f-ad60-dd2d22edfa9b", task_id: "31ff1abe-1f1c-4409-a56f-6723bb8482f7", map_index: 0 }) {
            id
            version
            serialized_state
        }
    }

The passed variables were:

    null
```
However, the flow works when I run it directly on the machine with `flow.run()` (the commented-out lines at the end of the code above), without the agent.
I am not sure why this happens.
It would be great if someone could point me in the right direction on how to solve this issue.
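The GraphQL error says the server's schema has no `get_or_create_task_run_info` mutation. In other words, the Prefect client that runs the mapped tasks sends a mutation this Prefect Server does not know about. That pattern usually points to a version mismatch between the `prefect` package used by the agent and flow and the deployed Prefect Server. `flow.run()` does not hit the problem because a local run never reports task states to the server.

The script below is a minimal sketch for checking this. It assumes the server allows standard GraphQL introspection. The script name `check_server_schema.py` and the helper names are hypothetical.

```python
import json
import sys
import urllib.request

# The mutation the client tried to call in the traceback above.
REQUIRED_MUTATION = "get_or_create_task_run_info"

# Standard GraphQL introspection query: list the names of all mutation fields.
INTROSPECTION_QUERY = "{ __schema { mutationType { fields { name } } } }"


def mutation_names(introspection_result):
    """Return the set of mutation field names from an introspection response."""
    fields = introspection_result["data"]["__schema"]["mutationType"]["fields"]
    return {field["name"] for field in fields}


def fetch_schema(graphql_url):
    """POST the introspection query to the server and decode the JSON reply."""
    payload = json.dumps({"query": INTROSPECTION_QUERY}).encode("utf-8")
    request = urllib.request.Request(
        graphql_url,
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))


def main(argv):
    if len(argv) != 2:
        print("usage: python check_server_schema.py http://MY-SERVER-IP:4200/graphql")
        return
    try:
        import prefect  # the client version installed in this Python environment
        print("prefect client version:", prefect.__version__)
    except ImportError:
        print("prefect is not installed in this environment")

    names = mutation_names(fetch_schema(argv[1]))
    if REQUIRED_MUTATION in names:
        print(f"server exposes {REQUIRED_MUTATION}")
    else:
        print(f"server does NOT expose {REQUIRED_MUTATION}; client and server versions likely differ")


if __name__ == "__main__":
    main(sys.argv)
```

Run it from the same Python environment the agent uses (here `prefect_py37`). If the mutation is missing, the direction to investigate is bringing the server and the agent's `prefect` package to matching releases.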
I am also not sure how to iterate over the results of the mapped task `url_results` once all mapped runs have finished.
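In Prefect 0.x/1.x, you can iterate over mapped results with a reduce step. Pass a mapped task's output to a task that is not itself mapped. That task runs once, after all mapped children have finished, and receives their outputs as a single list in map-index order. This is also the usual way to act on results in an agent-run flow, where there is no local `final_state` to index.

Below is a minimal sketch of such a reduce function, written as plain Python so it runs without Prefect. The name `report_results` is hypothetical.

```python
def report_results(urls, results):
    """Reduce step (hypothetical): pair each URL with its mapped retrieve_url output.

    Assumes, as in Prefect 1.x reduce steps, that `results` is the full list of
    mapped outputs in map-index order, so results[i] belongs to urls[i].
    Each output is a size string, or None for a non-OK response.
    """
    lines = []
    for url, size in zip(urls, results):
        status = f"{size} bytes" if size is not None else "request failed"
        lines.append(f"{url}: {status}")
    print("\n".join(lines))
    return lines


if __name__ == "__main__":
    # Made-up sample values shaped like retrieve_url's outputs.
    report_results(
        [
            "http://old-releases.ubuntu.com/releases/21.10",
            "http://old-releases.ubuntu.com/releases/09.10",
        ],
        ["1200", None],
    )
```

In the flow, decorate it with `@task` and call it inside the `with Flow(...)` block, for example `report_results(get_urls, url_results)`. Prefect 1.x tasks also accept `log_stdout=True` if the printed lines should appear in the task-run logs in the UI.

With the default trigger, the reduce task only runs if every mapped child succeeds. `retrieve_url` returns `None` for a non-OK response instead of raising, so a bad status does not block it, but a network exception inside `requests.get` would.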
Thanks in advance,
Anna Geller
Stéphan Taljaard
02/22/2022, 2:32 PM
Sen
02/22/2022, 2:57 PM
Anna Geller
Sen
02/23/2022, 6:35 AM
Anna Geller