Patrik Fejda

04/05/2023, 2:12 PM
Hey guys, I have the following dummy code, which saves values to the database and returns a value to the client. I use a Prefect flow and task. How can I change the code so that flow_my_endpoint() and my_endpoint() do not wait for the long-running task save_to_db()? The save_to_db() task should just be pushed to some queue, and flow_my_endpoint() should not wait for it to finish. Is there a way to do this?
import fastapi
from prefect import flow, task
import time

router = fastapi.APIRouter()

@task()
def save_to_db(a,b):
    time.sleep(100)
    print("Saving to db...")
    print("Saved to DB")

@flow()
def flow_my_endpoint(a,b):
    save_to_db(a,b)
    suma = a + b
    return {'suma': suma}

@router.post("/csv/url")
async def my_endpoint(a, b):
    return flow_my_endpoint(a,b)

Deceivious

04/05/2023, 2:16 PM
Yes, if your task is converted into a subflow.

Patrik Fejda

04/05/2023, 2:22 PM
Maybe it would solve the issue, but how can I run the code with this deployment?

Deceivious

04/05/2023, 2:23 PM
prefect.deployments.run_deployment

Patrik Fejda

04/05/2023, 2:34 PM
If I understand correctly, you suggest that the save_to_db() task should be turned into a flow, which is then created as a deployment, and this deployment can be called without waiting for the flow to finish, right?

Deceivious

04/05/2023, 2:41 PM
I am learning it myself right now, but from what I know, a task runs within the context of a flow. A task cannot persist outside its parent flow context. Because you want your flow not to depend on the save_to_db task, which is not possible, you could convert save_to_db into a separate deployed flow and trigger that deployment from your flow_my_endpoint flow.

Patrik Fejda

04/05/2023, 2:43 PM
And how can I "deploy" this flow?
@flow()
def save_to_db(a,b):
    time.sleep(100)
    print("Saving to db...")
    print("Saved to DB")

Deceivious

04/05/2023, 2:44 PM
from prefect.deployments import Deployment
Deployment().build_from_flow(
    save_to_db, name="NAME FOR DEPLOYMENT"
).apply()

Patrik Fejda

04/05/2023, 2:59 PM
Okay, and now I should run it like this, right?
def flow_my_endpoint(a,b):
    run_deployment(name="NAME FOR DEPLOYMENT", parameters={"a":a, "b":b})
    suma = a + b
    return {'suma': suma}

Deceivious

04/05/2023, 2:59 PM
Yes
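A minimal sketch of the fire-and-forget call discussed here, assuming Prefect 2.x. In that version, `run_deployment` takes `parameters` (plural), expects the deployment name in `"<flow-name>/<deployment-name>"` form, and by default polls until the flow run finishes. Passing `timeout=0` makes it return as soon as the run has been created. The deployment name and the `submit` hook below are hypothetical; the hook exists only so the snippet can be exercised without a Prefect server, and the `@flow` decorator is left off for the same reason.

```python
from typing import Any, Callable, Optional

# Hypothetical name in "<flow-name>/<deployment-name>" form.
DEPLOYMENT = "save-to-db/save-to-db-deployment"


def trigger_save(a: int, b: int, submit: Optional[Callable[..., Any]] = None) -> Any:
    """Create a flow run for the deployment without waiting for it to finish."""
    if submit is None:
        # Imported lazily so the module loads even where Prefect is not installed.
        from prefect.deployments import run_deployment as submit
    # timeout=0 asks run_deployment to return right after the run is created
    # instead of polling until the run completes.
    return submit(name=DEPLOYMENT, parameters={"a": a, "b": b}, timeout=0)


def flow_my_endpoint(a: int, b: int, submit: Optional[Callable[..., Any]] = None) -> dict:
    # Hand the slow save off to the deployment, then answer immediately.
    trigger_save(a, b, submit=submit)
    return {"suma": a + b}
```

The run is only picked up if an agent or worker is polling the work queue the deployment belongs to; otherwise it stays scheduled.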

Patrik Fejda

04/05/2023, 3:03 PM
By any chance, do you know how this can be solved?
File "/app/./src/routers/connector.py", line 573, in <module>
    Deployment().build_from_flow(data_analysis, name="NAME-FOR-DEPLOYMENT").apply()
  File "pydantic/main.py", line 342, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for Deployment
name
  field required (type=value_error.missing)

Deceivious

04/05/2023, 3:03 PM
Maybe it should be Deployment.build_from_flow
Call it on the class itself, without instantiating Deployment first

Patrik Fejda

04/05/2023, 3:08 PM
Yeah, I get it now. Have you tried deploying from code? I tried:
dep = Deployment.build_from_flow(data_analysis, name="NAME-FOR-DEPLOYMENT")
dep.apply()
And got this error:
File "/app/./src/routers/connector.py", line 576, in <module>
    dep.apply()
AttributeError: 'coroutine' object has no attribute 'apply'

Deceivious

04/05/2023, 3:09 PM
Yes, all my CI/CD uses it.
Maybe you need to await build_from_flow? I'm not at my workstation, so I can't provide the code.
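The `AttributeError` in the traceback is what Python raises whenever a method is called on a coroutine that has not been awaited. A plausible cause, stated here as an assumption, is that `Deployment.build_from_flow` hands back a coroutine when it is called while an event loop is already running, for example in a module imported by an ASGI server. The sketch below uses a hypothetical stand-in class, `FakeDeployment`, not Prefect's API, to reproduce the error and show the awaited form.

```python
import asyncio


class FakeDeployment:
    """Hypothetical stand-in; this is not Prefect's Deployment class."""

    def __init__(self, name: str):
        self.name = name

    @classmethod
    async def build_from_flow(cls, flow, name: str) -> "FakeDeployment":
        return cls(name=name)

    def apply(self) -> str:
        return f"applied {self.name}"


async def broken() -> str:
    # Without await, dep is a coroutine object, not a FakeDeployment.
    dep = FakeDeployment.build_from_flow(None, name="NAME-FOR-DEPLOYMENT")
    try:
        return dep.apply()
    except AttributeError as exc:
        dep.close()  # discard the coroutine to avoid a "never awaited" warning
        return str(exc)


async def fixed() -> str:
    # Awaiting the coroutine yields the actual object, so apply() exists.
    dep = await FakeDeployment.build_from_flow(None, name="NAME-FOR-DEPLOYMENT")
    return dep.apply()


broken_message = asyncio.run(broken())
fixed_message = asyncio.run(fixed())
```

In Prefect itself, `apply()` may need to be awaited as well when called from async code.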

Patrik Fejda

04/05/2023, 3:10 PM
Can this be run like this?
import fastapi
from prefect import flow, task
from prefect.deployments import Deployment
import time

router = fastapi.APIRouter()

@flow()
def save_to_db(a,b):
    time.sleep(100)
    print("Saving to db...")
    print("Saved to DB")

Deployment.build_from_flow(save_to_db, name="NAME-FOR-DEPLOYMENT").apply()

@flow()
def flow_my_endpoint(a,b):
    save_to_db(a,b)
    suma = a + b
    return {'suma': suma}

@router.post("/csv/url")
async def my_endpoint(a, b):
    return flow_my_endpoint(a,b)
Does this work for you?

Deceivious

04/05/2023, 3:11 PM
Don't bundle the deployment code with your main code. You should only deploy when the code changes or when it is deployed for the first time.