Richard Sundqvist
03/14/2024, 9:03 AMMarvin
03/14/2024, 9:03 AMflow_run_name
based on the run name of the parent flow, there isn't explicit documentation on how to directly inherit the name. However, you can set the flow_run_name
when invoking a flow, and this name can be programmatically determined based on the context of the parent flow.
For example:
python
@flow
def child_flow():
pass
@flow
def parent_flow():
# Suppose we get the name of the parent flow run from some logic or context
parent_flow_run_name = "parent-flow-run"
# Now we use this name to set the flow run name for the child
child_flow_run_name = f"{parent_flow_run_name}-child"
child_flow(flow_run_name=child_flow_run_name)
parent_flow()
In this pseudo-code, parent_flow_run_name
represents the name you derive from the parent flow's context. You then format or manipulate this variable to make a meaningful name for the child flow.
Keep in mind that the flow_run_name
must be either a string or a callable that returns a string. It cannot be any other type. This constraint is shown in the test from the GitHub source code:
python
@flow
def my_flow():
pass
my_flow.flow_run_name = InvalidFlowRunNameArg()
with pytest.raises(
TypeError,
match=(
"Expected string or callable for 'flow_run_name'; got"
" InvalidFlowRunNameArg instead."
),
):
my_flow()
For the general concept of composing flows and subflows, as described in the Prefect documentation, while a subflow is created when a flow function is called inside the execution of another flow, it does not directly mention inheriting the flow run name. You would typically manage the flow run names within your Python code logic when orchestrating parent-child flows.
If you have any more specific scenarios or questions, you can formulate them, and I'd look into any relevant documentation or practices.Richard Sundqvist
03/14/2024, 9:08 AMMarvin
03/14/2024, 9:08 AMprefect.context.get_run_context()
, which provides you with the flow run context including details like the flow run name.
When setting the flow_run_name
parameter for a flow in Prefect 2.x, you indeed need to use a callable that defines the flow run name dynamically. Here is an example of how you could define a callable function to set the flow_run_name
based on the parent flow's context:
python
from prefect import flow, context
def get_parent_flow_run_name():
run_context = context.get_run_context()
return run_context.flow_run_name + "-child" if run_context else "default-child-name"
@flow(flow_run_name=get_parent_flow_run_name)
def child_flow():
# Flow definition goes here
pass
@flow
def parent_flow():
# Parent flow logic
child_flow() # This will set the child flow's run name based on the parent flow's run name
parent_flow()
In the example above, the get_parent_flow_run_name
function retrieves the flow run context using prefect.context.get_run_context()
. This context will contain the parent flow's run name if the function is called during a flow run. However, if it's not called within a flow run, this function returns a default name for the child flow run.
You can find more information on accessing the run context in the Prefect documentation for the runtime context. If this approach does not meet your requirements or you run into other issues, please let me know.Richard Sundqvist
03/14/2024, 9:13 AMMarvin
03/14/2024, 9:14 AMprefect.context.get_run_context()
retrieves the context for the current flow, not the parent. To obtain the run name of the parent flow within a child flow, you can use the context.flow_run.parent_task_run_id
to reference the parent task run ID, and then make an asynchronous API call to PrefectClient.read_flow_run
to retrieve the parent flow run's details.
In the following pseudo-code, I will demonstrate how you would use PrefectClient.read_flow_run
to get the name of the parent flow run asynchronously within the child flow:
python
from prefect import flow, context
from prefect.client import get_client
async def get_parent_flow_run_name():
parent_task_run_id = context.get().flow_run.parent_task_run_id
async with get_client() as client:
parent_task_run = await client.read_task_run(parent_task_run_id)
parent_flow_run_id = parent_task_run.flow_run_id
parent_flow_run = await client.read_flow_run(parent_flow_run_id)
return parent_flow_run.name
@flow
def child_flow():
parent_flow_run_name = await get_parent_flow_run_name()
# Now, you can use the parent_flow_run_name for whatever you need in the child flow
pass
@flow
def parent_flow():
# Parent flow logic
child_flow() # Here the child flow will retrieve the parent flow run name asynchronously
parent_flow()
This code constructs a new flow run name for the child flow by asynchronously calling PrefectClient.read_task_run
and PrefectClient.read_flow_run
. These functions are used to retrieve the parent flow's run name from its ID, which is accessed via the parent_task_run_id
from the context object.
From the excerpts found in the 'test_flows.py' GitHub source code, we can see that parent_task_run_id
along with prefect_client.read_flow_run
and prefect_client.read_task_run
are being used to navigate relationships between parent and child entities in the Prefect runtime.
If you need to use the flow run name to generate a name for a child flow run or for another use, you would make the get_parent_flow_run_name
function return the desired string based on the parent flow run name.
Remember that this code is running asynchronously, so you need to ensure that it's called from an asynchronous context, or to adapt it to a synchronous context by using an event loop to run the coroutine function.Richard Sundqvist
03/14/2024, 9:16 AMMarvin
03/14/2024, 9:17 AM@flow
decorator in Prefect must be a synchronous callable and cannot be an asynchronous function since the engine does not await the call to get_parent_flow_run_name
.
Since the Prefect engine expects a synchronous callable, you will need to run the asynchronous code to interface with the Prefect API within a synchronous wrapper. To ensure the asynchronous code is executed synchronously, you could use the run_sync
utility from prefect.utilities.asyncio
.
Here's how you could implement this:
python
from prefect import flow, context
from prefect.client import get_client
from prefect.utilities.asyncio import run_sync
def get_parent_flow_run_id():
return context.get().flow_run.parent_task_run_id
async def async_get_parent_flow_run_name(parent_task_run_id):
async with get_client() as client:
parent_task_run = await client.read_task_run(parent_task_run_id)
parent_flow_run_id = parent_task_run.flow_run_id
parent_flow_run = await client.read_flow_run(parent_flow_run_id)
return parent_flow_run.name
def get_parent_flow_run_name():
parent_task_run_id = get_parent_flow_run_id()
# Use run_sync to synchronously run the async function
return run_sync(async_get_parent_flow_run_name)(parent_task_run_id)
@flow(flow_run_name=get_parent_flow_run_name)
def child_flow():
# Child flow contents here
pass
@flow
def parent_flow():
# Parent flow contents here
child_flow()
parent_flow()
In the snippet above, get_parent_flow_run_id
is a synchronous function that retrieves the parent_task_run_id
from the current Prefect context. This ID is then passed to the async_get_parent_flow_run_name
function, which fetches the parent flow run name asynchronously through the Prefect Client. The run_sync
utility is then used to execute the asynchronous function synchronously.
Please note that the actual implementation might require further adjustments depending on context setup, exception handling, and possible changes in the Prefect API, but the principle of wrapping the asynchronous call within a synchronous context should apply.Richard Sundqvist
03/14/2024, 9:18 AMNate
03/14/2024, 1:52 PMMarvin
03/14/2024, 1:53 PMget_run_context()
in the parent flow is simpler.
Let me clarify with a concise example on how to apply this:
python
from prefect import flow
from prefect.context import get_run_context
@flow
def child_flow(flow_run_name):
# Child flow logic goes here
pass
@flow
def parent_flow():
run_context = get_run_context()
flow_run_name = run_context.flow_run_name
subflow_run_name = f"based-on-{flow_run_name}"
# Call the subflow with a modified flow run name
child_flow(flow_run_name=subflow_run_name)
parent_flow()
In the parent flow (parent_flow
), you can call get_run_context()
to obtain the current run context, which includes the parent's flow run name. Then you can construct the new flow run name (subflow_run_name
) based on the parent's run name. You can then call the child flow (child_flow
) with this custom flow run name by just passing the constructed name as an argument.
This method directly uses the name you capture in the parent flow to affect the child flow without relying on asynchronous calls or the use of context inheritance beyond the explicit parameter passing.
If you have any further questions or another angle you'd like to explore, please let me know.Nate
03/14/2024, 1:54 PMMarvin
03/14/2024, 1:55 PMget_run_context()
to access the parent's FlowRunContext
.
2. Obtain the flow run name via the name
attribute from the context.
3. Create an f-string to derive the subflow's run name.
4. Use the with_options
method to set the flow_run_name
for the subflow.
5. Call the new modified subflow.
Here is a code example implementing your description:
python
from prefect import flow
from prefect.context import get_run_context
@flow
def child_flow():
# Child flow logic goes here
pass
@flow
def parent_flow():
# Retrieve the parent flow's run context
parent_flow_context = get_run_context()
# Get the parent flow's run name
parent_flow_run_name = parent_flow_context.name
# Form the subflow's run name based on the parent's name
subflow_run_name = f"based-on-{parent_flow_run_name}"
# Call the child subflow with the custom flow run name
modified_subflow = child_flow.with_options(name=subflow_run_name)
modified_subflow()
parent_flow()
In the above code:
- The get_run_context()
function is called within the parent_flow
to obtain the current FlowRunContext
.
- We access the name
property of this context to get the run name of the parent flow.
- We then construct the subflow run name using an f-string.
- The child_flow
subflow is modified using the with_options
method to set the custom run name.
- Finally, the modified subflow is called.
This should meet your requirements. If there is any other specific detail you need addressed, feel free to point it out.Nate
03/14/2024, 1:56 PMRichard Sundqvist
03/14/2024, 2:05 PMwith_options
indeed solves my issue, though it would be more convenient to do this automatically in the flow_run_name
function of the child and auto-detecting that we're starting a child-flow. That seemed to be a much harder nut to crack though, so this will do.
Thanks a lot!