Guy Altman
01/26/2023, 6:30 PM
Nate
01/26/2023, 7:10 PM
create_flow_run_from_deployment
can you try using run_deployment (from prefect.deployments) and see if you have the same issue?
concurrently create flow runs from a deployment
this is one of the main use cases that prompted us to create run_deployment 🙂
Guy Altman
01/26/2023, 7:25 PM
async def trigger_deployment(event):
    async with get_client() as client:
        depl_id = 'depl_id'
        response = await client.create_flow_run_from_deployment(depl_id, parameters={'blob_path': event})
        print(response)

def main(event: func.EventGridEvent):
    trigger_deployment(event)

@flow(name='az_function')
def trigger_deployment(event):
    logger = get_run_logger()
    result = json.dumps({
        'id': event.id,
        'data': event.get_json(),
        'topic': event.topic,
        'subject': event.subject,
        'event_type': event.event_type,
    })
    result = json.loads(result)
    cleaned_result = pp.pformat(result)
    logger.info('Blob Event Metadata')
    logger.info(cleaned_result)
    response = run_deployment(name='blob-ingestion/dp_blob_ingestion_boss', parameters={'blob_path': result['subject']})
    logger.info(response)
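A side note on the async variant above: the message does not say which trigger_deployment the synchronous main calls. If it is the async def version, the call only builds a coroutine object and the body never runs. The sketch below shows that behaviour with the standard library only. trigger_deployment here is a stand-in that sleeps instead of talking to Prefect, and main_without_await and main_with_asyncio_run are hypothetical names used for illustration.

```python
import asyncio


async def trigger_deployment(event: str) -> str:
    # Stand-in for the awaited client call; no Prefect involved.
    await asyncio.sleep(0)
    return f"created flow run for {event}"


def main_without_await(event: str) -> bool:
    # Calling a coroutine function only builds a coroutine object;
    # the body of trigger_deployment has not started running.
    coro = trigger_deployment(event)
    is_coroutine = asyncio.iscoroutine(coro)
    coro.close()  # close it explicitly to avoid a "never awaited" warning
    return is_coroutine


def main_with_asyncio_run(event: str) -> str:
    # asyncio.run starts an event loop, runs the coroutine to completion,
    # and closes the loop again.
    return asyncio.run(trigger_deployment(event))
```

asyncio.run only works when no event loop is already running in the current thread, so this is a sketch of the plain-Python case and not a claim about how an Azure Functions host behaves.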
Nate
01/26/2023, 8:54 PM
get_client returns is a custom version of httpx.AsyncClient) - you can use run_deployment synchronously
hmm I'm not super familiar with azure functions, but I'd naively guess that prefect and azure are both attempting to manage an event loop in potentially incompatible ways
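To make the "two things managing an event loop" guess concrete, here is one generic way such a conflict can surface, using only the standard library. It is not a reproduction of what Prefect or Azure Functions actually do internally, which this thread does not establish. create_run, sync_wrapper and host_handler are hypothetical names.

```python
import asyncio


async def create_run() -> str:
    # Stand-in for some async work, e.g. an HTTP call.
    await asyncio.sleep(0)
    return "ok"


def sync_wrapper() -> str:
    # A synchronous helper that starts its own event loop.
    coro = create_run()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # the coroutine never ran; close it to avoid a warning
        raise


async def host_handler() -> str:
    # Simulates a host that already runs the handler inside its own loop.
    # asyncio.run refuses to start a second loop in the same thread.
    try:
        return sync_wrapper()
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"
```

Called from plain synchronous code, sync_wrapper() returns "ok". Called from inside host_handler, which is itself running in a loop, it raises RuntimeError instead.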
what happens if you try and call run_deployment from there as a normal function (not @flow decorated)?
Guy Altman
01/26/2023, 9:23 PM
Nate
01/26/2023, 9:41 PM
@flow-decorated function should be submitted to the orchestration engine
I'd guess that azure is also doing some magic to make sure that you have an event loop if you call async code in an azure function, but I'm not familiar with what that actually looks like
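A minimal sketch of the suggestion to call run_deployment from a normal function, not a @flow-decorated one. The deployment name and the blob_path parameter come from the thread. The run argument and timeout=0 are assumptions added here: run exists only so the helper can be exercised without Prefect installed, and timeout=0 asks run_deployment to return as soon as the flow run is created instead of waiting for it to finish. How this behaves inside an Azure Functions host is untested.

```python
from typing import Any, Callable, Optional

# Deployment name as it appears in the thread ("flow-name/deployment-name").
DEPLOYMENT_NAME = "blob-ingestion/dp_blob_ingestion_boss"


def trigger_deployment(subject: str, run: Optional[Callable[..., Any]] = None) -> Any:
    """Plain (non-@flow) helper that starts a run of the deployment."""
    if run is None:
        # Imported lazily so the helper can be exercised without Prefect.
        from prefect.deployments import run_deployment as run
    # timeout=0: return right after the flow run is created (assumption,
    # not something the thread specifies).
    return run(name=DEPLOYMENT_NAME, parameters={"blob_path": subject}, timeout=0)


def main(event) -> None:
    # `event` is expected to be an azure.functions.EventGridEvent;
    # only its .subject attribute is used here.
    trigger_deployment(event.subject)
```

Passing a fake callable as run makes it easy to check which name and parameters would be sent before pointing the function at a real Prefect API.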