james.lamb

    1 year ago
    👋 hello from Chicago! When configuring a flow using run configs, is it possible to provide extra data (think a small dictionary whose values are strings or ints) that will show up in flow.serialize()? Will share more details in thread, thanks!
    With the now-deprecated "environment" pattern for configuring flows, it is possible to store arbitrary key-value pairs on a flow via the metadata parameter of the different environment classes. Because that extra data is included in the output of flow.serialize() (https://github.com/PrefectHQ/prefect/blob/7551a28c7ef1498cf6ddb352232b34f83c77cfd5/src/prefect/serialization/environment.py#L62), it is accessible on flow runs sent to agents. For example:
    from prefect import Flow, task
    from prefect.environments import KubernetesJobEnvironment
    
    @task
    def do_something() -> str:
        return "no"
    
    with Flow(name="my-flow") as flow:
        x = do_something()
    
    flow.environment = KubernetesJobEnvironment(metadata={"flow-id": "abc"})
    flow.serialize()
    That yields output that includes the following:
    'metadata': {'flow-id': 'abc'}
    In my use case, I maintain a service that manages some details of flow execution for users. I'd like to be able to store identifiers from my service on flows, so that when a flow run is received from Prefect Cloud, that service can figure out how to handle details like displaying logs.
    Jim Crist-Harif

    1 year ago
    Hmmm, this is currently not supported. I'm flip-flopping on whether we'd want to support something like this (vs requiring users to store any data like this in an external mapping).
    Can you comment more on what you do with this info? Does your service watch for finished flow runs then pull info from prefect cloud with this extra metadata?
    james.lamb

    1 year ago
    sure! Users register their flows with my service prior to registering them with Prefect Cloud. When they do this, my service responds with a k8s job spec used to populate a KubernetesRun run config, and some details used to create a Webhook storage on the flow. At that point, Prefect Cloud knows about the flow and my service knows about the flow. Since registering with my service has to happen before registering with Prefect Cloud (to get the necessary details for storage and run config), my service can't create a mapping from its own ID for the flow to Prefect Cloud's flow id.
    I'd like to store a {"my-flow-id": my_service_flow_id} on the flow at build time. When a flow run comes in, I'd like to have my agent issue a request to my service like POST /api/flows/{my_service_flow_id}/runs/{flow_run_id}, where flow_run_id is generated by Prefect Cloud. This would just be to tell my service "hey there was a flow run for this flow, with this run ID". That would allow me to do things like display how many flow runs there have been in some time window for a particular flow, and to go look in historical logs for now-completed flow runs and search by flow run id.
    I know some of this sounds like it duplicates what Prefect Cloud already does, but I don't believe it does. At Saturn Cloud (my employer), we're providing infrastructure to run flows on. The logs and stuff my service might display happen outside of what prefect flow code can see (like startup scripts that run when provisioning a Dask cluster).
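A minimal sketch of the agent-side notification described above. Only the path shape POST /api/flows/{my_service_flow_id}/runs/{flow_run_id} comes from the message; the function name, the host, and the empty request body are assumptions made for illustration.

```python
from urllib.parse import quote
from urllib.request import Request


def build_run_notification(base_url: str, my_service_flow_id: str, flow_run_id: str) -> Request:
    """Build (but do not send) the POST that reports a new flow run."""
    # Percent-encode both IDs so unexpected characters cannot alter the path.
    path = "/api/flows/{}/runs/{}".format(
        quote(my_service_flow_id, safe=""), quote(flow_run_id, safe="")
    )
    return Request(base_url.rstrip("/") + path, data=b"", method="POST")


# Hypothetical host and IDs, for illustration only.
request = build_run_notification("https://my-service.example", "abc", "run-123")
print(request.get_method(), request.full_url)
```

Sending it would be a matter of passing the request to urllib.request.urlopen, or using whatever HTTP client the agent already depends on.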
    Jim Crist-Harif

    1 year ago
    So you don't handle the flow registration yourself, only providing config to copy-paste?
    Could you stick the info in the metadata of the k8s spec, and pull it off the run_config?
    james.lamb

    1 year ago
    We don't handle registering with Prefect Cloud, right. It's not exactly copy-pasting... more like you call register_flow_with_saturn(flow), and that returns a flow with the run config and storage added for you.
    > Could you stick the info in the metadata of the k8s spec, and pull it off the run_config?
    Oh that's an interesting idea! I forgot that in run_config world, the job_template is included in the output of flow.serialize(), so you don't have to load the flow to extract it. Yeah it can definitely work to stick it in the metadata on the k8s spec!
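    from prefect.run_configs import KubernetesRun

    flow.run_config = KubernetesRun(
        job_template={
            "apiVersion": "batch/v1",
            "kind": "Job",
            # Kubernetes only accepts known fields under "metadata", so custom
            # key-value pairs go under "labels" (or "annotations").
            "metadata": {"labels": {"my-flow-id": my_service_flow_id}},
            "spec": {}
        }
    )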
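A rough sketch of reading that identifier back on the agent side without loading the flow. The dictionary below is a hand-written stand-in for part of the flow.serialize() output, so its exact shape is an assumption, and extract_service_flow_id is a hypothetical helper, not a Prefect API. It checks metadata labels first and then a key placed directly on the metadata.

```python
from typing import Any, Dict, Optional


def extract_service_flow_id(
    serialized_flow: Dict[str, Any], key: str = "my-flow-id"
) -> Optional[str]:
    """Return the custom ID stored in the job template metadata, or None."""
    run_config = serialized_flow.get("run_config") or {}
    job_template = run_config.get("job_template") or {}
    metadata = job_template.get("metadata") or {}
    labels = metadata.get("labels") or {}
    # Check labels first, then fall back to a key placed directly on metadata.
    return labels.get(key, metadata.get(key))


# Hand-written stand-in for part of flow.serialize() output (assumed shape).
serialized = {
    "name": "my-flow",
    "run_config": {
        "type": "KubernetesRun",
        "job_template": {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {"my-flow-id": "abc"},
            "spec": {},
        },
    },
}
print(extract_service_flow_id(serialized))
```

Returning None instead of raising lets the agent skip flows that were never registered with the external service.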
    thanks Jim, I promise this was genuinely an "is this still supported" question and not a "can you do free design work for me" question
    Jim Crist-Harif

    1 year ago
    no worries, if that works for you that'd be my preference. Rather not add generic flow metadata until we really need it.
    james.lamb

    1 year ago
    totally understood, yeah I think that'll work! Thanks so much