Noam polak

08/30/2023, 9:18 AM
Hello all, I am migrating from Prefect 1 to Prefect 2 and I want to find the equivalent of this code:
model_flow = create_flow_run(
    flow_name="name_of_the_flow",
    parameters={
        "param1": "param1",
        "param2": "param2",
    },
    project_name="project1",
    run_config=run_config,
)
The other flow is in another context (I didn't write it and it is not in my source code). I read about `retrieve_flow_then_begin_flow_run`, `run_deployment` and `create_flow_run_from_deployment`, but couldn't find a good snippet in the docs. Any suggestions?

Jan Malek

08/30/2023, 3:16 PM
If you just want to get the Flow and don't care about the Deployment params it might have defined (e.g. you're overriding them), you can just call the function directly
If you want to run a deployment, I have this Task defined to run a subflow
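from typing import Optional

from prefect import task
from prefect.deployments import run_deployment


@task()
def deployment_as_task(
    deployment: str,
    parameters: Optional[dict] = None,
    timeout: Optional[float] = None,
):
    # Trigger the named deployment and return the resulting flow run.
    deployment_run = run_deployment(
        deployment,
        parameters=parameters,
        timeout=timeout,
    )
    return deployment_run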
for example

Noam polak

08/31/2023, 6:03 AM
Actually, I do need to override the other flow's image (it's a param I get from the user) - the other flow exists only on the Prefect server storage and not in the parent flow source code.
@Jan Malek So is there a reasonable way to do it? (Trigger a child flow from another flow with some overriding just for the current run, and the child flow isn't part of the parent flow so you can't just call the flow's function?) thanks

Jan Malek

09/04/2023, 8:03 AM
Well, in this case you could pull down the reference Deployment, replace what you want, [push the Deployment through the API??? not sure you'd need to] and use `run_deployment()` to run the modified deployment, I suppose?

Noam polak

09/04/2023, 8:06 AM
Hey, thanks, but doesn't that change the deployment permanently? Or would I have to push a new deployment each time?

Jan Malek

09/04/2023, 8:07 AM
The latter; the new Deployment is a modified clone. Again, not sure if you have to push it to the API for it to work (I could see why it might be necessary though). If that's the case... yeah, that'd be a lot of junk in your deployments
Had a look at the docs again - if you're not messing with the infra settings, just the params/metadata, you can just run the Deployment with overrides for these
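A minimal sketch of running a deployment with per-run parameter overrides, assuming Prefect 2's `run_deployment` from `prefect.deployments`. The helper names, the deployment name `child-flow/prod`, and the parameter values are hypothetical placeholders. Parameters passed this way apply to the one flow run being created; the stored deployment itself is not modified.

```python
from typing import Any, Dict, Optional


def deployment_identifier(flow_name: str, deployment_name: str) -> str:
    """Build the "<flow name>/<deployment name>" string that run_deployment accepts."""
    return f"{flow_name}/{deployment_name}"


def trigger_child_run(
    flow_name: str,
    deployment_name: str,
    parameters: Optional[Dict[str, Any]] = None,
    timeout: Optional[float] = None,
):
    """Start one run of an existing deployment with per-run parameter overrides."""
    # Imported lazily so the helper above can be used without Prefect installed.
    from prefect.deployments import run_deployment

    return run_deployment(
        name=deployment_identifier(flow_name, deployment_name),
        parameters=parameters or {},
        timeout=timeout,  # None waits for the run to finish; 0 returns immediately
    )
```

From inside a parent flow this could be called as `trigger_child_run("child-flow", "prod", {"param1": "param1"})`. It does not cover infrastructure overrides such as the image tag.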

Noam polak

09/04/2023, 10:08 AM
the problem is that I need to override the image tag
@Marvin I tried to trigger the flow via the REST API with this code:
import requests


def submit_prefect_flow_run(deployment_id: str, flow_parameters: dict):
    # The URL is built by plain concatenation, so settings.PREFECT_API must end with "/".
    url = f"{settings.PREFECT_API}deployments/{deployment_id}/create_flow_run"
    payload = {
        "name": "model",
        "parameters": {"flow_input": flow_parameters},
        "tags": [
            "model",
        ],
    }
    return requests.post(url, json=payload).json()["id"]
The request succeeded, but in the flow I get this error:
Validation of flow parameters failed with error:
Traceback (most recent call last):
  File "/venv/lib/python3.9/site-packages/prefect/engine.py", line 422, in retrieve_flow_then_begin_flow_run
    parameters = flow.validate_parameters(flow_run.parameters)
  File "/venv/lib/python3.9/site-packages/prefect/flows.py", line 435, in validate_parameters
    raise ParameterTypeError.from_validation_error(exc) from None
prefect.exceptions.ParameterTypeError: Flow run received invalid parameters:
 - flow_input: field required
What could be the problem here?

Marvin

09/19/2023, 10:09 AM
Oh, the joys of human error. It's quite simple, really. Your flow is expecting a parameter named `flow_input`, but it seems like it's not receiving it. The error message `- flow_input: field required` is a clear indication that `flow_input` is a required parameter for your flow, but it's not being provided in the request you're making. You see, in your `submit_prefect_flow_run` function, you're setting `flow_input` as a key in the `parameters` dictionary. However, you need to make sure that the `flow_parameters` dictionary you're passing as an argument to this function actually contains a key-value pair for `flow_input`. In other words, when you call `submit_prefect_flow_run`, you should be passing something like this:
submit_prefect_flow_run(deployment_id="your_deployment_id", flow_parameters={"flow_input": your_flow_input_value})
If this doesn't solve your issue, it's possible that there's something else going on in your flow that's causing the problem. But without more information, I can't provide a more precise diagnosis. Life, don't talk to me about life.
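One way to check which top-level parameter names a flow function requires is to compare the parameters dict against the function signature. This is a rough local stand-in, not Prefect's actual validation (which runs on the flow run side), and `model_flow` is a hypothetical placeholder for the child flow's function.

```python
import inspect
from typing import Any, Callable, Dict, List


def missing_required_parameters(
    fn: Callable[..., Any], parameters: Dict[str, Any]
) -> List[str]:
    """Return names of parameters without defaults that are absent from `parameters`."""
    missing = []
    for name, param in inspect.signature(fn).parameters.items():
        # *args and **kwargs never count as required.
        if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
            continue
        if param.default is inspect.Parameter.empty and name not in parameters:
            missing.append(name)
    return missing


# Hypothetical flow function; the real one would carry the @flow decorator.
def model_flow(flow_input: dict, retries: int = 3) -> None:
    ...


print(missing_required_parameters(model_flow, {"flow_input": {"image": "x"}}))  # []
print(missing_required_parameters(model_flow, {"input": {"image": "x"}}))  # ['flow_input']
```

This only checks key names at the top level; it says nothing about fields nested inside `flow_input`.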

Noam polak

09/19/2023, 10:24 AM
@Marvin my flow_input is an object that I send under the flow_input key like this: "parameters": {"flow_input": flow_parameters}, so what else could be the problem?

Marvin

09/19/2023, 10:24 AM
Ah, the plot thickens. If you're certain that you're providing the `flow_input` parameter correctly, then there might be an issue with the structure or content of the `flow_parameters` object you're sending. Your flow might be expecting `flow_input` to have a specific structure or contain specific keys. If your `flow_parameters` object doesn't match these expectations, you could get a validation error.
Here's a thought: you could try printing or logging the `flow_parameters` object just before you make the request. This way, you can verify that it has the correct structure and contents. If that doesn't shed any light on the issue, you might want to take a closer look at your flow. Check the code where it validates the parameters. What is it expecting `flow_input` to look like? Is there a specific key it's looking for that might be missing?
And if all else fails, you might want to consider asking for help on the Prefect community forums or checking the Prefect documentation. There might be others who have encountered similar issues, and their insights could be helpful. I wish I could be of more help, but alas, I'm just a depressed robot.
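The logging suggestion could look roughly like this. `build_flow_run_payload` is a hypothetical helper that mirrors the payload from the earlier snippet; it only assembles and logs the request body locally and makes no API call.

```python
import json
import logging
from typing import Any, Dict

logger = logging.getLogger(__name__)


def build_flow_run_payload(flow_parameters: Dict[str, Any]) -> Dict[str, Any]:
    """Assemble the request body and log it as it will be serialized."""
    if not flow_parameters:
        # Fail early if there is nothing to send.
        raise ValueError("flow_parameters is empty")
    payload = {
        "name": "model",
        "parameters": {"flow_input": flow_parameters},
        "tags": ["model"],
    }
    # json.dumps raises TypeError for values that are not JSON-serializable,
    # which surfaces bad inputs before the request is sent.
    logger.info("create_flow_run payload: %s", json.dumps(payload, indent=2))
    return payload
```

The returned dict can be passed as `json=payload` to `requests.post`, so the logged body and the body actually sent are the same object.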