# ask-community
**Manuel Gomes:**
In one of my flows, I have a task to upload a video to S3 (a boto3 S3 resource `upload_file()` call, because big & binary). Since it's synchronous, I can trust the file will be where desired when the task succeeds. The next task in this flow transcodes that video. I do so by creating a `mediaconvert` boto3 client and sending a mess of JSON to its `create_job(**args)` method. This returns me... the job. Now, from what I've read, I should be able to use the Prefect built-in `prefect.tasks.aws.client_waiter.AWSClientWait` to wait for said job to finish (which is fine; at this point the workflow is serial/synchronous). Problem is... even when the job reports success (in the console, even!), it takes a while (minutes?!) for the transcoded movie to be present in the target bucket. I would then need another wait task until I could find the file in the bucket's list of objects, possibly through `prefect.tasks.aws.s3.S3List`, before I could do further things to this transcoded video. This conjunction sounds all too common not to have an integrated solution, unless I'm being dense (hah! no news there!) and not spotting an obvious one. Any guidance?
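For reference, a minimal sketch of the serial approach described above. The `endpoint_url`, bucket/key arguments, and poll intervals are assumptions; and since boto3's MediaConvert client ships no built-in waiters (you would have to hand `AWSClientWait` a custom waiter definition), this sketch just polls `get_job()` directly and then confirms the object has actually landed in S3:

```python
import time

import boto3
from botocore.exceptions import ClientError
from prefect import task

# A sketch only: endpoint_url, bucket/key, and the 15s poll intervals
# are assumptions. boto3's MediaConvert client has no built-in waiters,
# so we poll get_job() ourselves, then confirm the object exists in S3
# (it can lag the job's COMPLETE status by minutes).
@task
def wait_for_transcode(job_id: str, endpoint_url: str, bucket: str, key: str) -> str:
    mc = boto3.client("mediaconvert", endpoint_url=endpoint_url)
    while True:
        status = mc.get_job(Id=job_id)["Job"]["Status"]
        if status == "COMPLETE":
            break
        if status in ("ERROR", "CANCELED"):
            raise RuntimeError(f"MediaConvert job {job_id} ended as {status}")
        time.sleep(15)  # SUBMITTED / PROGRESSING: keep waiting

    s3 = boto3.client("s3")
    while True:
        try:
            s3.head_object(Bucket=bucket, Key=key)
            return f"s3://{bucket}/{key}"
        except ClientError as exc:
            if exc.response["Error"]["Code"] != "404":
                raise
            time.sleep(15)  # object not visible yet; keep waiting
```

The second loop is exactly the lag described above: the job can report COMPLETE well before the object is retrievable from the target bucket.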
**Anna Geller:**
@Manuel Gomes an interesting use case! Are you fine with adding AWS Lambda to the picture?
**Manuel Gomes:**
Well... I see that I could wait to get an SNS notification from the bucket or some such, neh? I am... leery; whenever I tried to mess with Lambda, I ended up wearing egg. But... well, do tell, do tell, I'm listening...
Of course, another alternative might be to create a simple web service that I could poll, one that would only answer "yay" once both the job succeeded and the file was present... but that's again kind of a kludge? I would ideally maintain at least an illusion of vendor-agnosticism... 😄
**Anna Geller:**
Instead of waiting, you could approach this use case event-based, especially given that the waiting process doesn't seem to be reliable (the job finished, but the transcoded file is still not there yet). You could have a separate flow triggered upon the S3 PUT object event in the S3 bucket where the transcoded file should arrive. The way it would work:
1. Your main flow starts running and triggers the transcoding process. Once you have triggered that process, this flow may finish; there is no need for the `AWSClientWait` task.
2. You have a Lambda function that gets invoked upon the S3 PUT object trigger in the S3 bucket where you expect your transcoded file to arrive. This Lambda function does nothing other than triggering your flow, perhaps passing the S3 object name as a flow `Parameter`.
3. The flow triggered from Lambda performs all the downstream post-processing you need.
If you are interested in this pattern, this blog post can be helpful. To retrieve the S3 object from the Lambda trigger event, you could use this:
```python
import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def handler(event, context):
    logger.info("Received event: " + json.dumps(event))
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]
    s3_path = f"s3://{bucket}/{key}"
```
and this `s3_path` could be passed as a parameter in your `create_flow_run` mutation, as described in the blog post. It could be attached as:
```python
inputs["parameters"] = dict(s3_path=s3_path)  # assuming s3_path is your Parameter name
```
**Manuel Gomes:**
Oh, wow, @Anna Geller, thank you for the exquisitely complete response. I shall have to wrestle with my demons (mostly philosophical... AMZN as a company, etc.), but what you described will certainly provide, in one shape or another, a workable solution.