Hey guys, I have following dummy code, which saves...
# ask-community
p
Hey guys, I have the following dummy code, which saves values to the database and returns a value to the client. I use a Prefect flow and task. How can I change the code so that flow_url_upload() and url_upload() do not wait for the long-running task save_to_db()? The task save_to_db() should just be pushed to some queue, and flow_url_upload() should not wait for its execution. Is there some way?
import time

from fastapi import APIRouter
from prefect import flow, task

router = APIRouter()

@task()
def save_to_db(a,b):
    time.sleep(100)
    print("Saving to db...")
    print("Saved to DB")

@flow()
def flow_my_endpoint(a,b):
    save_to_db(a,b)
    suma = a + b
    return {'suma': suma}

@router.post("/csv/url")
async def my_endpoint(a, b):
    return flow_my_endpoint(a,b)
d
What if your task was converted into a subflow?
p
Maybe it would solve the issue, but how can I run the code with this deployment?
d
prefect.deployments.run_deployment
p
If I understand correctly, you suggest that the save_to_db() task should be converted into a flow that is created as a deployment, and this deployment can be called without waiting for the flow to finish, right?
d
I am learning it myself right now, but from what I know, a task runs within the context of a flow and cannot persist outside its parent flow context. Since you want your flow not to depend on the save_to_db task, which is not possible while it remains a task, you could convert save_to_db into a separately deployed flow and trigger that deployment from your flow_my_endpoint flow.
p
And how can I "deploy" this flow?
@flow()
def save_to_db(a,b):
    time.sleep(100)
    print("Saving to db...")
    print("Saved to DB")
d
from prefect.deployments import Deployment
Deployment().build_from_flow(
        save_to_db, name="NAME FOR DEPLOYMENT"
    ).apply()
p
Okay, and now I should run it like this, right?
def flow_my_endpoint(a,b):
    run_deployment(name="NAME FOR DEPLOYMENT", parameters={"a":a, "b":b})
    suma = a + b
    return {'suma': suma}
d
Yes
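A minimal sketch of that call, assuming Prefect 2.x. As far as the Prefect 2 API goes, run_deployment polls until the flow run finishes unless a timeout is given, so timeout=0 is what makes the caller return right after the run is scheduled. The deployment is addressed as "<flow name>/<deployment name>"; the flow name "save-to-db" assumes Prefect's default naming for a function called save_to_db. The helper names deployment_ref and trigger_save and the runner parameter are hypothetical and exist only so the logic can be exercised without a Prefect server.

```python
from typing import Any, Callable, Dict, Optional


def deployment_ref(flow_name: str, deployment_name: str) -> str:
    """Build the "<flow name>/<deployment name>" string used to address a deployment."""
    return f"{flow_name}/{deployment_name}"


def trigger_save(
    a: int, b: int, runner: Optional[Callable[..., Any]] = None
) -> Dict[str, int]:
    """Schedule the save_to_db deployment and return without waiting for it."""
    if runner is None:
        # Imported lazily so this helper can be tried out without Prefect installed.
        # Assumes Prefect 2.x, where prefect.deployments.run_deployment exists.
        from prefect.deployments import run_deployment

        runner = run_deployment

    runner(
        # Assumed names: default flow name for save_to_db plus the deployment name.
        name=deployment_ref("save-to-db", "NAME-FOR-DEPLOYMENT"),
        parameters={"a": a, "b": b},
        timeout=0,  # return once the run is created instead of polling for completion
    )
    return {"suma": a + b}
```

The scheduled run is only executed once an agent or worker is picking up runs for that deployment, so the endpoint returns quickly while save_to_db runs elsewhere.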
p
By any chance, do you know how this can be solved?
File "/app/./src/routers/connector.py", line 573, in <module>
    Deployment().build_from_flow(data_analysis, name="NAME-FOR-DEPLOYMENT").apply()
  File "pydantic/main.py", line 342, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for Deployment
name
  field required (type=value_error.missing)
d
Maybe it should be Deployment.build_from_flow, called on the class itself, without instantiating Deployment first.
p
Yeah, I get it now. Have you tried deploying from the code? I tried:
dep = Deployment.build_from_flow(data_analysis, name="NAME-FOR-DEPLOYMENT")
dep.apply()
And got this error:
File "/app/./src/routers/connector.py", line 576, in <module>
    dep.apply()
AttributeError: 'coroutine' object has no attribute 'apply'
d
Yes, all my CI/CD uses it.
Maybe you need to await build_from_flow? I'm not at my workstation, so I can't provide the code.
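A sketch of what awaiting could look like, assuming Prefect 2.x and code that already runs inside an event loop (which the 'coroutine' error above suggests). In that situation build_from_flow hands back a coroutine, so it has to be awaited before apply() can be called on the result. The function name deploy_flow and the deployment_cls parameter are hypothetical; the latter is only there so the snippet can be tried with a stand-in class.

```python
import asyncio
from typing import Any, Callable, Optional


async def deploy_flow(
    flow_fn: Callable[..., Any], name: str, deployment_cls: Optional[Any] = None
) -> Any:
    """Build and register a deployment from code that runs inside an event loop."""
    if deployment_cls is None:
        # Imported lazily; assumes Prefect 2.x, where the Deployment class exists.
        from prefect.deployments import Deployment

        deployment_cls = Deployment

    # Called on the class (no Deployment() instance) and awaited, because in an
    # async context the call returns a coroutine rather than a Deployment object.
    deployment = await deployment_cls.build_from_flow(flow_fn, name=name)
    # apply() is awaited for the same reason.
    await deployment.apply()
    return deployment
```

From plain synchronous code, such as a standalone script started with python, the same two calls should work without await.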
p
Can this be run like this?
import time

from fastapi import APIRouter
from prefect import flow
from prefect.deployments import Deployment

router = APIRouter()

@flow()
def save_to_db(a,b):
    time.sleep(100)
    print("Saving to db...")
    print("Saved to DB")

Deployment.build_from_flow(save_to_db, name="NAME-FOR-DEPLOYMENT").apply()

@flow()
def flow_my_endpoint(a,b):
    save_to_db(a,b)
    suma = a + b
    return {'suma': suma}

@router.post("/csv/url")
async def my_endpoint(a, b):
    return flow_my_endpoint(a,b)
Does this work for you?
d
Don't bundle the deployment code with your main code. You should only deploy when the code changes or when it is deployed for the first time.