How can I programmatically create deployments of flows located in an S3 bucket from within a Python process? I am using Django to upload flows into an S3 bucket (Django is used for much more than just storing metadata about a flow). Whenever a new flow is uploaded, a Django signal should trigger the deployment, so that the flow is executed on a local process worker. I do this with the following function, which is called once the Prefect flow has been uploaded to the S3 storage (and a Django model instance with more metadata has been created):
from prefect import flow
from prefect_aws import S3Bucket


def deploy_prefect_process_loc(sender, instance, **kwargs):
    """
    Deploy the uploaded Prefect flow to a local process work pool.
    """
    # Load the S3 bucket block that holds the uploaded flow files
    s3_bucket_block = S3Bucket.load("minio-prefect-flows-lara")
    print(f"******* - --- deploying prefect process {instance.name} - {prefect_workpool} ")

    # Build the flow from the S3 source and create a deployment on the process work pool
    flow.from_source(
        source=s3_bucket_block,
        entrypoint="robot_workflow_sila.py:science_robotic_process",
    ).deploy(
        name=f"minio-s3-deployment-lara-{instance.name}-{instance.id}",
        work_pool_name="local-process-worker-1",  # prefect_workpool
        job_variables={"env": {"EXTRA_PIP_PACKAGES": "prefect-aws"}},
        tags=["lara", "sila"],
        description="Deployment of a SiLA workflow.",
        version="v0.0.1",
    )
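For context, the function is hooked up to a Django post_save signal, roughly like this (a minimal sketch; the model name FlowUpload and the module names are placeholders for my actual setup):

# signals.py (sketch) -- FlowUpload stands in for the Django model that stores
# the flow metadata; deploy_prefect_process_loc is the function shown above
from django.db.models.signals import post_save

from .models import FlowUpload
from .deploy import deploy_prefect_process_loc

# Trigger the deployment whenever a new flow metadata record is saved
post_save.connect(deploy_prefect_process_loc, sender=FlowUpload)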
This programmatic deployment works fine as standalone code, but when I run it inside the Django process / event loop, or asynchronously via Celery, strange things happen, already at the stage of loading the S3Bucket block:
(I added the error message in the thread.)
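For the Celery case, the call goes through a task along these lines (again only a sketch; the task and module names are illustrative, not my exact code):

# tasks.py (sketch) -- Celery task that wraps the deployment call
from celery import shared_task

from .models import FlowUpload
from .deploy import deploy_prefect_process_loc


@shared_task
def deploy_flow_task(instance_id):
    # Re-fetch the model instance inside the worker process and deploy from there
    instance = FlowUpload.objects.get(pk=instance_id)
    deploy_prefect_process_loc(sender=FlowUpload, instance=instance)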
Any idea why this is not working, and how I can achieve the deployment? Thanks 🙏