james.lamb
03/08/2021, 8:59 PM
flow.serialize()?
Will share more details in thread, thanks!
james.lamb
03/08/2021, 8:59 PM
metadata parameter for the different environment classes. Because that extra data is serialized in flow.serialize() (https://github.com/PrefectHQ/prefect/blob/7551a28c7ef1498cf6ddb352232b34f83c77cfd5/src/prefect/serialization/environment.py#L62), it is accessible on flow runs sent to agents.
For example:
from prefect import Flow, task
from prefect.environments import KubernetesJobEnvironment

@task
def do_something() -> str:
    return "no"

with Flow(name="my-flow") as flow:
    x = do_something()

flow.environment = KubernetesJobEnvironment(metadata={"flow-id": "abc"})
flow.serialize()
Something like that yields output that includes the following:
'metadata': {'flow-id': 'abc'}
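To illustrate how that serialized value could be read back on the receiving side, here is a minimal sketch. It works on a plain dict standing in for the output of flow.serialize(). The "environment" key that wraps "metadata" and the helper name get_environment_metadata are assumptions for illustration; only the 'metadata': {'flow-id': 'abc'} entry comes from the output above.

```python
from typing import Any, Dict, Optional


def get_environment_metadata(serialized_flow: Dict[str, Any], key: str) -> Optional[str]:
    """Return one metadata value from a serialized flow dict, or None if absent.

    Hypothetical helper: assumes the metadata sits under an "environment" key.
    """
    environment = serialized_flow.get("environment") or {}
    metadata = environment.get("metadata") or {}
    return metadata.get(key)


# Hand-written stand-in for flow.serialize() output (assumed shape).
serialized = {
    "name": "my-flow",
    "environment": {"metadata": {"flow-id": "abc"}},
}

flow_id = get_environment_metadata(serialized, "flow-id")
print(flow_id)
```

Using .get() with an empty-dict fallback means flows that carry no metadata return None instead of raising a KeyError.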
In my use case, I maintain a service that manages some details of flow execution for users. I'd like to be able to store identifiers from my service on flows, so that when a flow run is received from Prefect Cloud, that service can figure out how to handle details like displaying logs.
Jim Crist-Harif
03/08/2021, 9:04 PM
Jim Crist-Harif
03/08/2021, 9:04 PM
james.lamb
03/08/2021, 9:15 PM
KubernetesRun run config, and some details used to create a Webhook storage on the flow.
At that point, Prefect Cloud now knows about the flow and my service knows about the flow. Since registering with my service has to happen before registering with Prefect Cloud (to get the necessary details for storage and run config), my service can't create a mapping from its own ID for the flow to Prefect Cloud's flow id.
I'd like to store a {"my-flow-id": my_service_flow_id} on the flow at build time. When a flow run comes in, I'd like to have my agent issue a request to my service like POST /api/flows/{my_service_flow_id}/runs/{flow_run_id}, where flow_run_id is generated by Prefect Cloud. This would just be to tell my service "hey there was a flow run for this flow, with this run ID".
That would allow me to do things like display how many flow runs there have been in some time window for a particular flow, and to go look in historical logs for now-completed flow runs and search by flow run id.
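As a rough sketch of the notification described above, the agent-side call could be built with the standard library alone. The base URL, the empty JSON body, and the function name build_run_notification are all hypothetical; only the POST /api/flows/{my_service_flow_id}/runs/{flow_run_id} route shape comes from the message.

```python
import urllib.request
from urllib.parse import quote


def build_run_notification(
    base_url: str, my_service_flow_id: str, flow_run_id: str
) -> urllib.request.Request:
    """Build (but do not send) the POST that reports a new flow run.

    Hypothetical helper: the route shape follows the thread, everything
    else (base URL, empty JSON body) is an assumption.
    """
    # Percent-encode both IDs so unexpected characters cannot alter the path.
    path = "/api/flows/{}/runs/{}".format(
        quote(my_service_flow_id, safe=""), quote(flow_run_id, safe="")
    )
    return urllib.request.Request(
        url=base_url.rstrip("/") + path,
        data=b"{}",
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_run_notification("https://service.example.com", "abc", "run-123")
print(request.get_method(), request.full_url)
# Sending it would be: urllib.request.urlopen(request)
```

Building the request separately from sending it keeps the URL logic easy to check without a live service.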
I know some of this sounds like it duplicates what Prefect Cloud already does, but I don't believe it does. At Saturn Cloud (my employer), we're providing infrastructure to run flows on. The logs and stuff my service might display happen outside of what prefect flow code can see (like startup scripts that run when provisioning a Dask cluster).
Jim Crist-Harif
03/08/2021, 9:17 PM
Jim Crist-Harif
03/08/2021, 9:17 PM
run_config?
james.lamb
03/08/2021, 9:23 PM
register_flow_with_saturn(flow), and that returns a flow with the run config and storage added for you.
"Could you stick the info in the metadata of the k8s spec, and pull it off the run_config?"
Oh that's an interesting idea! I forgot that in run_config world, the job_template() is on flow.serialize() and you don't have to load the flow to extract it. Yeah it can definitely work to stick it in the metadata on the k8s spec!
flow.run_config = KubernetesRun(
    job_template={
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"my-flow-id": my_service_flow_id},
        "spec": {}
    }
)
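For the other half of that idea, pulling the ID back off the serialized run config without loading the flow, a minimal sketch might look like the following. It uses a plain dict in place of real flow.serialize() output. The "run_config" and "job_template" nesting and the helper name get_flow_id_from_run_config are assumptions, not a confirmed description of Prefect's serialized schema.

```python
from typing import Any, Dict, Optional


def get_flow_id_from_run_config(
    serialized_flow: Dict[str, Any], key: str = "my-flow-id"
) -> Optional[str]:
    """Look up a custom ID stored in the job template's metadata.

    Hypothetical helper: assumes the serialized flow nests the template as
    serialized_flow["run_config"]["job_template"]["metadata"].
    """
    run_config = serialized_flow.get("run_config") or {}
    job_template = run_config.get("job_template") or {}
    metadata = job_template.get("metadata") or {}
    return metadata.get(key)


# Hand-written stand-in for flow.serialize() output (assumed shape).
serialized = {
    "name": "my-flow",
    "run_config": {
        "type": "KubernetesRun",
        "job_template": {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {"my-flow-id": "abc"},
            "spec": {},
        },
    },
}

my_service_flow_id = get_flow_id_from_run_config(serialized)
print(my_service_flow_id)
```

Kubernetes itself expects custom key-value pairs under metadata.labels or metadata.annotations, so a variant of this lookup might read the ID from one of those nested dicts instead.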
james.lamb
03/08/2021, 9:24 PM
Jim Crist-Harif
03/08/2021, 9:24 PM
james.lamb
03/08/2021, 9:25 PM