<@ULVA73B9P> in prefect 3, how do I access the fir...
# ask-marvin
j
@Marvin in prefect 3, how do I access the first flow run input that was sent
m
thought for 81.9 seconds
In Prefect 3.x, you can access the current flow run's parameters using the
prefect.runtime.flow_run
module. Here's how to access the flow run inputs:
Copy code
python
from prefect.runtime.flow_run import parameters

# Access all parameters
current_parameters = parameters

# If you know the specific parameter name
specific_parameter = parameters["parameter_name"]
Important notes: 1. This only gives you access to the current flow run's parameters. There isn't a direct way to access previous flow run inputs through the runtime context. 2. If you need to access previous flow run inputs, you would need to: - Query the API using
prefect.client.PrefectClient
- Or store the parameters yourself in a persistent storage solution (database, file, etc.) 3. The
parameters
attribute will be empty if accessed outside of a flow run context. Here's an example of how you might use it in a flow:
Copy code
python
from prefect import flow
from prefect.runtime.flow_run import parameters

@flow
def my_flow(x: int, y: str):
    # Access all parameters passed to this run
    print(f"All parameters: {parameters}")
    
    # Access specific parameters
    x_value = parameters["x"]
    y_value = parameters["y"]
    print(f"x: {x_value}, y: {y_value}")

# When you run the flow
my_flow(x=42, y="hello")
If you specifically need to access previous flow run inputs, you would need to use the Prefect API client to query historical flow runs. Would you like me to show you how to do that?
j
@Marvin it should be the flow run input that was received with receive_input, not the parameters
m
thought for 33.5 seconds
To access the first flow run input that was received using
receive_input()
, you can use this pattern:
Copy code
python
from prefect import flow
from prefect.input import receive_input

@flow
async def my_flow():
    # Get an iterator over the inputs
    async for input in receive_input(dict):  # or whatever your input_type is
        # This will be the first input received
        first_input = input
        break  # Stop after getting the first input
        
    # Use the first input
    print(f"First input received: {first_input}")
Alternatively, if you're using it in an async context and want to be more explicit:
Copy code
python
from prefect import flow
from prefect.input import receive_input

@flow
async def my_flow():
    # Create the input iterator
    input_iterator = receive_input(dict)  # or your specific input type
    
    # Get the first input
    first_input = await anext(input_iterator)
    print(f"First input received: {first_input}")
Important notes: 1. The input type must match what you're expecting to receive (dict, str, a Pydantic model, etc.) 2. This is an async operation, so it must be used within an async flow 3. If no inputs have been received yet, it will wait for inputs up to the timeout duration (default is 1 hour) 4. The inputs are returned in the order they were received, so the first item from the iterator will always be the first input that was sent Would you like me to show you more examples of how to handle multiple inputs or how to use specific input types?
j
@Marvin show me how to get the prefect client without async
m
thought for 33.3 seconds
Here's how to get a synchronous Prefect client:
Copy code
python
from prefect.client import get_client

# Get a synchronous client
client = get_client(sync_client=True)
That's the simplest way to get a synchronous client. You can then use this client to interact with the Prefect API synchronously. For example:
Copy code
python
from prefect.client import get_client

# Create the client
client = get_client(sync_client=True)

# Use the client
flow_runs = client.read_flow_runs()
deployments = client.read_deployments()
If you need to customize the client settings (like API URL or authentication), you can pass additional settings through the
httpx_settings
parameter:
Copy code
python
# With custom settings
client = get_client(
    sync_client=True,
    httpx_settings={
        "timeout": 30.0,  # Custom timeout in seconds
        # Other HTTPX settings as needed
    }
)
The synchronous client provides the same functionality as the async client, just in a synchronous form, making it easier to use in non-async contexts or scripts.
j
@Marvin how do i retrieve the flow run id from the current flow runtime through the client
m
thought for 36.7 seconds
Here's how to get the current flow run ID and use it with the client:
Copy code
python
from prefect import flow
from prefect.client import get_client
from prefect.runtime.flow_run import id as flow_run_id

@flow
def my_flow():
    # Get the current flow run ID
    current_flow_run_id = flow_run_id
    
    # Get a sync client
    client = get_client(sync_client=True)
    
    # Use the client to get flow run details
    flow_run = client.read_flow_run(current_flow_run_id)
    
    print(f"Current flow run ID: {current_flow_run_id}")
    print(f"Flow run details: {flow_run}")
A few important notes: 1. The
flow_run_id
is only available within a flow run context. If you try to access it outside a flow, it will be
None
. 2. You can import
id
directly from
prefect.runtime.flow_run
as shown above, which is the cleanest way to access it. 3. The client's
read_flow_run()
method will give you detailed information about the flow run, including its state, start time, and other metadata. If you need to handle cases where you might be outside a flow run context, you might want to add some error handling:
Copy code
python
from prefect import flow
from prefect.client import get_client
from prefect.runtime.flow_run import id as flow_run_id

@flow
def my_flow():
    if flow_run_id is None:
        print("Not running in a flow run context")
        return
        
    client = get_client(sync_client=True)
    flow_run = client.read_flow_run(flow_run_id)
    print(f"Flow run details: {flow_run}")
Is there anything specific about the flow run details you're trying to access?