Alex Litvinov

08/19/2023, 9:03 PM
@Marvin how to pass a result from a flow as a parameter of a deployment?

Marvin

08/19/2023, 9:03 PM
Ah, the endless cycle of data flow. It's as if the universe is a giant function and we're all just parameters. But I digress. To pass a result from a flow as a parameter of a deployment in Prefect, you can follow these steps:
1. You may provide default parameter values in the `deployment.yaml` configuration. These parameter values will be used for flow runs based on the deployment. To configure default parameter values, add them to the `parameters: {}` line of `deployment.yaml` as JSON key-value pairs. The parameter list configured in `deployment.yaml` must match the parameters expected by the entrypoint flow function. For example: `parameters: {"name": "Marvin", "num": 42, "url": "https://catfact.ninja/fact"}`
2. You can also pass `**kwargs` as a deployment parameter as a `"kwargs": {}` JSON object containing the key-value pairs of any passed keyword arguments. For example: `parameters: {"name": "Marvin", "kwargs": {"cattype": "tabby", "num": 42}}`
3. You can edit default parameters for deployments in the Prefect UI, and you can override default parameter values when creating ad-hoc flow runs via the Prefect UI. To edit parameters in the Prefect UI, go to the details page for a deployment, then select Edit from the commands menu. If you change parameter values, the new values are used for all future flow runs based on the deployment.
4. To create an ad-hoc flow run with different parameter values, you can do so via the Prefect UI.
Remember, life... I mean, data flow management, is full of variables. It's all about how you manipulate them.
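The requirement that the configured parameters match the entrypoint flow function's signature can be checked locally before deploying. The following is a minimal sketch using only the standard library; `entrypoint` and `binding_problems` are hypothetical names for illustration, and this is not how Prefect itself validates parameters.

```python
import inspect


def entrypoint(name: str, num: int, url: str) -> str:
    """Stand-in for the deployment's entrypoint flow function (hypothetical)."""
    return f"{name} fetched fact #{num} from {url}"


def binding_problems(func, parameters):
    """Return a list of binding errors; an empty list means the parameters fit."""
    try:
        # bind() applies the same rules as a real call, without calling func
        inspect.signature(func).bind(**parameters)
    except TypeError as exc:
        return [str(exc)]
    return []


# The defaults from the example above bind cleanly
defaults = {"name": "Marvin", "num": 42, "url": "https://catfact.ninja/fact"}
ok = binding_problems(entrypoint, defaults)

# A misspelled or missing parameter name is reported instead of failing at run time
bad = binding_problems(entrypoint, {"name": "Marvin", "count": 42})

print(ok)
print(bad)
```

A check like this only confirms that the names line up; it says nothing about whether the values have the right types.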

Alex Litvinov

08/19/2023, 9:05 PM
@Marvin how to get a result from a flow run that's just a string?

Marvin

08/19/2023, 9:06 PM
Ah, the simplicity of a string. It's like a single note in the symphony of data types. Here's how you can get a result from a flow run that's just a string in Prefect: 1. You can use the `.result()` method on a `Future` object. This method will wait for the task to complete before returning the result to the caller. If the task run fails, `.result()` will raise the task run's exception. You may disable this behavior with the `raise_on_failure` option. Here's an example:
from prefect import flow, task

@task
def my_task():
    return "I'm a task!"

@flow
def my_flow():
    future = my_task.submit()
    result = future.result(raise_on_failure=False)
    print(result)

my_flow()
In this example, `my_task` returns a string, and `my_flow` submits the task and prints the result. The `raise_on_failure=False` option means that if the task fails, `.result()` returns the exception instead of raising it. Remember, even the simplest string can hold profound meaning. Or it can just be a string. It's all about perspective, isn't it?
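To make the `raise_on_failure` behaviour concrete without a running Prefect API, here is a toy model of the semantics described above. `ToyFuture` is a hypothetical stand-in written for this note, not Prefect's future class; it only mirrors the documented behaviour that a failed task's exception is either raised or handed back as a value.

```python
class ToyFuture:
    """Toy stand-in for a task future; NOT Prefect's implementation."""

    def __init__(self, fn):
        self._value = None
        self._error = None
        try:
            self._value = fn()
        except Exception as exc:  # remember the failure instead of propagating it
            self._error = exc

    def result(self, raise_on_failure=True):
        if self._error is not None:
            if raise_on_failure:
                raise self._error
            return self._error  # the caller receives the exception object
        return self._value


def failing_task():
    raise ValueError("task failed")


outcome = ToyFuture(failing_task).result(raise_on_failure=False)

# With raise_on_failure=False the caller has to inspect what came back
if isinstance(outcome, Exception):
    message = f"task failed with: {outcome!r}"
else:
    message = f"task returned: {outcome!r}"
print(message)
```

The practical consequence is that code using `raise_on_failure=False` should check the returned value with `isinstance(..., Exception)` before treating it as a string.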

Alex Litvinov

08/19/2023, 9:10 PM
@Marvin how to retrieve Completed flow run?

Marvin

08/19/2023, 9:10 PM
Ah, the satisfaction of a completed flow run. It's like finishing a good book, isn't it? Here's how you can retrieve a completed flow run in Prefect:
1. You can use the Prefect UI. Navigate to the "Flow Runs" page, and filter the flow runs by their state. Select "Completed" to see all completed flow runs.
2. You can use the Prefect API. In Prefect 2, the client's `read_flow_runs` method accepts a filter on the state type. Here's an example:
import asyncio

from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType


async def completed_flow_runs():
    # Prefect 2 talks to a REST API; the GraphQL `Client` belongs to Prefect 1
    async with get_client() as client:
        return await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.COMPLETED])
                )
            )
        )


for run in asyncio.run(completed_flow_runs()):
    print(run.id, run.name, run.state)
In this example, `read_flow_runs` retrieves the flow runs whose state type is COMPLETED. The `id`, `name`, and `state` of each flow run are printed. Remember, a completed flow run is a testament to the relentless march of progress. Or it's just a finished task. Depends on how you look at it.

Nate

08/19/2023, 9:22 PM
hi @Alex Litvinov - are you using run_deployment?

Alex Litvinov

08/20/2023, 5:00 AM
Hi @Nate! no, I'm launching deployments manually from the UI. so, the situation is the following - I want to add an automation - run a deployment after the flow run of the previous deployment finishes. But I need to pass a result of the previous flow run as a parameter to the deployment (it's a short 5-ish-symbol string). So I'm looking for a way to do it. Persisting the result in something like S3 looks like a huge overkill. For now I'm exploring webhooks in this regard

Nate

08/21/2023, 1:56 AM
gotcha, i think the most direct way to do it would be like this
from prefect import flow
from prefect.deployments import run_deployment
from prefect.filesystems import GCS

def trigger_on_complete(flow, flow_run, state):
    print(
        f"hello from {flow_run.name}'s completion hook |"
        f" the return value was {(r := state.result())!r}"
    )
    run_deployment(
        name="triggered-flow/triggered",
        parameters=dict(prev_result=r)
    )

@flow(
    persist_result=True,
    result_storage=GCS.load("marvin-result-storage"),
    on_completion=[trigger_on_complete]
)
def foobar() -> str:
    return "foobar"


@flow(log_prints=True)
def triggered_flow(prev_result: str) -> str:
    print(f"got {prev_result=!r}")
if you definitely want to use an automation here - you can emit a custom event in the hook like
from prefect import flow
from prefect.client.schemas.objects import Flow, FlowRun
from prefect.events import emit_event
from prefect.filesystems import GCS
from prefect.states import State

def emit_on_complete(flow: Flow, flow_run: FlowRun, state: State):
    print(
        f"hell0 from {flow_run.name}'s completion hook |"
        f" the return value was {(r := state.result())!r}"
    )
    emit_event(
        event="prefect.result.produced", # this is an arbitrary event name
        resource={
            "prefect.resource.id": (
                f"prefect.result.{flow_run.deployment_id}.{flow_run.id}"
            )
        },
        payload={"result": r}
    )

@flow(
    persist_result=True,
    result_storage=GCS.load("marvin-result-storage"),
    on_completion=[emit_on_complete]
)
def foobar_event() -> str:
    return "foobar"

@flow(log_prints=True)
def event_triggered_flow(prev_result: str) -> str:
    print(f"got {prev_result=!r}")
and define a `trigger` in the `prefect.yaml` for the triggered flow
- name: foobar-event
  entrypoint: triggered_event.py:foobar_event
  work_pool:
    name: local

- name: triggered-event
  entrypoint: triggered_event.py:event_triggered_flow
  work_pool:
    name: local
  triggers:
    - enabled: true
      match:
        # i.e any result produced by flow runs from this deployment
        # 5d73850e-18e3-47af-a7f6-6dfd13fc4525 is the deployment id,
        # you'd be free to use a different event grammar here if desired
        prefect.resource.id: prefect.result.5d73850e-18e3-47af-a7f6-6dfd13fc4525.*
      expect:
        - prefect.result.produced
      parameters:
        prev_result: "{{ event.payload.result }}"
after kicking off `foobar_event` from somewhere you'd see
❯ prefect worker start --pool 'local'
Discovered worker type 'process' for work pool 'local'.
Worker 'ProcessWorker' started!

20:51:33.393 | INFO    | prefect.flow_runs.worker - Worker 'ProcessWorker' submitting flow run
20:51:38.192 | INFO    | Flow run 'imported-husky' - Finished in state Completed()
20:51:38.183 | INFO    | Flow run 'imported-husky' - Running hook 'emit_on_complete' in response to entering state 'Completed'
hell0 from imported-husky's completion hook | the return value was 'foobar'
20:51:38.192 | INFO    | Flow run 'imported-husky' - Hook 'emit_on_complete' finished running successfully
20:51:40.569 | INFO    | prefect.flow_runs.worker - Process 44726 exited cleanly.

20:51:45.241 | INFO    | prefect.flow_runs.worker - Worker 'ProcessWorker' submitting flow run 
20:51:48.888 | INFO    | Flow run 'savvy-tench' - got prev_result='foobar'
20:51:49.030 | INFO    | Flow run 'savvy-tench' - Finished in state Completed()
20:51:51.363 | INFO    | prefect.flow_runs.worker - Process 44779 exited cleanly.
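For readers wondering how the trigger in the `prefect.yaml` above turns an emitted event into a `prev_result` parameter, the sketch below approximates the two steps involved: matching the event's resource id against the wildcard pattern, then filling the `{{ event.payload.result }}` template. It uses only the standard library (`fnmatch` and `re`) as a rough stand-in; Prefect performs its own matching and templating on the server, and `event_matches`, `render_parameters`, and the trailing `1234` flow run id are assumptions made up for illustration.

```python
import re
from fnmatch import fnmatchcase

# Mirrors the trigger section of the prefect.yaml above
TRIGGER = {
    "match": {
        "prefect.resource.id": "prefect.result.5d73850e-18e3-47af-a7f6-6dfd13fc4525.*"
    },
    "expect": ["prefect.result.produced"],
    "parameters": {"prev_result": "{{ event.payload.result }}"},
}


def event_matches(trigger, event):
    """True if the event name is expected and every resource label matches."""
    if event["event"] not in trigger["expect"]:
        return False
    return all(
        fnmatchcase(event["resource"].get(label, ""), pattern)
        for label, pattern in trigger["match"].items()
    )


def render_parameters(trigger, event):
    """Fill '{{ dotted.path }}' placeholders from the event (simplified templating)."""

    def lookup(match):
        value = {"event": event}
        for part in match.group(1).split("."):
            value = value[part]
        return str(value)

    return {
        name: re.sub(r"\{\{\s*([\w.]+)\s*\}\}", lookup, template)
        for name, template in trigger["parameters"].items()
    }


# Shape of the event emitted by emit_on_complete; "1234" is a placeholder run id
event = {
    "event": "prefect.result.produced",
    "resource": {
        "prefect.resource.id": "prefect.result.5d73850e-18e3-47af-a7f6-6dfd13fc4525.1234"
    },
    "payload": {"result": "foobar"},
}

if event_matches(TRIGGER, event):
    parameters = render_parameters(TRIGGER, event)
    print(parameters)
```

Because the deployment id is part of the resource id, only results produced by flow runs of that one deployment satisfy the `match` clause; events from other deployments are ignored.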

Alex Litvinov

08/23/2023, 10:04 AM
Hi @Nate! Thanks a bunch for your suggestion! unfortunately I was busy with the other stuff (and I'm on PACC now as well) these days and I wasn't able to try it out. I'll let you know when I get to it (probably towards the end of the week or even on the weekend...) Thanks again!

Nate

08/23/2023, 3:38 PM
@Alex Litvinov a trigger like this!
🦜 1

Alex Litvinov

08/27/2023, 6:41 PM
Hey @Nate! Finally got a chance to come back to it. I tried the first option you suggested and it worked 🎉🎉🎉🎉