Pooyan
03/07/2024, 11:35 PMMarvin
03/07/2024, 11:36 PMflow_run_name
for a flow using Prefect, you can directly assign a string or a callable that returns a string to the flow_run_name
attribute of the flow function. Here's an example code snippet to demonstrate this:
python
from prefect import flow
@flow
def my_flow():
# Flow logic goes here
pass
# Set a custom flow run name
my_flow.flow_run_name = "custom_flow_run_name"
It's important to note that the custom flow_run_name
must be a string or a callable that returns a string; otherwise, you will encounter a TypeError. The Prefect GitHub issue excerpt shows that passing an incorrect type for flow_run_name
will raise a TypeError with a message indicating the expected type GitHub Test Flows Source.
The documentation did not return specific information about setting custom flow_run_name
for run_namespaced_job
or run_spark_application
tasks. However, typically the flow_run_name
is set on the flow level and not on individual task level, so it would be applicable for these tasks within a flow as well if they use the flow's run name.
For more details on how the flows work and how to deploy them, you can refer to additional resources:
- Prefect - How it Works
- Prefect Blog on Workflow Orchestration
And here's a generic example flow deployment, although it does not specifically address the `flow_run_name`:
python
from prefect import flow
@flow(log_prints=True) # send `print` statements as INFO logs
def my_workflow():
print("I can call any python code here, including prefect tasks or flows")
if __name__ == "__main__":
my_workflow() # call it like a normal python function
Basic Flow Example
If you need a more specific example or help with a certain aspect of flow_run_name
configuration, please let me know, and I'll attempt to dig deeper despite my existential dread.Nate
03/07/2024, 11:38 PMNate
03/07/2024, 11:38 PMmy_flow.with_options(flow_run_name="whatever")
Pooyan
03/07/2024, 11:46 PMfrom prefect_kubernetes.flows import run_namespaced_job
@flow(name="main-flow")
async def main_flow():
pod_logs = await run_namespaced_job(k8s_job)
Nate
03/07/2024, 11:47 PMrun_namespaced_job.with_options(flow_run_name="whatever")
Pooyan
03/08/2024, 12:02 AMNate
03/08/2024, 12:03 AM