# ask-community
Kevin
Hi - I have a flow that retrieves a list of keys from S3. Each key represents a .zip archive that, when unzipped, contains 5 different CSV files. I am trying to map over those keys and write each file within the archive to Azure Storage. I am having issues understanding how to handle the zipped object, which at times should be unmapped. The working code I have right now only writes out the data associated with the last file in the list
Copy code
with Flow("s3-ingest-azure-load") as flow:
    s3_keys = ven_next_keys(prefix='prefix') # [list, of, keys]
    s3_obj = ven_next_dl.map(key=s3_keys, as_bytes=unmapped(True)) # [list, of, objs]
    zipped_object = convert_to_zip.map(s3_obj) #[objs, as, zip]
    files = create_list_of_files.map(zipped_object) # [[list],[of],[zipInfoObjects]]
    file_name = create_file_name.map(zip_file=flatten(files), upstream_tasks=[unmapped(zipped_object)]) # [list, of, filenames]
    file_data = extract_file_data.map(zip_archive=zipped_object, zip_file=flatten(files)) # [list, of, data] but currently only includes the data associated with the last file
    blob_name = azure_upload.map(data=file_data, blob_name=file_name, overwrite=unmapped(True))
Kevin Kho
Hey @Kevin, do you mean the last file is mapped N times or the mapped task only runs 1 time?
Kevin
The azure_upload.map() task only runs once...
Copy code
[2021-10-25 16:27:23+0000] INFO - prefect.TaskRunner | Task 'BlobStorageUploadOverwrite[0]': Finished task run for task with final state: 'Success'
[2021-10-25 16:27:23+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Kevin Kho
Are all of these maps the same length?
Kevin
no... that's what i have had a tough time thinking through
i have mapped objects that represent the s3 objects (the zipped file - example.zip).
and then i have mapped objects that represent the files within the zipped file (example1.csv, example2.csv, etc.)
so it's a 1:many
i was trying to make it dynamic enough so that i don't have to assume that i will only download 1 s3 object/zip file per flow execution
i'm thinking maybe i should create two distinct flows. one for downloading the s3 object and the other for extracting the files out of that object
Kevin Kho
I’ll give this a shot in a bit
Kevin
Sounds good
Kevin Kho
I am a bit confused here:
file_data = extract_file_data.map(zip_archive=zipped_object, zip_file=flatten(files))
Are those lengths equal for the map?
Kevin
no... zipped_object = a list of zip archives downloaded from s3. and files is a list of files within a single one of those archives
so the length of zipped_object would equal the number of objects we download from s3
and the length of files would be equal to the number of files within one of those objects from s3
Kevin Kho
I think these need to be equal somehow. This is the line where things go wrong right?
Kevin
yes
i guess i could try copying the zipped object so the list is equal in length to the number of files within it
Kevin Kho
Could you try an intermediate task to reshape the inputs to make them equal? I’m working on a flow with similar structure. I just got kinda stuck here 😅
Kevin
do you think that would be better than creating two distinct flows?
one flow to download the s3 object and convert it to a ZipObject... the second flow would accept the ZipObject as a parameter
that way i could keep it unmapped as i work through the list of files within it
Kevin Kho
Yeah I think it can be done in one flow
Kevin
i ended up just creating equal mapping lengths - it worked!
funny workaround but that's okay
Kevin Kho
I think when you map, the calls need to be equal length because each of those pairwise elements is submitted as a task so you’ll run into problems if there is no pair.
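A minimal sketch of that reshape idea (assuming Prefect 1.x; the tasks below are hypothetical stand-ins, not the actual flow's tasks): repeat each archive once per member file so the flattened lists are equal length and line up pairwise.
Copy code
from prefect import Flow, task, flatten

# hypothetical stand-in tasks, just to show the shapes involved
@task
def list_archives():
    return ["archive_a.zip", "archive_b.zip"]            # one element per zip downloaded from s3

@task
def list_files(archive):
    return [f"{archive}/file{i}.csv" for i in range(5)]  # files inside a single archive

@task
def repeat_archive(archive, files):
    # pair the archive with each of its member files so the flattened
    # lists are equal length and line up pairwise in the next map
    return [archive] * len(files)

@task
def extract(archive, file):
    return f"data for {file} from {archive}"

with Flow("reshape-sketch") as flow:
    archives = list_archives()
    files = list_files.map(archives)                                   # [[5 files], [5 files]]
    repeated = repeat_archive.map(archive=archives, files=files)       # [[archive x5], [archive x5]]
    data = extract.map(archive=flatten(repeated), file=flatten(files)) # 10 pairwise mapped runs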
Kevin
Hey @Kevin Kho - revisiting this thread as I've run into an issue. I have a flow that works when I run it locally but runs into this error when I deploy and run it via Prefect Cloud on my Kubernetes cluster:
Copy code
Unexpected error: TypeError("can't pickle _thread.RLock objects")
the task this fails on receives the downloaded bytes from S3 and uses them to initialize and return a ZipFile object
having a really tough time troubleshooting/triaging
Copy code
from io import BytesIO
from zipfile import ZipFile
from prefect import task

@task
def convert_to_zip(s3_obj):
    zipped_file = ZipFile(BytesIO(s3_obj), 'r')
    return zipped_file
Kevin Kho
can you try manually using cloudpickle on the ZipFile object?
Copy code
import cloudpickle
cloudpickle.dumps(ZipFile)
Or the s3_obj
Are you using Dask also?
Kevin
i'm just using KubernetesRun
and yea - i can give that a shot
I have a feeling this is why you wanted me to try that:
Copy code
TypeError: cannot serialize '_io.BufferedReader' object
Kevin Kho
Uhhh… I was expecting the same _thread.RLock lol. But either way, task inputs and outputs have to be serializable by cloudpickle. There is a workaround though. You can store your Flow as a script instead of pickle and you won't need to serialize it.
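For reference, a minimal sketch of what script-based storage could look like (assuming Prefect 1.x and S3 storage; the bucket, key, and paths are placeholders, and the task shown is a stand-in for the real flow):
Copy code
from prefect import Flow, task
from prefect.storage import S3
from prefect.run_configs import KubernetesRun

@task
def say_hello():
    print("hello")  # stand-in task

with Flow("s3-ingest-azure-load") as flow:
    say_hello()

# store the flow as a script: the .py file is uploaded and re-imported at
# runtime, so the Flow object itself is never pickled
flow.storage = S3(
    bucket="my-flow-bucket",                            # placeholder
    key="flows/s3_ingest_azure_load.py",                # placeholder
    stored_as_script=True,
    local_script_path="flows/s3_ingest_azure_load.py",  # placeholder
)
flow.run_config = KubernetesRun()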