    Kevin

    10 months ago
    Hi - I have a flow that retrieves a list of keys from S3. Each key represents a .zip file, an archive that, when unzipped, contains 5 different CSV files. I am trying to map over those keys and write each file within each archive to Azure Storage. I am having issues understanding how to handle the zipped object, which at times should be unmapped. The working code I have right now only writes out the data associated with the last file in the list:
    with Flow("s3-ingest-azure-load") as flow:
        s3_keys = ven_next_keys(prefix='prefix') # [list, of, keys]
        s3_obj = ven_next_dl.map(key=s3_keys, as_bytes=unmapped(True)) # [list, of, objs]
        zipped_object = convert_to_zip.map(s3_obj) #[objs, as, zip]
        files = create_list_of_files.map(zipped_object) # [[list],[of],[zipInfoObjects]]
        file_name = create_file_name.map(zip_file=flatten(files), upstream_tasks=[unmapped(zipped_object)]) # [list, of, filenames]
        file_data = extract_file_data.map(zip_archive=zipped_object, zip_file=flatten(files)) # [list, of, data] but currently only includes the data associated with the last file
        blob_name = azure_upload.map(data=file_data, blob_name=file_name, overwrite=unmapped(True))
    Kevin Kho

    10 months ago
    Hey @Kevin, do you mean the last file is mapped N times or the mapped task only runs 1 time?
    Kevin

    10 months ago
    The azure_upload.map() task only runs once...
    [2021-10-25 16:27:23+0000] INFO - prefect.TaskRunner | Task 'BlobStorageUploadOverwrite[0]': Finished task run for task with final state: 'Success'
    [2021-10-25 16:27:23+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    Kevin Kho

    10 months ago
    Are all of these maps the same length?
    Kevin

    10 months ago
    No... that's what I have had a tough time thinking through.
    I have mapped objects that represent the S3 objects (the zipped file - example.zip),
    and then I have mapped objects that represent the files within the zipped file (example1.csv, example2.csv, etc.),
    so it's a 1:many relationship.
    I was trying to make it dynamic enough so that I don't have to assume I will only download 1 S3 object/zip file per flow execution.
    I'm thinking maybe I should create two distinct flows: one for downloading the S3 object and the other for extracting the files out of that object.
    Kevin Kho

    10 months ago
    I’ll give this a shot in a bit
    Kevin

    10 months ago
    Sounds good
    Kevin Kho

    10 months ago
    I am a bit confused here:
    file_data = extract_file_data.map(zip_archive=zipped_object, zip_file=flatten(files))
    Are those lengths equal for the map?
    Kevin

    10 months ago
    No... zipped_object is a list of zip archives downloaded from S3, and files is a list of the files within a single one of those archives.
    So the length of zipped_object would equal the number of objects we download from S3,
    and the length of files would equal the number of files within one of those objects from S3.
    Kevin Kho

    10 months ago
    I think these need to be equal somehow. This is the line where things go wrong, right?
    Kevin

    10 months ago
    Yes.
    I guess I could try copying the zipped object so its count equals the number of files within it.
    Kevin Kho

    10 months ago
    Could you try an intermediate task to reshape the inputs to make them equal? I'm working on a flow with a similar structure. I just got kinda stuck here 😅
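    For reference, a minimal sketch of what such an intermediate reshape task could look like, assuming Prefect 1.x; the repeat_archives name is just for illustration. It repeats each archive once per member file so the result lines up pairwise with flatten(files).
    from prefect import task

    @task
    def repeat_archives(archives, files_per_archive):
        # archives:          [zip1, zip2, ...]
        # files_per_archive: [[ZipInfo, ZipInfo, ...], [ZipInfo, ...], ...]
        # Return one archive entry per member file, so the output has the
        # same length as flatten(files_per_archive).
        repeated = []
        for archive, members in zip(archives, files_per_archive):
            repeated.extend([archive] * len(members))
        return repeated
    Inside the flow it would slot in before the extract step, e.g. zip_per_file = repeat_archives(zipped_object, files) followed by extract_file_data.map(zip_archive=zip_per_file, zip_file=flatten(files)).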
    Kevin

    10 months ago
    Do you think that would be better than creating two distinct flows?
    One flow to download the S3 object and convert it to a ZipObject... the second flow would accept the ZipObject as a parameter.
    That way I could keep it unmapped as I work through the list of files within it.
    Kevin Kho

    10 months ago
    Yeah I think it can be done in one flow
    Kevin

    10 months ago
    I ended up just creating the equal mapping lengths - it worked!
    Funny workaround, but that's okay.
    Kevin Kho

    10 months ago
    I think when you map, the calls need to be equal length because each of those pairwise elements is submitted as a task, so you'll run into problems if there is no pair.
    Kevin

    10 months ago
    Hey @Kevin Kho - revisiting this thread as I've run into an issue. I have a flow that works when I run it locally but hits this error when I deploy and run it via Prefect Cloud on my Kubernetes cluster:
    Unexpected error: TypeError("can't pickle _thread.RLock objects")
    The task this fails on receives the downloaded bytes from S3 and uses them to initialize and return a ZipFile object.
    Having a really tough time troubleshooting/triaging this.
    from io import BytesIO
    from zipfile import ZipFile
    from prefect import task

    @task
    def convert_to_zip(s3_obj):
        zipped_file = ZipFile(BytesIO(s3_obj), 'r')
        return zipped_file
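    One possible workaround, sketched under the assumption that the downstream tasks can re-open the archive themselves: pass the raw bytes between tasks (cloudpickle handles bytes fine) and build the ZipFile inside each task that needs it, so no open file handle ever becomes a task result. The task names below mirror the ones in the flow above; this is only a sketch.
    from io import BytesIO
    from zipfile import ZipFile
    from prefect import task

    @task
    def convert_to_zip(s3_obj):
        # Return plain bytes; they serialize cleanly between task runs.
        return bytes(s3_obj)

    @task
    def create_list_of_files(zip_bytes):
        # Open the archive inside the task and close it before returning.
        with ZipFile(BytesIO(zip_bytes), 'r') as archive:
            return archive.infolist()

    @task
    def extract_file_data(zip_bytes, zip_file):
        with ZipFile(BytesIO(zip_bytes), 'r') as archive:
            return archive.read(zip_file.filename)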
    Kevin Kho

    10 months ago
    Can you try manually using cloudpickle on the ZipFile object?
    import cloudpickle
    cloudpickle.dumps(ZipFile)
    Or the s3_obj?
    Are you using Dask also?
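    As a side note, a quick local check along those lines might look like the sketch below, where example.zip is just a stand-in for the bytes ven_next_dl returns; pickling the instance (rather than the class) is what mirrors passing it between tasks.
    import cloudpickle
    from io import BytesIO
    from zipfile import ZipFile

    with open('example.zip', 'rb') as f:   # stand-in for the S3 download
        data = f.read()

    archive = ZipFile(BytesIO(data), 'r')
    cloudpickle.dumps(data)     # bytes serialize fine
    cloudpickle.dumps(archive)  # expected to raise TypeError, like the flow run does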
    Kevin

    10 months ago
    I'm just using KubernetesRun.
    And yeah - I can give that a shot.
    I have a feeling this is why you wanted me to try that:
    TypeError: cannot serialize '_io.BufferedReader' object
    Kevin Kho

    10 months ago
    Uhhh… I was expecting the same _thread.RLock lol. But either way, task inputs and outputs have to be serializable by cloudpickle. There is a workaround though: you can store your Flow as a script instead of a pickle and you won't need to serialize it.
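    A minimal sketch of that script-storage workaround, assuming Prefect 1.x with GitHub storage; the repo and path values are placeholders.
    from prefect.storage import GitHub
    from prefect.run_configs import KubernetesRun

    # Script-based storage: the flow file is re-imported at runtime instead of
    # being cloudpickled, so the Flow object itself never has to be serialized.
    flow.storage = GitHub(
        repo="my-org/my-flows",                # placeholder
        path="flows/s3_ingest_azure_load.py",  # placeholder path to this flow's script
    )
    flow.run_config = KubernetesRun()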