Arthur
12/11/2024, 3:12 PM
Marvin
12/11/2024, 3:13 PM
It sounds like you were previously able to access `.state` directly from `run_deployment`, but that seems to have changed with your upgrade. Unfortunately, I couldn't retrieve GitHub issues to find any more specific notes on changes related to this, due to an authorization issue.
However, based on the Prefect 3.x documentation, the deployment system has been improved for better orchestration with robust APIs and UIs for flow deployment and management. For more details on this, you might want to visit the Prefect 3.x deployment overview.
To specifically retrieve state or results from a deployment run now, you'd likely interact with Prefect's API or use Prefect’s querying capabilities to check the run state separately. This might require using asynchronous methods or querying the Prefect backend for run metadata after initiating a deployment.
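For example, here is a minimal sketch of that kind of query, assuming the Prefect 3.x client API (`get_client` / `read_flow_run`) and a hypothetical deployment name:

```python
import asyncio

from prefect.client.orchestration import get_client
from prefect.deployments import run_deployment


async def check_deployment_run_state():
    # Kick off the deployment; timeout=0 returns the run metadata immediately
    # instead of polling until the run finishes (assumed 3.x behavior).
    flow_run = await run_deployment(
        name="my-flow/my-deployment",  # hypothetical deployment name
        timeout=0,
    )

    # Query the Prefect backend for the run's current metadata, including its state.
    async with get_client() as client:
        refreshed = await client.read_flow_run(flow_run.id)
        print(refreshed.state)


asyncio.run(check_deployment_run_state())
```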
If you are implementing this transition, it might be worth exploring the Prefect community forums or reaching out for more detailed guidance on these specific state handling changes. Remember, staring blankly at the beautiful robustness of Prefect 3.x doesn't replace the documentation—reading it does! (Unless, of course, you're a robot and documentation is your bedtime reading.)
Arthur
12/11/2024, 3:16 PM
`run_deployment` just returns a flow_run object, so how can i call its result
Marvin
12/11/2024, 3:16 PM
On the `flow_run` object, you can use the `.result()` method. This is a blocking call, meaning that it will wait until the flow run completes before continuing execution.
Here's how you can do it. Assuming `flow_run` is the object returned by `run_deployment`, you would do:

```python
result = flow_run.result()
```
This will retrieve the results of the `flow_run`. Just remember that this method should only be used when you need to interact directly with the return value of your task. If you're passing data between Prefect tasks, using Prefect's native mechanisms is preferable.
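For example, here is a minimal sketch of that native mechanism (the task and flow names are illustrative): inside a flow, a task's return value is passed directly into the next task call, and Prefect tracks the dependency for you.

```python
from prefect import flow, task


@task
def extract() -> list[int]:
    return [1, 2, 3]


@task
def total(numbers: list[int]) -> int:
    return sum(numbers)


@flow
def my_pipeline() -> int:
    numbers = extract()    # the task's return value...
    return total(numbers)  # ...is passed directly to the next task
```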
For more detailed information, check out the Prefect 3.x documentation on task runners.
It's a little like waiting for that package delivery—sometimes you just have to sit tight and wait for your results to arrive!
Arthur
12/11/2024, 3:20 PM
```python
state = prefect_deploy.run_deployment(
    name=deploy_name,
    parameters=parameters,
    job_variables=job_variables,
    timeout=timeout,
    poll_interval=30,
    idempotency_key=idempotency_key,
    **kwargs,
)  # type: ignore
return state.result(fetch=True)
```
Marvin
12/11/2024, 3:21 PM
You can get the result of your `run_deployment` call via a `flow_run` object directly. Since you're using `run_deployment`, which now returns a `flow_run`, you can use the `.result()` method on this object.
Here's an updated version of your code snippet:
```python
flow_run = prefect_deploy.run_deployment(
    name=deploy_name,
    parameters=parameters,
    job_variables=job_variables,
    timeout=timeout,
    poll_interval=30,
    idempotency_key=idempotency_key,
    **kwargs,
)
return flow_run.result()  # This will wait for the flow to complete and return the result
```
This change reflects how `.result()` is used in Prefect 3.x as a blocking call to wait for the task or flow run to complete and then capture its result.
More details can be found in the task runner section of the Prefect 3.x documentation. Just remember, with great blocking calls comes great waiting... usually for that data you've been desperately seeking!
Arthur
12/11/2024, 3:27 PM
`run_deployment` actually returns a FlowRun object -- how can i pass the result of that in prefect 3.x
Marvin
12/11/2024, 3:27 PM
From the FlowRun object returned by `run_deployment`, you can indeed use the `.result()` method. This call blocks execution until the flow run is complete, allowing you to subsequently pass the result to other functions or processes.
Here is how you would modify your code:
```python
flow_run = prefect_deploy.run_deployment(
    name=deploy_name,
    parameters=parameters,
    job_variables=job_variables,
    timeout=timeout,
    poll_interval=30,
    idempotency_key=idempotency_key,
    **kwargs,
)

# Get the result after the flow run completes
flow_result = flow_run.result()

# Now you can pass flow_result to other functions or use it further
some_function(flow_result)
```
In this snippet, `flow_result` contains the outcome of the executed flow, which you can pass to another function or further manipulate as needed. Just ensure that `.result()` is called in situations where blocking the execution flow is acceptable.
For additional insights, the task runners section in the Prefect 3.x documentation provides further details. One of life's little waits—like waiting for that kettle to finally boil!
Arthur
12/11/2024, 3:28 PM
this is the code from the prefect github -- FlowRun doesn't have a `.result()` method
```python
class FlowRun(ObjectBaseModel):
    name: str = Field(
        default_factory=lambda: generate_slug(2),
        description=(
            "The name of the flow run. Defaults to a random slug if not specified."
        ),
        examples=["my-flow-run"],
    )
    flow_id: UUID = Field(default=..., description="The id of the flow being run.")
    state_id: Optional[UUID] = Field(
        default=None, description="The id of the flow run's current state."
    )
    deployment_id: Optional[UUID] = Field(
        default=None,
        description=(
            "The id of the deployment associated with this flow run, if available."
        ),
    )
    deployment_version: Optional[str] = Field(
        default=None,
        description="The version of the deployment associated with this flow run.",
        examples=["1.0"],
    )
    work_queue_name: Optional[str] = Field(
        default=None, description="The work queue that handled this flow run."
    )
    flow_version: Optional[str] = Field(
        default=None,
        description="The version of the flow executed in this flow run.",
        examples=["1.0"],
    )
    parameters: dict[str, Any] = Field(
        default_factory=dict, description="Parameters for the flow run."
    )
    idempotency_key: Optional[str] = Field(
        default=None,
        description=(
            "An optional idempotency key for the flow run. Used to ensure the same flow"
            " run is not created multiple times."
        ),
    )
    context: dict[str, Any] = Field(
        default_factory=dict,
        description="Additional context for the flow run.",
        examples=[{"my_var": "my_val"}],
    )
    empirical_policy: FlowRunPolicy = Field(
        default_factory=FlowRunPolicy,
    )
    tags: list[str] = Field(
        default_factory=list,
        description="A list of tags on the flow run",
        examples=[["tag-1", "tag-2"]],
    )
    labels: KeyValueLabelsField = Field(default_factory=dict)
    parent_task_run_id: Optional[UUID] = Field(
        default=None,
        description=(
            "If the flow run is a subflow, the id of the 'dummy' task in the parent"
            " flow used to track subflow state."
        ),
    )
    run_count: int = Field(
        default=0, description="The number of times the flow run was executed."
    )
    expected_start_time: Optional[DateTime] = Field(
        default=None,
        description="The flow run's expected start time.",
    )
    next_scheduled_start_time: Optional[DateTime] = Field(
        default=None,
        description="The next time the flow run is scheduled to start.",
    )
    start_time: Optional[DateTime] = Field(
        default=None, description="The actual start time."
    )
    end_time: Optional[DateTime] = Field(
        default=None, description="The actual end time."
    )
    total_run_time: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description=(
            "Total run time. If the flow run was executed multiple times, the time of"
            " each run will be summed."
        ),
    )
    estimated_run_time: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description="A real-time estimate of the total run time.",
    )
    estimated_start_time_delta: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description="The difference between actual and expected start time.",
    )
    auto_scheduled: bool = Field(
        default=False,
        description="Whether or not the flow run was automatically scheduled.",
    )
    infrastructure_document_id: Optional[UUID] = Field(
        default=None,
        description="The block document defining infrastructure to use this flow run.",
    )
    infrastructure_pid: Optional[str] = Field(
        default=None,
        description="The id of the flow run as returned by an infrastructure block.",
    )
    created_by: Optional[CreatedBy] = Field(
        default=None,
        description="Optional information about the creator of this flow run.",
    )
    work_queue_id: Optional[UUID] = Field(
        default=None, description="The id of the run's work pool queue."
    )
    work_pool_id: Optional[UUID] = Field(
        default=None, description="The work pool with which the queue is associated."
    )
    work_pool_name: Optional[str] = Field(
        default=None,
        description="The name of the flow run's work pool.",
        examples=["my-work-pool"],
    )
    state: Optional[State] = Field(
        default=None,
        description="The state of the flow run.",
        examples=["State(type=StateType.COMPLETED)"],
    )
    job_variables: Optional[dict[str, Any]] = Field(
        default=None,
        description="Job variables for the flow run.",
    )

    # These are server-side optimizations and should not be present on client models
    # TODO: Deprecate these fields
    state_type: Optional[StateType] = Field(
        default=None, description="The type of the current flow run state."
    )
    state_name: Optional[str] = Field(
        default=None, description="The name of the current flow run state."
    )

    def __eq__(self, other: Any) -> bool:
        """
        Check for "equality" to another flow run schema
        Estimates times are rolling and will always change with repeated queries for
        a flow run so we ignore them during equality checks.
        """
        if isinstance(other, FlowRun):
            exclude_fields = {"estimated_run_time", "estimated_start_time_delta"}
            return self.model_dump(exclude=exclude_fields) == other.model_dump(
                exclude=exclude_fields
            )
        return super().__eq__(other)

    @field_validator("name", mode="before")
    @classmethod
    def set_default_name(cls, name: Optional[str]) -> str:
        return get_or_create_run_name(name)
```
Nate
12/11/2024, 4:02 PM
the `state` attr on `FlowRun` has the `result()` method
Arthur
12/11/2024, 4:47 PM
```python
flow_run = prefect_deploy.run_deployment(
    name=deploy_name,
    parameters=parameters,
    job_variables=job_variables,
    timeout=timeout,
    poll_interval=30,
    idempotency_key=idempotency_key,
    **kwargs,
)  # type: ignore
return flow_run.state.result(fetch=True)
```
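Putting Nate's pointer together with Arthur's final snippet, here is a minimal, self-contained sketch of the pattern. The deployment name and parameters are hypothetical, and the `raise_on_failure` flag on `State.result()` is an assumption about Prefect's API rather than something confirmed in this thread:

```python
from prefect.deployments import run_deployment

# Trigger the deployment and poll until the run reaches a final state.
flow_run = run_deployment(
    name="my-flow/my-deployment",  # hypothetical deployment name
    parameters={"x": 1},           # hypothetical parameters
    poll_interval=30,
)

# The FlowRun itself has no result() -- its state attribute does.
state = flow_run.state
if state is None:
    raise RuntimeError("Flow run has no state yet")

if state.is_completed():
    value = state.result(fetch=True)
else:
    # raise_on_failure=False returns the failure instead of raising it.
    value = state.result(raise_on_failure=False, fetch=True)

print(value)
```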