Manuel Gomes
11/24/2021, 11:49 AM
upload_file() call, because big & binary). Since it's synchronous, I can trust that the file will be where I want it when the task succeeds.
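(For context, a minimal sketch of what such an upload task might look like, assuming Prefect 1.x's @task decorator; the path, bucket, and key names are placeholders, not from the thread:)

import boto3
from prefect import task

@task
def upload_raw_video(local_path: str, bucket: str, key: str) -> str:
    """Upload the source file to S3; upload_file blocks until the transfer completes."""
    s3 = boto3.client("s3")
    s3.upload_file(local_path, bucket, key)  # synchronous, handles multipart for big files
    return f"s3://{bucket}/{key}"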
Next task in this flow is to transcode this video. I do so by creating a mediaconvert boto3 client, and sending a mess of JSON to its create_job(**args) method. This returns me... the job.
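(A hedged sketch of that step; the role ARN and input path are placeholders, and the real job settings blob is elided:)

import boto3

# MediaConvert uses an account-specific endpoint; look it up once and reuse it.
mc = boto3.client("mediaconvert")
endpoint_url = mc.describe_endpoints()["Endpoints"][0]["Url"]
mc = boto3.client("mediaconvert", endpoint_url=endpoint_url)

response = mc.create_job(
    Role="arn:aws:iam::123456789012:role/MediaConvertRole",  # placeholder ARN
    Settings={
        "Inputs": [{"FileInput": "s3://source-bucket/raw/video.mp4"}],
        # The "mess of JSON" goes here: output groups, codecs, destination
        # bucket, etc. Elided for brevity.
        "OutputGroups": [],
    },
)
job_id = response["Job"]["Id"]  # keep this for the wait step below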
Now from what I've read... I should be able to use a Prefect built-in, prefect.tasks.aws.client_waiter.AWSClientWait, to wait for said job to finish (which is fine, at this point the workflow is serial/synchronous). Problem is... even when the job reports success (in the console, even!), it takes a while (minutes?!) for the transcoded movie to be present in the target bucket.
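(As far as I can tell, boto3 ships no built-in waiter for MediaConvert, so AWSClientWait would need a hand-written waiter_definition. As an alternative sketch, a plain polling task against get_job does the same job; poll interval and names are placeholders:)

import time
import boto3
from prefect import task

@task
def wait_for_mediaconvert(job_id: str, endpoint_url: str, poll_secs: int = 15) -> None:
    """Poll get_job until the transcode reaches a terminal status."""
    mc = boto3.client("mediaconvert", endpoint_url=endpoint_url)
    while True:
        status = mc.get_job(Id=job_id)["Job"]["Status"]
        if status == "COMPLETE":
            return
        if status in ("ERROR", "CANCELED"):
            raise RuntimeError(f"MediaConvert job {job_id} ended as {status}")
        time.sleep(poll_secs)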
I would then... need another wait task, polling until I could find the file in the bucket's list of objects (possibly through prefect.tasks.aws.s3.S3List?), before I could proceed to do further things to this transcoded video?
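(A sketch of that polling idea, assuming Prefect 1.x's S3List task takes the bucket at init and a key prefix at run time; the timeout, prefix, and names are placeholders:)

import time
from prefect import task
from prefect.tasks.aws.s3 import S3List

@task
def wait_for_output(bucket: str, prefix: str, timeout_secs: int = 600) -> str:
    """Poll the target bucket until the transcoded object shows up."""
    lister = S3List(bucket=bucket)
    deadline = time.time() + timeout_secs
    while time.time() < deadline:
        keys = lister.run(prefix=prefix)
        if keys:
            return f"s3://{bucket}/{keys[0]}"
        time.sleep(10)
    raise TimeoutError(f"No object under s3://{bucket}/{prefix} after {timeout_secs}s")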
This conjunction sounds all too common not to have an integrated solution, unless I'm being dense (hah! no news there!) and not spotting an obvious solution. Any guidance?

Anna Geller
Manuel Gomes
11/24/2021, 11:54 AM

Manuel Gomes
11/24/2021, 12:01 PM

Anna Geller
import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def handler(event, context):
    logger.info("Received event: " + json.dumps(event))
    # pull the bucket and key out of the S3 put-event notification
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]
    s3_path = f"s3://{bucket}/{key}"
and this s3_path could be passed as a parameter in your create_flow_run mutation, as described in the blog post. It could be attached as:

inputs["parameters"] = dict(s3_path=s3_path)  # assuming s3_path is your Parameter name
Manuel Gomes
11/24/2021, 12:08 PM