Matthew
01/22/2025, 4:11 PM
flow.to_deployment(name, workpool). This leads to the first problem:
I have to manually change the entrypoint, replacing the relative filename of the Python file containing the flow with an absolute path, because the worker appears to execute the flow in a temporary directory. The code is:
runner_deployment: RunnerDeployment = await flow.to_deployment(
    name=f"{flow.name}-deployment",
    work_pool_name="workpool",
)
runner_deployment.entrypoint = resolve_entrypoint(runner_deployment.entrypoint)
...

def resolve_entrypoint(entrypoint: str) -> str:
    # PROBLEM:
    # The entrypoint of the flow is relative to the project root, and the
    # local process worker cannot find it.
    # SOLUTION:
    # Update the entrypoint to use an absolute path.
    relative_file, name = entrypoint.split(":")
    absolute_file = Path(relative_file).resolve()
    return f"{absolute_file}:{name}"
How can I get the code to resolve correctly for a process-type worker? Is this some sort of local storage thing? I would like to be able to iterate quickly, so packaging and publishing Docker images is undesirable.
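To make the failure mode concrete, here is a minimal, self-contained sketch (no Prefect involved, all paths and names are made up for illustration) showing why a relative entrypoint breaks once the process runs from a scratch directory, and why resolving it to an absolute path first works:

```python
import os
import tempfile
from pathlib import Path

def resolve_entrypoint(entrypoint: str) -> str:
    """Rewrite 'relative/file.py:flow_func' to an absolute path while the
    project root is still the current working directory."""
    relative_file, name = entrypoint.split(":")
    return f"{Path(relative_file).resolve()}:{name}"

# Set up a fake project root containing a flow file.
project_root = Path(tempfile.mkdtemp())
flow_file = project_root / "flows" / "my_flow.py"
flow_file.parent.mkdir(parents=True)
flow_file.write_text("def my_flow(): ...\n")

os.chdir(project_root)
relative = "flows/my_flow.py:my_flow"
absolute = resolve_entrypoint(relative)  # resolved while cwd is the project root

with tempfile.TemporaryDirectory() as scratch:
    os.chdir(scratch)  # simulate the worker's temporary working directory
    rel_exists = Path(relative.split(":")[0]).exists()
    abs_exists = Path(absolute.split(":")[0]).exists()
os.chdir(project_root)

print(rel_exists, abs_exists)  # → False True
```

The relative form only resolves while the cwd happens to be the project root; the absolute form survives the worker's directory change.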
Then I apply the runner_deployment to get the deployment_id, and I can read the deployment details using the Prefect client. I can then run this deployment with prefect.deployments.run_deployment(name=deployment_id, timeout=0, parameters=...). This returns a FlowRun object whose id I can use with the /api/flow_runs/{id} API endpoint to read details about the flow run, or with the /api/flow_runs/{id}/resume endpoint to resume it.
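For reference, the two endpoints mentioned can be wrapped in tiny URL helpers; the helper names and the localhost base URL below are my own choices for illustration, only the URL paths come from the message:

```python
# Hypothetical helpers that just build the two endpoint URLs discussed above.
def flow_run_url(api_base: str, flow_run_id: str) -> str:
    # GET this URL to read details about the flow run.
    return f"{api_base}/api/flow_runs/{flow_run_id}"

def flow_run_resume_url(api_base: str, flow_run_id: str) -> str:
    # POST to this URL to resume a paused/suspended flow run.
    return f"{flow_run_url(api_base, flow_run_id)}/resume"

print(flow_run_url("http://localhost:4200", "abc-123"))
# → http://localhost:4200/api/flow_runs/abc-123
```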
I would like to know how I can pass messages from the flow to the initiating process, which may be on another machine. I can see in the Prefect server that the flow run has associated artifacts, but I cannot see a way to find the artifacts associated with a given flow run. I am also unsure whether the artifacts are ordered; I would need to be able to see the messages produced since some prior point. Further to this, the return value of the ControlFlow flow does not appear to be an artifact. In the example code this is usually the result of the computation, so how can I access it?
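Assuming each artifact record carries a creation timestamp (the record shape below is a simplified stand-in, not the real Prefect artifact schema), ordering and "messages since a prior point" can be handled client-side:

```python
from dataclasses import dataclass
from datetime import datetime, timezone

@dataclass
class ArtifactRecord:
    # Simplified stand-in for an artifact response; real records
    # carry more fields (id, key, type, flow_run_id, ...).
    key: str
    created: datetime
    data: str

def messages_since(artifacts: list[ArtifactRecord], checkpoint: datetime) -> list[ArtifactRecord]:
    """Return artifacts created strictly after `checkpoint`, oldest first."""
    newer = [a for a in artifacts if a.created > checkpoint]
    return sorted(newer, key=lambda a: a.created)

def ts(s: str) -> datetime:
    return datetime.fromisoformat(s).replace(tzinfo=timezone.utc)

arts = [
    ArtifactRecord("msg-2", ts("2025-01-22T16:20:00"), "second"),
    ArtifactRecord("msg-1", ts("2025-01-22T16:10:00"), "first"),
    ArtifactRecord("msg-3", ts("2025-01-22T16:30:00"), "third"),
]
new = messages_since(arts, ts("2025-01-22T16:15:00"))
print([a.data for a in new])  # → ['second', 'third']
```

The checkpoint is just the timestamp of the last artifact already seen, so the caller only ever processes new messages.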
My aim here is to replace the cli_input method of getting human feedback with a separate tool which suspends the current flow and provides a message to prompt the user with (potentially a rich message with images, etc.). When the user has responded, the flow can be resumed with their input. Is this an appropriate use of the suspend/resume functionality? Do I have to handle the inter-process communication separately from the Prefect server?
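One way to picture the hand-off is as a serializable prompt envelope the flow publishes just before suspending, and a reply payload the initiating process sends back when resuming. Everything here is hypothetical (the class and field names are mine, not a Prefect or ControlFlow API); it only sketches the data that would cross the boundary:

```python
import json
from dataclasses import dataclass, field, asdict

@dataclass
class UserPrompt:
    # Hypothetical envelope the flow could publish (e.g. as an artifact)
    # just before suspending; field names are illustrative only.
    text: str
    image_urls: list[str] = field(default_factory=list)

    def to_json(self) -> str:
        return json.dumps(asdict(self))

@dataclass
class UserReply:
    # Payload the initiating process would send back when resuming the run.
    answer: str

prompt = UserPrompt("Which option do you prefer?", ["https://example.com/a.png"])
payload = prompt.to_json()                      # flow side: serialize and publish
restored = UserPrompt(**json.loads(payload))    # client side: deserialize and display
print(restored.text)  # → Which option do you prefer?
```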
The final part is the suspend/resume behaviour itself. I would like to use it to provide the list of previous conversations; is that a suitable use? Suspending and resuming involves rerunning every stage of the flow up to the suspension point, with caching used both to ensure the results match the prior run and to skip the costs of rerunning everything. One thing I like about the ControlFlow LLM invocations is that the interactive ones can ask the user further questions to clarify an answer, and it is exactly these situations where suspend/resume would be helpful. Is there a way to partition the flows, since a conversation could potentially be infinitely long? Are there constraints around the function caching that I should be aware of? And is there a way to retrieve the state of a flow, including the cached task calls, so that I can store and restore the cache if needed?
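The caching property being relied on here can be sketched in isolation: a cache key derived deterministically from the task's inputs, so a resumed run hits the cache for every step whose inputs are unchanged. This is a standalone illustration of the idea (not Prefect's implementation), and it also shows the main constraint: inputs must serialize deterministically.

```python
import hashlib
import json

def input_cache_key(task_name: str, parameters: dict) -> str:
    """Illustrative cache key: a stable hash of the task name plus its
    JSON-serializable parameters. Identical inputs yield an identical key,
    so a rerun can skip the work; non-serializable or non-deterministic
    inputs break this scheme."""
    blob = json.dumps({"task": task_name, "params": parameters}, sort_keys=True)
    return hashlib.sha256(blob.encode()).hexdigest()

k1 = input_cache_key("ask_llm", {"question": "hello", "turn": 1})
k2 = input_cache_key("ask_llm", {"turn": 1, "question": "hello"})  # same inputs, reordered
k3 = input_cache_key("ask_llm", {"question": "hello", "turn": 2})  # different input
print(k1 == k2, k1 == k3)  # → True False
```

Since the key depends only on serialized inputs, a table of key-to-result pairs is itself storable and restorable, which is the shape a persisted task cache would take.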
Thanks again for such a great tool

Nate
01/22/2025, 4:19 PM

Matthew
01/22/2025, 4:21 PM
"env": {"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY")},
Omar
02/07/2025, 9:42 AM

Matthew
02/10/2025, 12:35 PM