Patrik Fejda
04/05/2023, 2:12 PM
import fastapi
from prefect import flow, task
import time

@task()
def save_to_db(a, b):
    time.sleep(100)
    print("Saving to db...")
    print("Saved to DB")

@flow()
def flow_my_endpoint(a, b):
    save_to_db(a, b)
    suma = a + b
    return {'suma': suma}

@router.post("/csv/url")
async def my_endpoint(a, b):
    return flow_my_endpoint(a, b)
Deceivious
04/05/2023, 2:16 PM
Patrik Fejda
04/05/2023, 2:22 PM
Deceivious
04/05/2023, 2:23 PM
prefect.deployments.run_deployment
Patrik Fejda
04/05/2023, 2:34 PM
Deceivious
04/05/2023, 2:41 PM
A task runs within the context of a flow. A task cannot persist outside its parent flow context. You want your flow not to depend on the save_to_db task, which is not possible, so you could convert save_to_db into a separately deployed flow and trigger that deployment from your flow_my_endpoint flow.
Patrik Fejda
04/05/2023, 2:43 PM
@flow()
def save_to_db(a, b):
    time.sleep(100)
    print("Saving to db...")
    print("Saved to DB")
Deceivious
04/05/2023, 2:44 PM
from prefect.deployments import Deployment

Deployment().build_from_flow(
    save_to_db, name="NAME FOR DEPLOYMENT"
).apply()
Patrik Fejda
04/05/2023, 2:59 PM
def flow_my_endpoint(a, b):
    run_deployment(name="NAME FOR DEPLOYMENT", parameter={"a": a, "b": b})
    return {'suma': suma}
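A minimal sketch of how the trigger step could look, assuming Prefect 2's `run_deployment` with its `parameters` keyword (plural) and a deployment addressed as `flow-name/deployment-name`. The helper `deployment_identifier`, the function `trigger_save_to_db`, and the flow name `save-to-db` are illustrative assumptions, not names from the thread. `timeout=0` is assumed to return without waiting for the triggered run to finish.

```python
def deployment_identifier(flow_name, deployment_name):
    """Build the 'flow-name/deployment-name' string used to address a deployment."""
    return f"{flow_name}/{deployment_name}"


def trigger_save_to_db(a, b):
    """Ask Prefect to start the separately deployed save_to_db flow."""
    # Imported lazily so this sketch can be loaded where Prefect is not installed.
    from prefect.deployments import run_deployment

    return run_deployment(
        # Assumption: the flow name defaults to the function name with
        # underscores replaced by hyphens, i.e. "save-to-db".
        name=deployment_identifier("save-to-db", "NAME-FOR-DEPLOYMENT"),
        parameters={"a": a, "b": b},
        timeout=0,  # assumption: do not block until the flow run completes
    )
```

With this shape, `flow_my_endpoint` would compute `suma = a + b` itself and call `trigger_save_to_db(a, b)` only to hand off the slow write.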
Deceivious
04/05/2023, 2:59 PM
Patrik Fejda
04/05/2023, 3:03 PM
  File "/app/./src/routers/connector.py", line 573, in <module>
    Deployment().build_from_flow(data_analysis, name="NAME-FOR-DEPLOYMENT").apply()
  File "pydantic/main.py", line 342, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for Deployment
name
  field required (type=value_error.missing)
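The traceback above is consistent with `Deployment()` being instantiated with no arguments before `build_from_flow` is ever reached, so the constructor rejects the missing `name` first. The stand-in class below is hypothetical plain Python, not Prefect's implementation (it raises `TypeError` rather than a pydantic `ValidationError`), but it shows the same shape of failure and the classmethod call that avoids it.

```python
class FakeDeployment:
    """Hypothetical stand-in for a model with a required `name` field."""

    def __init__(self, name):
        self.name = name

    @classmethod
    def build_from_flow(cls, flow, name):
        # The classmethod supplies `name` itself, so no empty instance is needed.
        return cls(name=name)


def save_to_db(a, b):
    """Placeholder for the flow function."""
    return None


# Instantiating first fails before build_from_flow is called.
try:
    FakeDeployment().build_from_flow(save_to_db, name="NAME-FOR-DEPLOYMENT")
    failed = False
except TypeError:
    failed = True

# Calling the classmethod on the class itself works.
dep = FakeDeployment.build_from_flow(save_to_db, name="NAME-FOR-DEPLOYMENT")
```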
Deceivious
04/05/2023, 3:03 PM
Patrik Fejda
04/05/2023, 3:08 PM
dep = Deployment.build_from_flow(data_analysis, name="NAME-FOR-DEPLOYMENT")
dep.apply()
And got this error:
  File "/app/./src/routers/connector.py", line 576, in <module>
    dep.apply()
AttributeError: 'coroutine' object has no attribute 'apply'
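One plausible reading of this error, assuming `build_from_flow` behaves as a coroutine when it is called while an event loop is already running (for example, at import time under an ASGI server), is that `dep` holds an un-awaited coroutine rather than a deployment. The sketch below reproduces that situation with a hypothetical `FakeDeployment` class and the standard `asyncio` module. It is an illustration of the mechanism, not Prefect's code.

```python
import asyncio


class FakeDeployment:
    """Hypothetical stand-in; not Prefect's implementation."""

    def __init__(self, name):
        self.name = name
        self.applied = False

    @classmethod
    async def build_from_flow(cls, flow, name):
        return cls(name)

    def apply(self):
        self.applied = True
        return self


def save_to_db(a, b):
    """Placeholder for the flow function."""
    return None


async def register():
    # Without `await`, the call yields a coroutine object, which has no `.apply`.
    pending = FakeDeployment.build_from_flow(save_to_db, name="NAME-FOR-DEPLOYMENT")
    missing_apply = not hasattr(pending, "apply")
    # Awaiting the coroutine produces the actual object.
    dep = await pending
    dep.apply()
    return missing_apply, dep


missing_apply, dep = asyncio.run(register())
```

If that reading is right, either awaiting the call inside async code or registering the deployment from a separate synchronous script, instead of at import time of the router module, would be ways to avoid the error.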
Deceivious
04/05/2023, 3:09 PM
Patrik Fejda
04/05/2023, 3:10 PM
import fastapi
from prefect import flow, task
import time

@flow()
def save_to_db(a, b):
    time.sleep(100)
    print("Saving to db...")
    print("Saved to DB")

Deployment.build_from_flow(save_to_db, name="NAME-FOR-DEPLOYMENT").apply()

@flow()
def flow_my_endpoint(a, b):
    save_to_db(a, b)
    suma = a + b
    return {'suma': suma}

@router.post("/csv/url")
async def my_endpoint(a, b):
    return flow_my_endpoint(a, b)
Deceivious
04/05/2023, 3:11 PM