
Gregory Hunt

08/31/2023, 5:50 PM
Is it possible to get a subflow to use a specific deployment's config?
@Nate

Nate

09/01/2023, 1:14 PM
hmm, can you give an example? I'm not sure what you mean

Gregory Hunt

09/01/2023, 1:15 PM
I have a deployment with a flow, and in that flow is a subflow
the subflow is not associated with any specific deployment
but it does spin up another job - this is in k8s, by the way
how do I set parameters for that subflow's job/container?

Nate

09/01/2023, 1:19 PM
are you saying that somehow the subflow process is happening somewhere other than the parent flow run? or that the subflow is triggering a k8s job via the API on a cluster somewhere else?
because in general, deployments (and their work pools) are how Prefect allows for infra configuration

Gregory Hunt

09/01/2023, 1:32 PM
Yes the subflow creates a secondary container

Nate

09/01/2023, 1:35 PM
so it sounds like that job creation is not via Prefect, in which case I'd guess you'd have to configure that job through the API, similar to how you created it

Gregory Hunt

09/01/2023, 1:35 PM
no it is via prefect

Nate

09/01/2023, 1:35 PM
ah sorry, can you show code?

Gregory Hunt

09/01/2023, 1:36 PM
from prefect import flow, get_run_logger
import parse_provider_data

@flow(name="Files from Main Query Flow")
def pipeline(msg: dict):
    logger = get_run_logger()
    for attr, value in msg.items():
        logger.info(f"{attr}, {value}")

    if msg["status"] == "MALWARE_SCAN_COMPLETE":
        parse_provider_data.pipeline(msg["storage_path"])

from random import randint
from time import sleep

from prefect import flow, get_run_logger

@flow(name="Parse Provider Data from CCDA")
def pipeline(filename: str):
    logger = get_run_logger()
    logger.info(f"Parsing: {filename}")
    sleep(randint(1, 10))
the Parse Provider subflow gets spun up in another container

Nate

09/01/2023, 1:38 PM
how? by default, subflows are run as a subprocess of the parent flow

Gregory Hunt

09/01/2023, 1:39 PM
see how it shows up in the ui

Nate

09/01/2023, 1:44 PM
i may be misunderstanding something - i see that your "Parse Provider Data from CCDA" flow has no associated deployment, so I'm confused how "the Parse Provider subflow gets spun up in another container" - unless you're doing something under the hood with this parse_provider_data.pipeline(msg["storage_path"]) call that I can't see here

Gregory Hunt

09/01/2023, 1:45 PM
Nope, all I am doing is that call

Nate

09/01/2023, 1:46 PM
then i would expect that flow is running as a subprocess of "Files from Main Query Flow" - can you explain how you know that "the Parse Provider subflow gets spun up in another container"?

Gregory Hunt

09/01/2023, 2:00 PM
Ok, looks like I was wrong about this - I saw similarly named containers, but with the random adjective-and-animal names they all looked the same
Is there a reason to use a subflow over a task?

Nate

09/01/2023, 2:28 PM
gotcha, I'd say it depends:
• use a subflow if you want to call other subflows or tasks
• use a task if you wanna leverage the task runner or do caching