
Guy Altman

01/26/2023, 6:30 PM
Hey all, I have an event-triggered Azure Function that kicks off a Prefect deployment run using create_flow_run_from_deployment() whenever a blob is uploaded. However, when multiple files are uploaded at the same time, the Azure Function crashes with this traceback message. I would like the Azure Function to create flow runs from a deployment concurrently. Is there a way to accomplish this? The image is the error message from one of the crashed flows/Azure Functions.

Nate

01/26/2023, 7:10 PM
Hi @Guy Altman - instead of using `create_flow_run_from_deployment`, can you try using `run_deployment` (from `prefect.deployments`) and see if you have the same issue?
> concurrently create flow runs from a deployment
this is one of the main use cases that prompted us to create `run_deployment` 🙂

Guy Altman

01/26/2023, 7:25 PM
How would I go about implementing that? Do I not need async functions anymore? My code is as follows:
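from prefect import get_client  # assumed import (not shown in the original message)

async def trigger_deployment(event):
    async with get_client() as client:
        depl_id = 'depl_id'
        # Create one flow run for this blob event
        response = await client.create_flow_run_from_deployment(depl_id, parameters={'blob_path': event})
        print(response)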
@Nate This is what I've tried, and I'm still getting some failures with the same "task already running" message:
# Assumed imports (not shown in the original message)
import json
import pprint as pp

import azure.functions as func
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment


def main(event: func.EventGridEvent):
    trigger_deployment(event)


@flow(name='az_function')
def trigger_deployment(event):
    logger = get_run_logger()

    # Round-trip through JSON to get a plain dict of the event metadata
    result = json.dumps({
        'id': event.id,
        'data': event.get_json(),
        'topic': event.topic,
        'subject': event.subject,
        'event_type': event.event_type,
    })
    result = json.loads(result)
    cleaned_result = pp.pformat(result)
    logger.info('Blob Event Metadata')
    logger.info(cleaned_result)

    # Trigger the deployment, passing the blob path as a parameter
    response = run_deployment(name='blob-ingestion/dp_blob_ingestion_boss', parameters={'blob_path': result['subject']})

    logger.info(response)
@Nate Just to be clear, the parallelism/concurrency comes from the Azure Functions running at the same time. So when two Azure Functions are running side by side, I need them both to trigger the same deployment with specific params.

Nate

01/26/2023, 8:54 PM
you shouldn't need async functions, only when you're calling client methods (because what `get_client` returns is a custom version of `httpx.AsyncClient`) - you can use `run_deployment` synchronously.
hmm, I'm not super familiar with azure functions, but I'd naively guess that prefect and azure are both attempting to manage an event loop in potentially incompatible ways.
what happens if you try and call `run_deployment` from there as a normal function (not `@flow` decorated)?
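A minimal sketch of that suggestion, for illustration only: the Azure Function entry point calls `run_deployment` as a plain function, with no `@flow` decorator involved. The `build_parameters` helper is hypothetical, and `timeout=0` is an assumption about the installed Prefect version (it is meant to make the call return once the run is created instead of waiting for it to finish); the deployment name and `blob_path` parameter are taken from the code above.

```python
from __future__ import annotations

from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:  # only needed for the type hints below
    import azure.functions as func


def build_parameters(event: func.EventGridEvent) -> dict[str, Any]:
    """Hypothetical helper: map the Event Grid event onto deployment parameters."""
    return {"blob_path": event.subject}


def main(event: func.EventGridEvent) -> None:
    # Plain function, not @flow-decorated, so Prefect does not try to
    # orchestrate the Azure Function body itself.
    # Imported here so the helper above can be used without Prefect installed.
    from prefect.deployments import run_deployment

    flow_run = run_deployment(
        name="blob-ingestion/dp_blob_ingestion_boss",
        parameters=build_parameters(event),
        timeout=0,  # assumption: do not block waiting for the flow run to complete
    )
    print(flow_run.id)
```

Each concurrently running Azure Function would then create its own flow run for the same deployment, each with its own `blob_path`.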

Guy Altman

01/26/2023, 9:23 PM
@Nate Interesting, looks like removing the @flow decorator made it work. I know you guessed that azure and prefect are attempting to manage an event loop but can you elaborate on that? I'm curious why that ended up being the fix.

Nate

01/26/2023, 9:41 PM
hmm okay - it may make sense to open an issue on this, since I imagine we may want to be able to run flows in a place like azure functions at some point.
when you call a `@flow` decorated function, prefect will check whether the function you defined is async and whether there is an existing event loop, in order to decide how your `@flow`-decorated function should be submitted to the orchestration engine.
I'd guess that azure is also doing some magic to make sure that you have an event loop if you call async code in an azure function, but I'm not familiar with what that actually looks like.
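To make the two checks mentioned above concrete, here is a small standard-library sketch. It is not Prefect's actual implementation; `describe_call_context` is a hypothetical name, and it only shows how a library can tell whether a function is async and whether an event loop is already running in the calling thread.

```python
import asyncio
import inspect
from typing import Callable, Tuple


def describe_call_context(fn: Callable) -> Tuple[bool, bool]:
    """Return (fn is an async function, an event loop is already running here)."""
    is_async = inspect.iscoroutinefunction(fn)
    try:
        # Raises RuntimeError when no loop is running in this thread
        asyncio.get_running_loop()
        loop_running = True
    except RuntimeError:
        loop_running = False
    return is_async, loop_running


def sync_fn() -> None:
    pass


async def async_fn() -> None:
    pass


async def call_from_inside_loop() -> Tuple[bool, bool]:
    # Called while asyncio.run() is driving a loop in this thread
    return describe_call_context(async_fn)


print(describe_call_context(sync_fn))        # called from plain synchronous code
print(asyncio.run(call_from_inside_loop()))  # called from inside a running loop
```

If a host such as Azure Functions already runs its own loop in the thread that invokes your code, a library making a decision like this may pick a different execution path than it would in a plain script, which is one plausible source of the conflict guessed at above.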