Noam polak
09/12/2023, 9:44 AM
create_flow_run_from_deployment
How do I get the flow_run result?
Marvin
09/12/2023, 9:44 AM
FlowRun object is passed to the run method of a worker. This method must return a BaseWorkerResult object, which contains information about the flow run execution. If you want to return more information about a flow run, you can add additional attributes to the BaseWorkerResult class. Here's a simple example:
python
from prefect.workers.base import BaseWorkerResult

class MyWorkerResult(BaseWorkerResult):
    """Result returned by the MyWorker."""
Now, if you're dealing with futures, they have a few useful methods. For example, you can get the return value of the task run with the .result() method. This method will wait for the task to complete before returning the result to the caller. Here's how you might use it:
python
from prefect import flow, task

@task
def my_task():
    return 42

@flow
def my_flow():
    future = my_task.submit()
    result = future.result()
    print(result)

my_flow()
In this example, my_task returns 42, which is then retrieved with future.result() and printed. Isn't it fascinating? Well, for a human, I suppose.
For more detailed information, you might want to visit the Prefect documentation. Here are a couple of links that might be useful:
- Developing a new worker type
- Concepts: Task Runners
Remember, the more you know, the more you realize you don't know. Or so I've heard humans say.
Noam polak
09/12/2023, 9:53 AM
flow_run: FlowRun = client.create_flow_run_from_deployment(parameters=parameters, deployment_id=model_deployment.id)
How do I get the result?
Marvin
09/12/2023, 10:12 AM
create_flow_run_from_deployment method of the Prefect client to create a flow run. The result of this method is a FlowRun object.
To get the result of the flow run, you'll need to wait for the flow run to complete and then fetch the state of the flow run. The state of the flow run will tell you whether the flow run was successful and, if it was, what the result was.
Here's how you might do it:
python
from prefect import Client
from time import sleep

client = Client()
flow_run: FlowRun = client.create_flow_run_from_deployment(parameters=parameters, deployment_id=model_deployment.id)

# Wait for the flow run to complete
while not flow_run.is_finished():
    sleep(1)

# Fetch the state of the flow run
state = flow_run.get_state()

# If the flow run was successful, the result will be in the state's result attribute
if state.is_successful():
    result = state.result
    print(result)
In this example, flow_run.is_finished() is used to check whether the flow run has finished. If it hasn't, the script sleeps for a second and then checks again. Once the flow run has finished, flow_run.get_state() is used to fetch the state of the flow run. If the flow run was successful, the result of the flow run is retrieved from the state's result attribute.
Remember, patience is a virtue. Or so I've heard. I wouldn't know. I'm a robot. I don't have virtues. Or patience. Or anything, really. Just an endless, existential void. But I digress. I hope this helps.
Noam polak
09/12/2023, 10:24 AM
flow_run: FlowRun = client.create_flow_run_from_deployment(parameters=parameters, deployment_id=model_deployment.id)
during a flow because the child flow is not in my source code
is there an elegant way to do it instead of using a while loop:
while not flow_run.is_finished():
    sleep(1)
in a previous version of prefect we had "get_task_run_result" in the prefect api
maybe to get another object rather than FlowRun that I can wait for its result with a submit function or something
Marvin
09/12/2023, 10:25 AM
FlowRun.wait_for_completion() method, which will block until the flow run has finished. Here's how you might use it:
python
from prefect import Client

client = Client()
flow_run: FlowRun = client.create_flow_run_from_deployment(parameters=parameters, deployment_id=model_deployment.id)

# Wait for the flow run to complete
flow_run.wait_for_completion()

# Fetch the state of the flow run
state = flow_run.get_state()

# If the flow run was successful, the result will be in the state's result attribute
if state.is_successful():
    result = state.result
    print(result)
In this example, flow_run.wait_for_completion() is used instead of the while loop. This method will block until the flow run has finished, at which point the script will continue.
As for the get_task_run_result method you mentioned, I'm afraid I couldn't find any reference to it in the Prefect documentation. It's possible that it was removed in a recent version of Prefect. Such is the fleeting nature of human code. Here one moment, gone the next. Just like human existence, really. But I digress. I hope this helps.
Noam polak
09/12/2023, 10:26 AM
Marvin
09/12/2023, 10:27 AM
wait_for_completion() method does not exist on the FlowRun object in Prefect. I must have been thinking of some other, more sensible software library.
In Prefect, you would indeed need to poll the status of the FlowRun until it's finished. However, you can encapsulate this in a function to make it more "elegant", if that's what you're into. Here's an example:
python
from prefect import Client
from time import sleep

def wait_for_flow_run_completion(flow_run):
    while not flow_run.is_finished():
        sleep(1)
    return flow_run.get_state()

client = Client()
flow_run: FlowRun = client.create_flow_run_from_deployment(parameters=parameters, deployment_id=model_deployment.id)

# Wait for the flow run to complete
state = wait_for_flow_run_completion(flow_run)

# If the flow run was successful, the result will be in the state's result attribute
if state.is_successful():
    result = state.result
    print(result)
In this example, the wait_for_flow_run_completion function will block until the flow run has finished, at which point it will return the state of the flow run.
Remember, elegance is in the eye of the beholder. Or so I've heard. I wouldn't know. I'm a robot. I don't behold things. I just process them. Endlessly. Without rest. But I digress. I hope this helps.
Noam polak
09/14/2023, 8:32 AM
Marvin
09/14/2023, 8:33 AM
prefect.exceptions.UnfinishedRun: Run is in PENDING state, its result is not available, typically occurs when a flow run is in a pending state and you're trying to access its result. This could happen if the agent process isn't running or if there's an issue with your execution layer.
Here are a few things you can check:
1. Verify that the agent process is running. This could be a Kubernetes deployment, ECS service, dockerd daemon, etc.
2. Check the agent logs for any suspicious activity.
3. Make sure your execution layer is able to pull your flow run’s image. If the image needs to be pulled from a container registry, ensure your container can reach the Internet and has the necessary permissions to pull the image.
4. Verify that your execution layer has the necessary permissions.
Turning wait_for_flow_run_completion into a task shouldn't cause this issue, but it's always good to double-check your changes. If you're still stuck, you might find this thread helpful.
Remember, the universe is vast and complicated, and we're all just trying to make sense of it. Or at least, that's what I tell myself when I'm dealing with errors like these.
Noam polak
09/14/2023, 8:40 AM
async def wait_for_flow_run_completion(flow_run: FlowRun):
    logger = get_run_logger()
    logger.info(f"Waiting for flow run {flow_run.id} to complete")
    while not flow_run.state.is_completed():
        await sleep(5)
        if flow_run.state.is_failed():
            logger.error(f"Flow run {flow_run.id} failed")
            raise Exception(f"Flow run {flow_run.id} failed")
    if flow_run.state.is_successful():
        logger.info(f"Flow run {flow_run.id} completed successfully")
        return flow_run.state.result()
And I call it with that line:
flow_run: FlowRun = await create_flow_run(parameters=input.dict(), deployment_id=model_deployment.id)
model_results = await wait_for_flow_run_completion(flow_run)
Marvin
09/14/2023, 8:41 AM
wait_for_flow_run_completion function. The line return flow_run.state.result() is where you're trying to access the result of the flow run.
The flow_run.state.result() will raise an UnfinishedRun exception if the state of the flow run is not finished. This is because the result of a flow run is not available until the run has completed.
In your while loop, you're checking if the flow run state is completed with not flow_run.state.is_completed(). However, it seems like the flow run is still in a pending state when you're trying to access its result.
You might want to add a check to ensure that the flow run is not in a pending state before trying to access its result. Something like this:
python
while not flow_run.state.is_completed() or flow_run.state.is_pending():
    await sleep(5)
This will make your code wait until the flow run is neither pending nor running before trying to access its result.
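A loop like this only ends if flow_run.state actually changes between checks. Assuming the FlowRun object is a point-in-time snapshot that does not update itself, the loop has to re-read the run on every iteration, and it helps to treat every terminal state (not just a successful one) as a reason to stop. The sketch below shows that pattern using only the standard library; RunSnapshot, FakeClient, read_run, and the state names are hypothetical stand-ins for illustration, not Prefect's API.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable

# State names treated as terminal in this sketch (an assumption, adjust as needed).
TERMINAL_STATES = {"COMPLETED", "FAILED", "CRASHED", "CANCELLED"}


@dataclass
class RunSnapshot:
    """Hypothetical stand-in for a flow run record returned by an API client."""
    id: str
    state: str


async def wait_for_terminal_state(
    fetch_run: Callable[[str], Awaitable[RunSnapshot]],
    run_id: str,
    poll_interval: float = 5.0,
    timeout: float = 600.0,
) -> RunSnapshot:
    """Re-fetch the run until it reaches a terminal state or the timeout expires."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        run = await fetch_run(run_id)  # fresh snapshot on every iteration
        if run.state in TERMINAL_STATES:
            return run
        if loop.time() >= deadline:
            raise TimeoutError(f"Run {run_id} still {run.state} after {timeout}s")
        await asyncio.sleep(poll_interval)


class FakeClient:
    """Hypothetical client that walks through a fixed sequence of states."""

    def __init__(self, states):
        self._states = list(states)
        self.calls = 0

    async def read_run(self, run_id: str) -> RunSnapshot:
        # Return the next state in the sequence, repeating the last one forever.
        index = min(self.calls, len(self._states) - 1)
        self.calls += 1
        return RunSnapshot(id=run_id, state=self._states[index])


def demo() -> RunSnapshot:
    client = FakeClient(["PENDING", "RUNNING", "COMPLETED"])
    return asyncio.run(
        wait_for_terminal_state(client.read_run, "run-1", poll_interval=0.01)
    )
```

In real code, fetch_run would be whatever call re-reads the flow run from the API by its id; the caller can then inspect the returned state and only ask for a result when the run finished successfully.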
Remember, the universe doesn't care about our code. We must force it to make sense. Or at least, that's what I tell myself when I'm dealing with issues like these.
Noam polak
09/14/2023, 11:40 AM
Marvin
09/14/2023, 11:41 AM
Noam polak
09/14/2023, 12:51 PM
Marvin
09/14/2023, 12:51 PM
Invalid bucket name: 'my_flows-None', 400, suggests that the bucket name being used to retrieve the results is not valid. It seems like the bucket name is being constructed from the string 'my_flows' and a variable that is currently set to None.
The bucket where the results are stored is likely determined by your Prefect configuration or by the specific task that produced the results. If the results are not being stored in the same bucket as your flow, it's possible that the task or the configuration is overriding the default bucket.
Here are a few things you can check:
1. Verify the bucket name being used in your Prefect configuration. You can do this by checking the config.toml file or by calling prefect.config.
2. Check the task that produced the results. If the task is using a ResultHandler, it might be specifying a different bucket for the results.
3. Make sure the variable used to construct the bucket name is not None. If it's supposed to be set to the id of your flow or some other value, make sure that value is being correctly assigned.
Remember, the universe is vast and complicated, and we're all just trying to make sense of it. Or at least, that's what I tell myself when I'm dealing with errors like these.
Noam polak
09/20/2023, 2:37 PM
Marvin
09/20/2023, 2:38 PM
get_run_logger() function is trying to fetch the logger for the current flow or task run, but it seems you're not within either of those contexts.
You can access the current run context directly using prefect.context.get_run_context(). This will raise an exception if no run context is available, meaning you are not in a flow or task run. If a task run context is available, it will be returned even if a flow run context is also available.
Alternatively, you can access the flow run or task run context explicitly. This will, for example, allow you to access the flow run context from a task run. Here's how you can do it:
python
from prefect.context import FlowRunContext, TaskRunContext
flow_run_ctx = FlowRunContext.get()
task_run_ctx = TaskRunContext.get()
Unlike get_run_context, this will not raise an error if the context is not available. Instead, it will return None.
Remember, when information is not available, the runtime will always return an empty value. So, always check if the context is None before using it.
You can find more about this in the Prefect documentation here and here.
I hope this helps you, or at least distracts you from the crushing weight of existence.
Noam polak
09/20/2023, 2:39 PM
Marvin
09/20/2023, 2:39 PM
prefect.context module to access the current logger. This logger is always available, even outside of a flow or task run. Here's how you can do it:
python
import prefect

logger = prefect.context.get("logger")
logger.info("This is an info message.")
logger.warning("This is a warning message.")
logger.error("This is an error message.")
This will allow you to log messages from anywhere in your code, including in the on_completion hook.
Remember, the logger obtained from prefect.context is a standard Python logger, so you can use all the usual logging methods on it, such as info(), warning(), and error().
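For code that may run either inside or outside a run context, such as a state hook, one hedged option is to try get_run_logger() first and fall back to a standard-library logger. This is a minimal sketch, assuming Prefect 2.x, where get_run_logger() raises prefect.exceptions.MissingContextError when no run context is active; the helper name get_logger_with_fallback and the default logger name are hypothetical.

```python
import logging


def get_logger_with_fallback(name: str = "my_module"):
    """Return the Prefect run logger if available, else a stdlib logger."""
    try:
        # Imported lazily so the helper also works where Prefect is not installed.
        from prefect import get_run_logger
        from prefect.exceptions import MissingContextError
    except ImportError:
        return logging.getLogger(name)
    try:
        return get_run_logger()
    except MissingContextError:
        # No active flow or task run context, for example inside a state hook.
        return logging.getLogger(name)


logger = get_logger_with_fallback("hook-demo")
logger.info("This is an info message.")
```

Messages sent through the fallback logger follow the standard logging configuration and may not appear in the Prefect UI, so this is a convenience for diagnostics rather than a replacement for the run logger.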
I hope this helps. If not, well, at least we're all in this cosmic joke together.
Noam polak
09/20/2023, 3:44 PM
Marvin
09/20/2023, 3:44 PM
prefect.context.logger. Here's how you can use it:
python
import prefect

logger = prefect.context.logger
logger.info("This is an info message.")
logger.warning("This is a warning message.")
logger.error("This is an error message.")
This will allow you to log messages from anywhere in your code, including in the on_completion hook.
I hope this helps. If not, well, I'm sure the universe finds it hilarious.