Hello all. I tried using `create_flow_run.map()` w...
# prefect-community
Hello all. I tried using `create_flow_run.map()` but I'm getting some errors. I'm not sure if I'm doing this correctly.
Here's my code
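import prefect
from prefect import Flow, Parameter, task
from prefect.executors import LocalDaskExecutor
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run, get_task_run_result

@task
def hello_world(user_input: str, age_input: int):
    logger = prefect.context.get("logger")
    logger.info(f"hello {user_input}!. You're {age_input} today")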

# This is the flow we can import
with Flow("dummy-flow-to-be-run-multiple-times") as dummy_flow:
    user_param = Parameter("user_input", default="world")
    age_param = Parameter("age_input", default=18)
    hw = hello_world(user_param, age_param)

dummy_flow.storage = storage
dummy_flow.register(project_name=PROJECT, build=False)

# This is a flow we use to map the parameters to different runs of the imported flow
# The parameters is a list of dictionaries where each dictionary is the set of input paramaters needed to run imported flow
with Flow("mapped_flows", 
          executor=LocalDaskExecutor()) as flow:
    parameters = [dict(user_input="Prefect", age_input=21),
                  dict(user_input="Marvin", age_input=27),
                  dict(user_input="World", age_input=12)]
    mapped_flows = create_flow_run.map(
        ...  # arguments not preserved in the original message
    )
    wait_for_mapped_flows = wait_for_flow_run(
        mapped_flows, raise_final_state=True, stream_logs=True
    )
    flow_result = get_task_run_result(
        mapped_flows, "hello_world-1", upstream_tasks=[wait_for_mapped_flows]
    )
Here's the error
Task 'wait_for_flow_run': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
  File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/ifeanyi/.pyenv/versions/prefect/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 258, in wait_for_flow_run
  File "/usr/local/lib/python3.7/site-packages/prefect/backend/flow_run.py", line 576, in from_flow_run_id
    flow_run_data = cls._query_for_flow_run(where={"id": {"_eq": flow_run_id}})
  File "/usr/local/lib/python3.7/site-packages/prefect/backend/flow_run.py", line 618, in _query_for_flow_run
    result = client.graphql(flow_run_query)
  File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 570, in graphql
    raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['flow_run'], 'message': 'parsing UUID failed, expected String, but encountered Array', 'extensions': {'path': '$.selectionSet.flow_run.args.where.id._eq', 'code': 'parse-failed', 'exception': {'message': 'parsing UUID failed, expected String, but encountered Array'}}}]
Can you try reregistering your flow? This often helps with such "parsing UUID" errors
your flow seems fine
@Anna Geller the flow is already registered. I'm guessing the problem may be with `create_flow_run.map()`. I'm not sure it generates the same output as `create_flow_run()`.
Any ideas?
yes, I do have some, writing up my answer 🙂
Okay. I'll be waiting
Upon closer and more thorough investigation, there are actually multiple issues with this flow:
• you are trying to get task run results even though your task doesn't return anything, so there are no results 🙂 - the modified flow in the Gist now returns the same string you were only logging so far
• in order to use results, and then to call `get_task_run_result`, you need to configure the result on your relevant task decorator, here the `hello_world` task
• you need to explicitly provide the Result location because this is what `get_task_run_result` is using to retrieve the task run results
• another issue here is with the flow idempotency key - we need to add that if you map over the same flow, because the flow run ID of the child flow run is by default also the task run ID for the `create_flow_run` task, causing the issue with parsing UUID you saw
• there are no results configured on the task, which is why you cannot store and get task run results
• then, there is the issue that `create_flow_run` is a mapped task while your `wait_for_flow_run` and `get_task_run_result` are not - again, the flow in the Gist below fixes that
I'm sorry that I originally just superficially looked at the flow and considered it looked fine - by looking more closely there were indeed quite many issues lurking around 😄
Here is the flow that should work: https://gist.github.com/5ab8c8e0d9cd65bc197bdb37a510f11e LMK if you have any questions about it
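Two of the points above (mapped vs. unmapped downstream tasks, and idempotency keys) can be illustrated with a minimal, Prefect-free sketch in plain Python. `FakeBackend`, `create_run`, and `wait_for_run` are hypothetical stand-ins, not Prefect APIs, and the key names are made up; the sketch only mimics the shape of the data. A mapped task yields a list of flow run IDs, a consumer that expects a single ID fails when handed the whole list, and reusing one idempotency key collapses several requests into one run.

```python
import uuid
from typing import Dict, List


class FakeBackend:
    """Hypothetical stand-in for an orchestration API (not Prefect's client)."""

    def __init__(self) -> None:
        self._runs_by_key: Dict[str, str] = {}

    def create_run(self, parameters: dict, idempotency_key: str) -> str:
        # A key that was seen before returns the existing run ID,
        # so no new run is created for it.
        if idempotency_key not in self._runs_by_key:
            self._runs_by_key[idempotency_key] = str(uuid.uuid4())
        return self._runs_by_key[idempotency_key]

    def wait_for_run(self, flow_run_id: str) -> str:
        # Mimics the UUID parsing failure: one ID string is expected, not a list.
        if not isinstance(flow_run_id, str):
            raise TypeError("expected String, but encountered Array")
        uuid.UUID(flow_run_id)  # raises ValueError if this is not a valid UUID
        return f"{flow_run_id}: Success"


backend = FakeBackend()
parameters = [
    dict(user_input="Prefect", age_input=21),
    dict(user_input="Marvin", age_input=27),
    dict(user_input="World", age_input=12),
]

# "Mapped" creation with a distinct key per child: three separate runs.
run_ids: List[str] = [
    backend.create_run(p, idempotency_key=f"parent-run-{i}")
    for i, p in enumerate(parameters)
]

# Reusing one key for every child collapses them into a single run.
shared_key_ids = [
    backend.create_run(p, idempotency_key="parent-run") for p in parameters
]

# Unmapped consumer: receives the whole list at once and fails.
try:
    backend.wait_for_run(run_ids)  # type: ignore[arg-type]
    unmapped_error = None
except TypeError as exc:
    unmapped_error = str(exc)

# Mapped consumer: one call per flow run ID.
states = [backend.wait_for_run(run_id) for run_id in run_ids]
```

In the real flow, the analogous change is presumably to call `.map` on the downstream tasks as well, so that each one receives a single flow run ID rather than the full list.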
Oh wow! Thank you very much for your detailed analysis
šŸ‘ 1
So I tried to implement this but I get this error in prefect cloud
Task 'get_task_run_result[1]': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
  File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/ifeanyi/.pyenv/versions/prefect/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 233, in get_task_run_result
  File "/usr/local/lib/python3.7/site-packages/prefect/backend/task_run.py", line 81, in get_result
    self._result = self._load_result()
  File "/usr/local/lib/python3.7/site-packages/prefect/backend/task_run.py", line 90, in _load_result
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/state.py", line 153, in load_result
    self._result = result_reader.read(known_location)  # type: ignore
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/results/local_result.py", line 83, in read
    with open(os.path.join(self.dir, location), "rb") as f:
FileNotFoundError: [Errno 2] No such file or directory: '/Users/ifeanyi/.prefect/results/f2e547b0-30e2-4d1d-9281-924e0aeb07af'
Looks like `LocalResult` doesn't work on the cloud
I updated the Gist - can you try that version? https://gist.github.com/0c922ac90e43a0581b9ec43079cdb5c2 But both should work if you start a local agent. What agent type do you use? The local result only works with a local agent.
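The `FileNotFoundError` in the traceback above is consistent with a result file being written on one filesystem and read from another. Here is a minimal sketch of that situation, using two temporary directories as hypothetical stand-ins for the machine that ran the child flow and the machine that tries to read the result. The helpers `write_result` and `read_result` and the location string are made up for illustration and are not Prefect functions.

```python
import os
import tempfile


def write_result(results_dir: str, location: str, value: bytes) -> None:
    """Persist a value as a file under results_dir (local-disk storage)."""
    os.makedirs(results_dir, exist_ok=True)
    with open(os.path.join(results_dir, location), "wb") as f:
        f.write(value)


def read_result(results_dir: str, location: str) -> bytes:
    """Read a value back; only works if the file exists on THIS filesystem."""
    with open(os.path.join(results_dir, location), "rb") as f:
        return f.read()


with tempfile.TemporaryDirectory() as agent_machine, \
        tempfile.TemporaryDirectory() as other_machine:
    location = "example-task-run-result"  # hypothetical result file name

    # The child flow run writes its result on the machine where it executed.
    write_result(agent_machine, location, b"hello Prefect!")

    # Reading from the same place works.
    same_machine_value = read_result(agent_machine, location)

    # Reading the same location from a different filesystem does not.
    try:
        read_result(other_machine, location)
        missing_elsewhere = False
    except FileNotFoundError:
        missing_elsewhere = True
```

This is why a result stored on local disk can only be retrieved when the reader shares that disk, which matches the remark that the local result only works with a local agent.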
Working with prefect cloud. Where do I confirm what agent it is?
in your cloud UI for example? 🙂 https://cloud.prefect.io/yourteamname/agent or maybe ask someone who deployed your agent in your team? also you may check https://docs.prefect.io/orchestration/getting-started/quick-start.html
I can see that link but it doesn't specify the agent type
or I don't see the agent type
it does say the name is "team-name-vertex"
In that case the cloud URL to the agent would be:
but really it doesn't matter, you can go to the Agents tab in your UI