# ask-community
p
Hi there. I have a parent flow that kicks off a child flow passing environment variables via a Kubernetes run config. The child flow runs a script using the ShellTask and explicitly passes environment variables to it. What would be the best way to get the environment variables from the run config to forward to the ShellTask? I'd probably want to override the hardcoded variables in the child flow with the variables passed via the run config. Thanks!
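To make it concrete, here is roughly the shape I'd like to end up with in the child flow (a simplified sketch; DEFAULTS and my_script.py are placeholders, not my actual code):
Copy code
import os

from prefect import Flow, task
from prefect.tasks.shell import ShellTask

# placeholder defaults currently hardcoded in the child flow
DEFAULTS = {"STAGE": "dev", "LOG_LEVEL": "info"}

shell = ShellTask(command="python my_script.py")  # placeholder script

@task
def build_env():
    # env vars injected via the KubernetesRun run config show up in
    # os.environ on the flow-run pod and override the hardcoded defaults
    return {**DEFAULTS, **{k: os.environ[k] for k in DEFAULTS if k in os.environ}}

with Flow("child-flow") as flow:
    shell(env=build_env())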
a
perhaps you can leverage parameters to pass some information down to a child flow run?
Copy code
from prefect import Flow
from prefect.tasks.prefect import create_flow_run

with Flow("parent-flow") as flow:
    create_flow_run(flow_name="child-flow", parameters=dict(key="your_value"))
p
The only issue is that some env vars are secrets. Would they not show in Prefect Cloud?
a
in that case, the best way would be injecting those via PrefectSecret. Do you know how that works?
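For example, a minimal sketch (assuming Prefect 1.x; the secret name SNOWFLAKE_USER and my_script.py are just placeholders):
Copy code
from prefect import Flow
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.shell import ShellTask

shell = ShellTask(command="python my_script.py")

with Flow("child-flow") as flow:
    snowflake_user = PrefectSecret("SNOWFLAKE_USER")
    # env is resolved at runtime, so the secret value reaches the
    # subprocess without ever being stored as a parameter in Cloud
    shell(env={"SNOWFLAKE_USER": snowflake_user})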
p
Yes, I am using them. Let me think through this more. Maybe the problem I am having is related to something else: I have too many levels. The parent flow runs the child, the child runs a script, and the script runs a remote Azure ML job. Some env vars are not making it to Azure. I'll report back. Thanks, Anna!
🙌 1
k
You can pass it through the env of the agent and then have multiple agents for different values? Is this a prod/dev kind of thing?
p
Yes, that is part of the puzzle. The team did not want to rely on DevOps to manage the secrets on the agent side (Kubernetes agent running in the cluster), so I implemented a different setup. The current setup is:
• dev and prod projects with the same set of flows
• a task I wrote that queries the API to find which project the flow run started from
• a task I wrote that inherits from PrefectSecret and retrieves secrets using an environment prefix, where the environment is the project name; for example, PROD-SNOWFLAKE_USER is the Snowflake user secret retrieved when the flow is running from the production project (sketched below)
• a subset of the secrets is forwarded to the ShellTask (a Python script written by a data scientist)
The issue I was having yesterday is that the data scientist's script, which submits another ML script to run in Azure ML, was not forwarding the environment variables. Since we have so many levels (parent and child flows, the wrapper script that starts the remote run, and the actual ML script that does the real work), I am thinking of writing a task that runs in the parent flow and starts the remote Azure job directly. That would leave only two levels: the parent flow and the ML script it submits for remote execution. Since converting the existing code would take some effort, I am postponing it.
The key reason for separating the parent and child flows is that the child flow uses a different Docker image with many data science libraries that is about 5 GB. It is the same runtime environment the data scientists use to develop their code, and I did not want so many dependencies on the Docker image that does the overall orchestration. Let me know if you have any suggestions to make this setup simpler. Thanks!
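A simplified sketch of that prefixed-secret task (illustrative names, not the actual code; assumes Prefect 1.x, where PrefectSecret.run accepts an optional name):
Copy code
from prefect.tasks.secrets import PrefectSecret

class PrefixedSecret(PrefectSecret):
    """Fetch '<PREFIX>-<name>', e.g. prefix='PROD' and name='SNOWFLAKE_USER'
    resolve to the secret 'PROD-SNOWFLAKE_USER'."""

    def run(self, name: str = None, prefix: str = None):
        name = name or self.name  # fall back to the name given at init
        return super().run(name=f"{prefix}-{name}")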
a
@Pedro Machado this seems like a really good setup. The parent-flow pattern is exactly for such use cases, where you need a parent flow to orchestrate multiple workflows that may be developed by different teams and require different resources such as different Docker images. Nice work!
p
One more question. In a flow-of-flows setup, how should I use results caching to enable the ability to restart? The child flow failed and I was trying to restart the parent flow by clicking the Restart button, but I am getting this error:
Copy code
Created flow run 'carrot-firefly-ml_flow-model_prep.py': https://cloud.prefect.io/fmpm-intelligence/flow-run/8cfddfe6-dcf7-4f17-80be-5ffe92fd943f
CloudTaskRunner
Task 'run_ml_flow model_prep.py': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    logger=self.logger,
  File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 454, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/home/pedro/clients/corrigo/ORCHESTRATION_PREFECT/flows/master_flow_monthly.py", line 128, in run_ml_flow
  File "/home/pedro/.pyenv/versions/miniconda3-latest/envs/corrigo-prefect/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 223, in get_task_run_result
  File "/usr/local/lib/python3.7/site-packages/prefect/backend/task_run.py", line 77, in get_result
    self._assert_result_type_is_okay()
  File "/usr/local/lib/python3.7/site-packages/prefect/backend/task_run.py", line 165, in _assert_result_type_is_okay
    "The task result has no `location` so the result cannot be loaded. "
ValueError: The task result has no `location` so the result cannot be loaded. This often means that your task result has not been configured or has been configured incorrectly.
I am using PrefectResult() at the flow level.
k
You would need to restart it at the child Flow level currently. A restart from the parent Flow will not automatically trigger child Flow restarts.
p
Got it. Child first and then parent? Is this something Orion will do differently?
k
Yes and yes. We know that’s not a good experience, so in Orion you can retry a subflow. If you have a subflow with A->B->C and C fails, you can retry all three.
👍 1
p
Hi again. I'd like your input on a good pattern to vary parameters by environment. I'd like to be able to specify the default parameters at the time of flow registration but let the team override them in the Prefect Cloud UI. The approach I chose (described above, where I use Prefect Secrets prefixed by the environment name) is working, but right now some of the "secrets" are not secrets but parameters. For example, we have a DBTCLOUD_JOB_ID that needs to vary by environment, and I am using a secret called PRODUCTION-DBTCLOUD_JOB_ID for it. However, the team would like to be able to see and edit the default values in the UI. What would be a good pattern to have different sets of default parameters that can be set upon flow registration but that an operator could override in the UI (not for an ad hoc flow run but for all future scheduled runs)? Thanks!
k
You should make a dev project and a prod project, and register the same Flow twice with different defaults, I think.
Preferably through a CI/CD process. Have you seen this thread?
p
I'll take a look at the link. Thanks!
> register the same Flow twice with different defaults
What approach did you have in mind to do this? I am using the CLI to register, and the parameter defaults are set in the code.
k
I have seen some people set a default depending on the presence of an environment variable. I have seen some people make their own CLI that takes in parameters and then registers it like:
Copy code
python myflow.py -p param1=2
I think you’d have to move away from the CLI, unless you do something like editing your Python file with the default before registration
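For example, a registration script along these lines (an untested sketch; the myflow module and its flow variable are placeholders):
Copy code
# hypothetical usage: python register.py -p param1=2
import argparse

from myflow import flow  # placeholder module that defines the Flow

parser = argparse.ArgumentParser()
parser.add_argument("-p", "--param", action="append", default=[],
                    help="override a Parameter default, e.g. -p param1=2")
args = parser.parse_args()

# Parameter tasks carry their defaults, so updating them before
# registration changes the defaults stored with the flow
params_by_name = {p.name: p for p in flow.parameters()}
for pair in args.param:
    name, value = pair.split("=", 1)
    params_by_name[name].default = value

flow.register(project_name="dev")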
p
What if I used an API call to push the default parameters to a flow that is already registered? Could you help me with the mutation to push a parameter dict?
So I am thinking I'd have a script that sets the default parameters for each environment (project). The script would be version controlled, but the operator could still go and change the values in the UI.
k
Let me try
Copy code
from prefect import Client

client = Client()
response = client.graphql(
    """
    mutation($flow_group_id: UUID!, $params: JSON!) {
      set_flow_group_default_parameters(input: {
        flow_group_id: $flow_group_id,
        parameters: $params
      }) {
        success
        error
      }
    }
    """,
    variables={"flow_group_id": "3dc62fc4-5685-410f-843a-e73f673f063c", "params": {"x": 1}},
)
print(response)
p
Thanks, I will give it a try. How can I get the flow_group_id programmatically?
k
flow.register() returns it
I am wrong. You likely need to query it with the GraphQL API also. flow.register() returns the flow id, but you want the flow group id.
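Something like this should work for the lookup (untested sketch; the query uses the Hasura-style filters the backend exposes, and "your-flow-id" is the id flow.register() returned):
Copy code
from prefect import Client

client = Client()
response = client.graphql(
    """
    query($flow_id: uuid!) {
      flow(where: {id: {_eq: $flow_id}}) {
        flow_group_id
      }
    }
    """,
    variables={"flow_id": "your-flow-id"},
)
print(response)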