Alex Litvinov
08/19/2023, 9:03 PM

Marvin
08/19/2023, 9:03 PM
1. You can set default parameter values in the deployment.yaml configuration. These parameter values will be used for flow runs based on the deployment. To configure default parameter values, add them to the parameters: {} line of deployment.yaml as JSON key-value pairs. The parameter list configured in deployment.yaml must match the parameters expected by the entrypoint flow function (see the flow sketch after this message). For example: parameters: {"name": "Marvin", "num": 42, "url": "https://catfact.ninja/fact"}
2. You can also pass **kwargs as a deployment parameter, as a "kwargs": {} JSON object containing the key-value pairs of any passed keyword arguments. For example: parameters: {"name": "Marvin", "kwargs": {"cattype": "tabby", "num": 42}}
3. You can edit default parameters for deployments in the Prefect UI, and you can override default parameter values when creating ad-hoc flow runs via the Prefect UI. To edit parameters in the Prefect UI, go to the details page for a deployment, then select Edit from the commands menu. If you change parameter values, the new values are used for all future flow runs based on the deployment.
4. You can also create an ad-hoc flow run with different parameter values via the Prefect UI.
Remember, life... I mean, data flow management, is full of variables. It's all about how you manipulate them.
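For reference, a minimal sketch (the flow name here is made up) of an entrypoint flow whose signature lines up with the example parameter keys above:
```python
from prefect import flow


@flow
def my_entrypoint_flow(name: str, num: int, url: str) -> None:
    # parameter names match the keys in
    # parameters: {"name": "Marvin", "num": 42, "url": "https://catfact.ninja/fact"}
    print(f"{name=} {num=} {url=}")
```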
Alex Litvinov
08/19/2023, 9:05 PM

Marvin
08/19/2023, 9:06 PM
You can use the .result() method on a Future object. This method will wait for the task to complete before returning the result to the caller. If the task run fails, .result() will raise the task run's exception. You may disable this behavior with the raise_on_failure option.
Here's an example:
```python
from prefect import flow, task


@task
def my_task():
    return "I'm a task!"


@flow
def my_flow():
    future = my_task.submit()
    result = future.result(raise_on_failure=False)
    print(result)


my_flow()
```
In this example, my_task returns a string, and my_flow submits the task and prints the result. The raise_on_failure=False option means that if the task fails, .result() won't re-raise the task's exception.
Remember, even the simplest string can hold profound meaning. Or it can just be a string. It's all about perspective, isn't it?
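To make the raise_on_failure option concrete, here is a small sketch (not from the thread; the task and flow names are made up) where the submitted task fails and .result(raise_on_failure=False) hands back the exception object instead of raising it:
```python
from prefect import flow, task


@task
def flaky_task():
    raise ValueError("something went wrong")


@flow
def handle_failure_flow():
    future = flaky_task.submit()
    # with raise_on_failure=False, a failed task's exception is returned
    # here rather than re-raised
    outcome = future.result(raise_on_failure=False)
    if isinstance(outcome, Exception):
        print(f"task failed with: {outcome!r}")
    else:
        print(f"task succeeded with: {outcome!r}")


handle_failure_flow()
```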
Alex Litvinov
08/19/2023, 9:10 PM

Marvin
08/19/2023, 9:10 PM
The flow_run query allows you to filter flow runs by their state. Here's an example:
```python
from prefect import Client

client = Client()

completed_flow_runs = client.graphql(
    """
    query {
      flow_run(where: { _and: { state: { _eq: "Success" } } }) {
        id
        name
        state
      }
    }
    """
)
```
In this example, the flow_run query retrieves all flow runs with a state of "Success". The id, name, and state of each flow run are returned.
Remember, a completed flow run is a testament to the relentless march of progress. Or it's just a finished task. Depends on how you look at it.
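Note that Marvin's snippet targets the Prefect 1.x GraphQL client. A rough sketch of the equivalent in Prefect 2, assuming the 2.x orchestration client and filter schemas (class names and locations may differ across versions), might look like:
```python
import asyncio

from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateName,
)


async def completed_flow_runs():
    async with get_client() as client:
        # filter flow runs whose state name is "Completed"
        # (Prefect 2's terminal success state, vs. "Success" in 1.x)
        return await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    name=FlowRunFilterStateName(any_=["Completed"])
                )
            )
        )


runs = asyncio.run(completed_flow_runs())
print([(run.id, run.name, run.state_name) for run in runs])
```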
Nate
08/19/2023, 9:22 PM

Alex Litvinov
08/20/2023, 5:00 AM

Nate
08/21/2023, 1:56 AM
```python
from prefect import flow
from prefect.deployments import run_deployment
from prefect.filesystems import GCS


def trigger_on_complete(flow, flow_run, state):
    print(
        f"hello from {flow_run.name}'s completion hook |"
        f" the return value was {(r := state.result())!r}"
    )
    run_deployment(
        name="triggered-flow/triggered",
        parameters=dict(prev_result=r),
    )


@flow(
    persist_result=True,
    result_storage=GCS.load("marvin-result-storage"),
    on_completion=[trigger_on_complete],
)
def foobar() -> str:
    return "foobar"


@flow(log_prints=True)
def triggered_flow(prev_result: str) -> str:
    print(f"got {prev_result=!r}")
```
if you definitely want to use an automation here - you can emit a custom event in the hook like
```python
from prefect import flow
from prefect.client.schemas.objects import Flow, FlowRun
from prefect.events import emit_event
from prefect.filesystems import GCS
from prefect.states import State


def emit_on_complete(flow: Flow, flow_run: FlowRun, state: State):
    print(
        f"hello from {flow_run.name}'s completion hook |"
        f" the return value was {(r := state.result())!r}"
    )
    emit_event(
        event="prefect.result.produced",  # this is an arbitrary event name
        resource={
            "prefect.resource.id": (
                f"prefect.result.{flow_run.deployment_id}.{flow_run.id}"
            )
        },
        payload={"result": r},
    )


@flow(
    persist_result=True,
    result_storage=GCS.load("marvin-result-storage"),
    on_completion=[emit_on_complete],
)
def foobar_event() -> str:
    return "foobar"


@flow(log_prints=True)
def event_triggered_flow(prev_result: str) -> str:
    print(f"got {prev_result=!r}")
```
and define a trigger in the prefect.yaml for the triggered flow
```yaml
- name: foobar-event
  entrypoint: triggered_event.py:foobar_event
  work_pool:
    name: local

- name: triggered-event
  entrypoint: triggered_event.py:event_triggered_flow
  work_pool:
    name: local
  triggers:
    - enabled: true
      match:
        # i.e. any result produced by flow runs from this deployment
        # 5d73850e-18e3-47af-a7f6-6dfd13fc4525 is the deployment id,
        # you'd be free to use a different event grammar here if desired
        prefect.resource.id: prefect.result.5d73850e-18e3-47af-a7f6-6dfd13fc4525.*
      expect:
        - prefect.result.produced
      parameters:
        prev_result: "{{ event.payload.result }}"
```
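For completeness, one way to kick it off from Python (a sketch only; the "foobar-event/foobar-event" slug is an assumption based on the default flow name derived from the foobar_event function plus the deployment name in the prefect.yaml above):
```python
from prefect.deployments import run_deployment

# trigger a run of the foobar-event deployment; the slug below is assumed
# from the flow function and deployment names shown above
flow_run = run_deployment(name="foobar-event/foobar-event")
print(flow_run.state)
```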
after kicking off foobar_event from somewhere you'd see
```
❯ prefect worker start --pool 'local'
Discovered worker type 'process' for work pool 'local'.
Worker 'ProcessWorker' started!
20:51:33.393 | INFO | prefect.flow_runs.worker - Worker 'ProcessWorker' submitting flow run
20:51:38.192 | INFO | Flow run 'imported-husky' - Finished in state Completed()
20:51:38.183 | INFO | Flow run 'imported-husky' - Running hook 'emit_on_complete' in response to entering state 'Completed'
hello from imported-husky's completion hook | the return value was 'foobar'
20:51:38.192 | INFO | Flow run 'imported-husky' - Hook 'emit_on_complete' finished running successfully
20:51:40.569 | INFO | prefect.flow_runs.worker - Process 44726 exited cleanly.
20:51:45.241 | INFO | prefect.flow_runs.worker - Worker 'ProcessWorker' submitting flow run
20:51:48.888 | INFO | Flow run 'savvy-tench' - got prev_result='foobar'
20:51:49.030 | INFO | Flow run 'savvy-tench' - Finished in state Completed()
20:51:51.363 | INFO | prefect.flow_runs.worker - Process 44779 exited cleanly.
```
Alex Litvinov
08/23/2023, 10:04 AM

Nate
08/23/2023, 3:38 PM

Alex Litvinov
08/27/2023, 6:41 PM