<@ULVA73B9P> How do I set the flow_run_name based ...
# marvin-in-the-wild
r
@Marvin How do I set the flow_run_name based on the run name of the parent flow?
m
To set the `flow_run_name` based on the run name of the parent flow, there isn't explicit documentation on how to directly inherit the name. However, you can set the `flow_run_name` when invoking a flow, and this name can be programmatically determined based on the context of the parent flow. For example:
```python
from prefect import flow

@flow
def child_flow():
    pass

@flow
def parent_flow():
    # Suppose we get the name of the parent flow run from some logic or context
    parent_flow_run_name = "parent-flow-run"

    # Now we use this name to set the flow run name for the child
    child_flow_run_name = f"{parent_flow_run_name}-child"
    child_flow(flow_run_name=child_flow_run_name)

parent_flow()
```
In this pseudo-code, `parent_flow_run_name` represents the name you derive from the parent flow's context. You then format or manipulate this variable to make a meaningful name for the child flow. Keep in mind that the `flow_run_name` must be either a string or a callable that returns a string. It cannot be any other type. This constraint is shown in the test from the GitHub source code:
```python
@flow
def my_flow():
    pass

my_flow.flow_run_name = InvalidFlowRunNameArg()

with pytest.raises(
    TypeError,
    match=(
        "Expected string or callable for 'flow_run_name'; got"
        " InvalidFlowRunNameArg instead."
    ),
):
    my_flow()
```
As for composing flows and subflows in general: the Prefect documentation explains that a subflow run is created when a flow function is called inside the execution of another flow, but it does not mention inheriting the flow run name. You would typically manage the flow run names within your Python code when orchestrating parent-child flows. If you have more specific scenarios or questions, let me know and I can look into the relevant documentation or practices.
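The string-or-callable constraint mentioned above can be illustrated with a small plain-Python sketch. The helper name `resolve_flow_run_name` is hypothetical and this is not Prefect's implementation; it only mirrors the rule and the error message quoted in the test.

```python
from typing import Callable, Union


def resolve_flow_run_name(flow_run_name: Union[str, Callable[[], str]]) -> str:
    """Return a run name from a plain string or a zero-argument callable.

    Hypothetical helper for illustration only; not part of Prefect.
    """
    if callable(flow_run_name):
        # A callable is invoked and must itself produce a string
        name = flow_run_name()
        if not isinstance(name, str):
            raise TypeError(
                "Callable for 'flow_run_name' returned "
                f"{type(name).__name__}; expected a string."
            )
        return name
    if isinstance(flow_run_name, str):
        return flow_run_name
    # Anything else is rejected, as in the quoted test
    raise TypeError(
        "Expected string or callable for 'flow_run_name'; got "
        f"{type(flow_run_name).__name__} instead."
    )
```

Passing `"nightly"` or `lambda: "parent-child"` returns the name unchanged, while passing an integer raises a `TypeError`.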
r
@Marvin This does not work, the call to child_flow fails due to an unexpected keyword argument. I believe you must use a callable flow_run_name argument. How can I access the parent flow run name from that function?
m
You can access the parent flow run name from a callable function by using `prefect.context.get_run_context()`, which provides you with the flow run context including details like the flow run name. When setting the `flow_run_name` parameter for a flow in Prefect 2.x, you indeed need to use a callable that defines the flow run name dynamically. Here is an example of how you could define a callable function to set the `flow_run_name` based on the parent flow's context:
```python
from prefect import flow, context

def get_parent_flow_run_name():
    run_context = context.get_run_context()
    return run_context.flow_run_name + "-child" if run_context else "default-child-name"

@flow(flow_run_name=get_parent_flow_run_name)
def child_flow():
    # Flow definition goes here
    pass

@flow
def parent_flow():
    # Parent flow logic
    child_flow()  # This will set the child flow's run name based on the parent flow's run name

parent_flow()
```
In the example above, the `get_parent_flow_run_name` function retrieves the flow run context using `prefect.context.get_run_context()`. This context will contain the parent flow's run name if the function is called during a flow run. However, if it's not called within a flow run, this function returns a default name for the child flow run. You can find more information on accessing the run context in the Prefect documentation for the runtime context. If this approach does not meet your requirements or you run into other issues, please let me know.
r
@Marvin this is wrong, the context is that of the current flow, not the parent. I believe you must use context.flow_run.parent_task_run_id, then make an async call to PrefectClient.read_flow_run to retrieve the parent run.
m
You are correct in noting that `prefect.context.get_run_context()` retrieves the context for the current flow, not the parent. To obtain the run name of the parent flow within a child flow, you can use the `context.flow_run.parent_task_run_id` to reference the parent task run ID, and then make an asynchronous API call to `PrefectClient.read_flow_run` to retrieve the parent flow run's details. In the following pseudo-code, I will demonstrate how you would use `PrefectClient.read_flow_run` to get the name of the parent flow run asynchronously within the child flow:
```python
from prefect import flow, context
from prefect.client import get_client

async def get_parent_flow_run_name():
    parent_task_run_id = context.get().flow_run.parent_task_run_id
    async with get_client() as client:
        parent_task_run = await client.read_task_run(parent_task_run_id)
        parent_flow_run_id = parent_task_run.flow_run_id
        parent_flow_run = await client.read_flow_run(parent_flow_run_id)
        return parent_flow_run.name

@flow
def child_flow():
    parent_flow_run_name = await get_parent_flow_run_name()
    # Now, you can use the parent_flow_run_name for whatever you need in the child flow
    pass

@flow
def parent_flow():
    # Parent flow logic
    child_flow()  # Here the child flow will retrieve the parent flow run name asynchronously

parent_flow()
```
This code retrieves the parent flow's run name by asynchronously calling `PrefectClient.read_task_run` and `PrefectClient.read_flow_run`. These functions look up the parent flow run from its ID, which is reached via the `parent_task_run_id` from the context object. From the excerpts found in the 'test_flows.py' GitHub source code, we can see that `parent_task_run_id` along with `prefect_client.read_flow_run` and `prefect_client.read_task_run` are being used to navigate relationships between parent and child entities in the Prefect runtime. If you need to use the flow run name to generate a name for a child flow run or for another use, you would make the `get_parent_flow_run_name` function return the desired string based on the parent flow run name. Remember that this code is asynchronous, so you need to call it from an asynchronous context, or adapt it to a synchronous context by using an event loop to run the coroutine function.
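As a rough illustration of running a coroutine from synchronous code, here is a standard-library-only sketch. `fetch_parent_name` is a hypothetical stand-in for the asynchronous API lookup and does not call Prefect; `parent_name_sync` is likewise an illustrative name.

```python
import asyncio


async def fetch_parent_name(parent_id: str) -> str:
    """Hypothetical stand-in for an async API lookup (e.g. reading a flow run)."""
    await asyncio.sleep(0)  # simulate awaiting I/O
    return f"run-of-{parent_id}"


def parent_name_sync(parent_id: str) -> str:
    """Synchronous wrapper: start an event loop, run the coroutine, return its result."""
    # asyncio.run raises RuntimeError if an event loop is already running in this thread
    return asyncio.run(fetch_parent_name(parent_id))
```

Because `asyncio.run` refuses to start inside an already running loop, a wrapper like this only fits call sites that are themselves synchronous.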
r
Getting there.. @Marvin The callback function cannot be async, since the engine does not await the call to get_parent_flow_run_name. Also, the get_parent_flow_run_name function must be passed to the @flow decorator.
m
Indeed, the flow run name function that is passed to the `@flow` decorator in Prefect must be a synchronous callable and cannot be an asynchronous function since the engine does not await the call to `get_parent_flow_run_name`. Since the Prefect engine expects a synchronous callable, you will need to run the asynchronous code to interface with the Prefect API within a synchronous wrapper. To ensure the asynchronous code is executed synchronously, you could use the `run_sync` utility from `prefect.utilities.asyncio`. Here's how you could implement this:
```python
from prefect import flow, context
from prefect.client import get_client
from prefect.utilities.asyncio import run_sync

def get_parent_flow_run_id():
    return context.get().flow_run.parent_task_run_id

async def async_get_parent_flow_run_name(parent_task_run_id):
    async with get_client() as client:
        parent_task_run = await client.read_task_run(parent_task_run_id)
        parent_flow_run_id = parent_task_run.flow_run_id
        parent_flow_run = await client.read_flow_run(parent_flow_run_id)
        return parent_flow_run.name

def get_parent_flow_run_name():
    parent_task_run_id = get_parent_flow_run_id()
    # Use run_sync to synchronously run the async function
    return run_sync(async_get_parent_flow_run_name)(parent_task_run_id)

@flow(flow_run_name=get_parent_flow_run_name)
def child_flow():
    # Child flow contents here
    pass

@flow
def parent_flow():
    # Parent flow contents here
    child_flow()

parent_flow()
```
In the snippet above, `get_parent_flow_run_id` is a synchronous function that retrieves the `parent_task_run_id` from the current Prefect context. This ID is then passed to the `async_get_parent_flow_run_name` function, which fetches the parent flow run name asynchronously through the Prefect Client. The `run_sync` utility is then used to execute the asynchronous function synchronously. Please note that the actual implementation might require further adjustments depending on context setup, exception handling, and possible changes in the Prefect API, but the principle of wrapping the asynchronous call within a synchronous context should apply.
r
😞
n
@Marvin nah you’re overcomplicating things, you can use `get_run_context()` in the parent and then `my_subflow.with_options(f"based-on-{flow_run_name}")()` to call a modified flow and give it a custom flow run name
m
My apologies for any confusion earlier. You're right, if you just need to pass the parent flow run name to a subflow and it's okay to handle that within the parent flow, using `get_run_context()` in the parent flow is simpler. Let me clarify with a concise example on how to apply this:
```python
from prefect import flow
from prefect.context import get_run_context

@flow
def child_flow(flow_run_name):
    # Child flow logic goes here
    pass

@flow
def parent_flow():
    run_context = get_run_context()
    flow_run_name = run_context.flow_run_name
    subflow_run_name = f"based-on-{flow_run_name}"

    # Call the subflow with a modified flow run name
    child_flow(flow_run_name=subflow_run_name)

parent_flow()
```
In the parent flow (`parent_flow`), you can call `get_run_context()` to obtain the current run context, which includes the parent's flow run name. Then you can construct the new flow run name (`subflow_run_name`) based on the parent's run name. You can then call the child flow (`child_flow`) with this custom flow run name by just passing the constructed name as an argument. This method directly uses the name you capture in the parent flow to affect the child flow without relying on asynchronous calls or the use of context inheritance beyond the explicit parameter passing. If you have any further questions or another angle you'd like to explore, please let me know.
n
@Marvin no, think step by step. use get_run_context() to get the FlowRunContext get the name attr from there, make an f-string for the subflow run name based on the parent name, give it to my_subflow using with_options like i mentioned, call the new subflow
m
I understand now. Let's break it down step by step:
1. In the parent flow, use `get_run_context()` to access the parent's `FlowRunContext`.
2. Obtain the flow run name via the `name` attribute from the context.
3. Create an f-string to derive the subflow's run name.
4. Use the `with_options` method to set the `flow_run_name` for the subflow.
5. Call the new modified subflow.
Here is a code example implementing your description:
```python
from prefect import flow
from prefect.context import get_run_context

@flow
def child_flow():
    # Child flow logic goes here
    pass

@flow
def parent_flow():
    # Retrieve the parent flow's run context
    parent_flow_context = get_run_context()

    # Get the parent flow's run name
    parent_flow_run_name = parent_flow_context.name

    # Form the subflow's run name based on the parent's name
    subflow_run_name = f"based-on-{parent_flow_run_name}"

    # Call the child subflow with the custom flow run name
    modified_subflow = child_flow.with_options(name=subflow_run_name)
    modified_subflow()

parent_flow()
```
In the above code:
- The `get_run_context()` function is called within the `parent_flow` to obtain the current `FlowRunContext`.
- We access the `name` property of this context to get the run name of the parent flow.
- We then construct the subflow run name using an f-string.
- The `child_flow` subflow is modified using the `with_options` method to set the custom run name.
- Finally, the modified subflow is called.
This should meet your requirements. If there is any other specific detail you need addressed, feel free to point it out.
n
^ except the kwarg for with_options should be flow_run_name instead of name
r
Using `with_options` indeed solves my issue, though it would be more convenient to do this automatically in the `flow_run_name` function of the child and auto-detecting that we're starting a child-flow. That seemed to be a much harder nut to crack though, so this will do. Thanks a lot!