Ifeanyi Okwuchi
03/03/2022, 8:03 PMcreate_flow_run.map()
with wait_for_flow_run()
and get_task_run_result()
but I'm getting some errors. I'm not sure if I'm doing this correctly@task(log_stdout=True)
def hello_world(user_input: str, age_input: int):
logger = prefect.context.get("logger")
<http://logger.info|logger.info>(f"hello {user_input}!. You're {age_input} today")
# This is the flow we can import
with Flow("dummy-flow-to-be-run-multiple-times") as dummy_flow:
user_param = Parameter("user_input", default="world")
age_param = Parameter("age_input", default=18)
hw = hello_world(user_param, age_param)
dummy_flow.storage = storage
dummy_flow.register(project_name=PROJECT, build=False)
# This is a flow we use to map the parameters to different runs of the imported flow
# The parameters is a list of dictionaries where each dictionary is the set of input paramaters needed to run imported flow
with Flow("mapped_flows",
executor=LocalDaskExecutor()) as flow:
parameters = [dict(user_input="Prefect", age_input=21),
dict(user_input="Marvin", age_input=27),
dict(user_input="World", age_input=12),
]
mapped_flows = create_flow_run.map(
parameters=parameters,
flow_name=unmapped("dummy-flow-to-be-run-multiple-times"),
)
wait_for_mapped_flows = wait_for_flow_run(
mapped_flows, raise_final_state=True, stream_logs=True
)
flow_result = get_task_run_result(
mapped_flows, "hello_world-1", upstream_tasks=[wait_for_mapped_flows]
)
Task 'wait_for_flow_run': Exception encountered during task execution!
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
logger=self.logger,
File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/ifeanyi/.pyenv/versions/prefect/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 258, in wait_for_flow_run
File "/usr/local/lib/python3.7/site-packages/prefect/backend/flow_run.py", line 576, in from_flow_run_id
flow_run_data = cls._query_for_flow_run(where={"id": {"_eq": flow_run_id}})
File "/usr/local/lib/python3.7/site-packages/prefect/backend/flow_run.py", line 618, in _query_for_flow_run
result = client.graphql(flow_run_query)
File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 570, in graphql
raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['flow_run'], 'message': 'parsing UUID failed, expected String, but encountered Array', 'extensions': {'path': '$.selectionSet.flow_run.args.where.id._eq', 'code': 'parse-failed', 'exception': {'message': 'parsing UUID failed, expected String, but encountered Array'}}}]
Anna Geller
03/03/2022, 8:09 PMIfeanyi Okwuchi
03/03/2022, 8:28 PMcreate_flow_run.map()
. I'm not sure it generates the same output as create_flow_run
Anna Geller
03/03/2022, 8:55 PMIfeanyi Okwuchi
03/03/2022, 9:51 PMAnna Geller
03/03/2022, 10:05 PMget_task_run_results
, you need to configure the result on your relevant task decorator, here the hello_world
task
⢠you need to explicitly provide the Result location
because this is what the get_task_run_result
is using to retrieve the task run results
⢠another issue here is with the flow idempotency key - we need to add that if you map over the same flow because the flow run ID of the child flow run is by default also the task run ID for the create_flow_run
task causing the issue with parsing UUID you saw
⢠there are no results configured on the task which is why you cannot store and get task run results
⢠then, there is the issue that create_flow_run is a mapped task while your wait_for_flow_run
and get_task_run_result
are not - again, the flow in the Gist below fixes that
I'm sorry that I originally just superficially looked at the flow and considered it looked fine - by looking more closely there were indeed quite many issues lurking around š
Here is the flow that should work:
https://gist.github.com/5ab8c8e0d9cd65bc197bdb37a510f11e
LMK if you have any questions about itIfeanyi Okwuchi
03/03/2022, 10:55 PMTask 'get_task_run_result[1]': Exception encountered during task execution!
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
logger=self.logger,
File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/ifeanyi/.pyenv/versions/prefect/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 233, in get_task_run_result
File "/usr/local/lib/python3.7/site-packages/prefect/backend/task_run.py", line 81, in get_result
self._result = self._load_result()
File "/usr/local/lib/python3.7/site-packages/prefect/backend/task_run.py", line 90, in _load_result
self.state.load_result()
File "/usr/local/lib/python3.7/site-packages/prefect/engine/state.py", line 153, in load_result
self._result = result_reader.read(known_location) # type: ignore
File "/usr/local/lib/python3.7/site-packages/prefect/engine/results/local_result.py", line 83, in read
with open(os.path.join(self.dir, location), "rb") as f:
FileNotFoundError: [Errno 2] No such file or directory: '/Users/ifeanyi/.prefect/results/f2e547b0-30e2-4d1d-9281-924e0aeb07af'
LocalResult()
doesn't work on the cloudAnna Geller
03/03/2022, 11:46 PMIfeanyi Okwuchi
03/03/2022, 11:56 PMAnna Geller
03/04/2022, 12:20 AMIfeanyi Okwuchi
03/04/2022, 12:28 AMAnna Geller
03/04/2022, 11:31 AM<https://cloud.prefect.io/team-name-vertex/agent>
but really it doesn't matter, you can go to the Agents tab in your UI