    Pedro Machado

    9 months ago
    Hi there. I have a parent flow that kicks off a child flow passing environment variables via a Kubernetes run config. The child flow runs a script using the ShellTask and explicitly passes environment variables to it. What would be the best way to get the environment variables from the run config to forward to the ShellTask? I'd probably want to override the hardcoded variables in the child flow with the variables passed via the run config. Thanks!
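    To make the override concrete, the merge I have in mind is roughly this (a sketch only; the variable names are made up):

```python
import os

# Illustrative only: hardcoded defaults in the child flow, overridden by
# matching variables that the Kubernetes run config injected into the
# container's environment. MODEL_DIR / LOG_LEVEL are made-up names.
HARDCODED_ENV = {"MODEL_DIR": "/tmp/models", "LOG_LEVEL": "INFO"}

def build_shell_env(keys=("MODEL_DIR", "LOG_LEVEL")):
    """Merge hardcoded defaults with run-config values; run config wins."""
    merged = dict(HARDCODED_ENV)
    for key in keys:
        if key in os.environ:  # value injected via the run config
            merged[key] = os.environ[key]
    return merged
```

    The resulting dict would then go to the ShellTask's env argument.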
    Anna Geller

    9 months ago
    perhaps you can leverage parameters to pass some information down to a child flow run?
    from prefect.tasks.prefect import create_flow_run
    
    create_flow_run(flow_name="child-flow", parameters=dict(key="your_value"))
    Pedro Machado

    9 months ago
    The only issue is that some env vars are secrets. Would they not show in prefect cloud?
    Anna Geller

    9 months ago
    in that case, the best way would be injecting those via PrefectSecret. Do you know how that works?
    Pedro Machado

    9 months ago
    Yes. I am using them. Let me think through this more. Maybe the problem I am having is related to something else. I have too many levels. Parent flow runs child. Child runs script. Script runs remote Azure ML job. Some env vars are not making it to azure. I'll report back. Thanks Anna.
    Kevin Kho

    9 months ago
    You can pass it through the env of the agent and then have multiple agents for different values? Is this a prod/dev kind of thing?
    Pedro Machado

    9 months ago
    Yes, that is part of the puzzle. The team did not want to rely on DevOps to manage the secrets on the agent side (Kubernetes agent running in cluster), so I implemented a different setup. The current setup is:
    • dev and prod projects with the same set of flows
    • a task I wrote that queries the API to find which project the flow run started from
    • a task I wrote that inherits from PrefectSecret and retrieves secrets using an environment prefix, where the environment is the project name. For example, PROD-SNOWFLAKE_USER is the Snowflake user secret retrieved when the flow is running from the production project
    • a subset of the secrets is forwarded to the ShellTask (a Python script written by a data scientist)
    The issue I was having yesterday is that the data scientist's script that submits another ML script to run in Azure ML was not forwarding the environment variables. Since we have so many different levels (parent and child flows, the data scientist's wrapper script that starts the remote run, and the actual ML script that does the real work), I am thinking of writing a task that runs on the parent flow and starts the remote Azure job directly. This would require only two levels: the parent flow and the ML script that gets submitted for remote execution by the parent flow. Since this would take some effort to convert the existing code, I am postponing it. The key reason for the parent/child flow separation is that the child flow uses a different Docker image that has many data science libraries and is about 5 GB. This is the same runtime environment the data scientists use to develop their code. I did not want so many dependencies on the Docker image that does the overall orchestration. Let me know if you have any suggestions to make this setup simpler. Thanks!
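    The prefixed-secret lookup is essentially this (a simplified sketch; the real task inherits from PrefectSecret and fetches the value from the backend):

```python
def prefixed_secret_name(project: str, key: str) -> str:
    """Build the environment-prefixed secret name, e.g. PROD-SNOWFLAKE_USER.

    The project name (dev/prod) doubles as the environment prefix.
    """
    return f"{project.upper()}-{key}"
```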
    Anna Geller

    9 months ago
    @Pedro Machado this seems like a really good setup. The parent-flow pattern is exactly for such use cases, where you need a parent flow to orchestrate multiple workflows that may be developed by different teams and require different resources such as different Docker images. Nice work!
    Pedro Machado

    9 months ago
    One more question. In a flow-of-flows setup, how should I use results caching to enable restarts? The child flow failed, and I was trying to restart the parent flow by clicking the restart button. I am getting this error:
    Created flow run 'carrot-firefly-ml_flow-model_prep.py': <https://cloud.prefect.io/fmpm-intelligence/flow-run/8cfddfe6-dcf7-4f17-80be-5ffe92fd943f>
    CloudTaskRunner
    Task 'run_ml_flow model_prep.py': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
        logger=self.logger,
      File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 454, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/home/pedro/clients/corrigo/ORCHESTRATION_PREFECT/flows/master_flow_monthly.py", line 128, in run_ml_flow
      File "/home/pedro/.pyenv/versions/miniconda3-latest/envs/corrigo-prefect/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 223, in get_task_run_result
      File "/usr/local/lib/python3.7/site-packages/prefect/backend/task_run.py", line 77, in get_result
        self._assert_result_type_is_okay()
      File "/usr/local/lib/python3.7/site-packages/prefect/backend/task_run.py", line 165, in _assert_result_type_is_okay
        "The task result has no `location` so the result cannot be loaded. "
    ValueError: The task result has no `location` so the result cannot be loaded. This often means that your task result has not been configured or has been configured incorrectly.
    I am using PrefectResult() at the flow level.
    Kevin Kho

    9 months ago
    You would need to restart it on the child Flow level currently. A restart from the Parent Flow will not automatically trigger Child Flow restarts
    Pedro Machado

    9 months ago
    Got it. Child first and then parent? Is this something Orion will do differently?
    Kevin Kho

    9 months ago
    Yes and yes. We know that’s not a good experience. So in Orion, you can retry a subflow. So if you have a subflow with A->B->C, and C fails, you can also retry all 3
    Pedro Machado

    9 months ago
    Hi again. I'd like your input on a good pattern to vary parameters by environment. I'd like to be able to specify the default parameters at the time of flow registration but let the team override the defaults in the Prefect Cloud UI. The approach I chose (described above, where I use Prefect Secrets prefixed by the environment name) is working, but right now some of the "secrets" are not secrets but parameters. For example, we have a DBTCLOUD_JOB_ID that needs to vary by environment. I am using a secret called PRODUCTION-DBTCLOUD_JOB_ID. However, the team would like to be able to see and edit the default values in the UI. What would be a good pattern to have different sets of default parameters that can be set upon flow registration but that an operator could override in the UI (not for an ad hoc flow run but for all future scheduled runs)? Thanks!
    Kevin Kho

    9 months ago
    You should make a dev project and a prod project, and register the same Flow twice with different defaults, I think.
    Preferably through a CI/CD process. Have you seen this thread?
    Pedro Machado

    9 months ago
    I'll take a look at the link. Thanks! What approach did you have in mind for this part: "register the same Flow twice with different defaults"? I am using the CLI to register, and the parameter defaults are set in the code.
    Kevin Kho

    9 months ago
    I have seen some people set a default depending on the presence of an environment variable. I have seen some people make their own CLI that takes in parameters and then registers it like:
    python myflow.py -p param1=2
    I think you’d have to move away from the CLI, unless you do something like editing your Python file with the default before registration
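    For the environment-variable approach, something like this at registration time (a sketch; PREFECT_ENV and the parameter values are made up, and the variable would be set by whatever CI/CD job runs the registration):

```python
import os

# Sketch: choose registration-time parameter defaults based on an
# environment variable set by the CI/CD job that registers the flow.
# PREFECT_ENV and the job ids below are illustrative, not real values.
PARAM_DEFAULTS = {
    "dev": {"dbtcloud_job_id": 111},
    "prod": {"dbtcloud_job_id": 222},
}

def registration_defaults() -> dict:
    """Return the defaults for the current environment (dev if unset)."""
    env = os.environ.get("PREFECT_ENV", "dev")
    return PARAM_DEFAULTS[env]
```

    You'd then feed the returned dict into your Parameter defaults before calling flow.register().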
    Pedro Machado

    9 months ago
    What if I used an API call to push the default parameters to a flow that is already registered? Could you help me with the mutation to push a parameter dict?
    So I am thinking I'd have a script that will set the default parameters for each environment (project) which would be version controlled but the operator can still go and change them in the UI.
    Kevin Kho

    9 months ago
    Let me try
    from prefect import Client

    client = Client()
    response = client.graphql(
        """
        mutation($flow_group_id: UUID!, $params: JSON!) {
          set_flow_group_default_parameters(input: {
            flow_group_id: $flow_group_id,
            parameters: $params
          }) {
            success
            error
          }
        }
        """,
        variables={"flow_group_id": "3dc62fc4-5685-410f-843a-e73f673f063c", "params": {"x": 1}},
    )
    print(response)
    Pedro Machado

    9 months ago
    Thanks I will give it a try. How can I get the flow_group_id programmatically?
    Kevin Kho

    9 months ago
    flow.register() returns it
    I am wrong. You likely need to query it with the GraphQL API also. flow.register() returns the flow id, but you want the flow group id.
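    A query along these lines should work (the schema is assumed from the Prefect 1.x GraphQL API, so verify it in the Interactive API tab first):

```python
# Sketch: look up the flow_group_id for a known flow id. The GraphQL
# shape here (Hasura-style where filter) is an assumption about the
# Prefect 1.x backend schema; double-check in the Interactive API tab.
FLOW_GROUP_QUERY = """
query($flow_id: uuid!) {
  flow(where: {id: {_eq: $flow_id}}) {
    flow_group_id
  }
}
"""

def extract_flow_group_id(response: dict) -> str:
    """Pull the flow group id out of a client.graphql(...) payload,
    assuming the usual {"data": {"flow": [...]}} response shape."""
    return response["data"]["flow"][0]["flow_group_id"]
```

    You would run FLOW_GROUP_QUERY via Client().graphql with the flow id as a variable, then feed the extracted id into the set_flow_group_default_parameters mutation above.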