Kevin
10/25/2021, 4:20 PM
Kevin
10/25/2021, 4:20 PM
with Flow("s3-ingest-azure-load") as flow:
    s3_keys = ven_next_keys(prefix='prefix')  # [list, of, keys]
    s3_obj = ven_next_dl.map(key=s3_keys, as_bytes=unmapped(True))  # [list, of, objs]
    zipped_object = convert_to_zip.map(s3_obj)  # [objs, as, zip]
    files = create_list_of_files.map(zipped_object)  # [[list],[of],[zipInfoObjects]]
    file_name = create_file_name.map(zip_file=flatten(files), upstream_tasks=[unmapped(zipped_object)])  # [list, of, filenames]
    file_data = extract_file_data.map(zip_archive=zipped_object, zip_file=flatten(files))  # [list, of, data] but currently only includes the data associated with the last file
    blob_name = azure_upload.map(data=file_data, blob_name=file_name, overwrite=unmapped(True))
Kevin Kho
Kevin
10/25/2021, 4:27 PM
Kevin
10/25/2021, 4:27 PM
[2021-10-25 16:27:23+0000] INFO - prefect.TaskRunner | Task 'BlobStorageUploadOverwrite[0]': Finished task run for task with final state: 'Success'
[2021-10-25 16:27:23+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Kevin
10/25/2021, 4:31 PM
Kevin Kho
Kevin
10/25/2021, 4:33 PM
Kevin
10/25/2021, 4:33 PM
Kevin
10/25/2021, 4:34 PM
Kevin
10/25/2021, 4:34 PM
Kevin
10/25/2021, 4:34 PM
Kevin
10/25/2021, 4:37 PM
Kevin Kho
Kevin
10/25/2021, 4:44 PM
Kevin Kho
file_data = extract_file_data.map(zip_archive=zipped_object, zip_file=flatten(files))
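A minimal plain-Python sketch of why this line can mispair archives and members, assuming mapped arguments are paired by position the way Python's zip() pairs them. It does not use Prefect, and the names make_zip, extract, and pairs are hypothetical helpers that do not appear in the thread. The idea is to carry each member name together with the bytes of the archive it came from, so a single mapped argument holds everything extract needs.

```python
from io import BytesIO
from zipfile import ZipFile


def make_zip(files):
    """Build an in-memory zip archive from a {name: bytes} dict."""
    buf = BytesIO()
    with ZipFile(buf, "w") as zf:
        for name, data in files.items():
            zf.writestr(name, data)
    return buf.getvalue()


# Two archives with two members each (stand-ins for the S3 objects).
archives = [
    make_zip({"a1.txt": b"A1", "a2.txt": b"A2"}),
    make_zip({"b1.txt": b"B1", "b2.txt": b"B2"}),
]

# Per-archive member lists, like the nested output of create_list_of_files.
members = [ZipFile(BytesIO(a)).namelist() for a in archives]
flat_members = [name for names in members for name in names]

# Position-wise pairing: 2 archives against 4 flattened member names.
# The second archive ends up next to "a2.txt", which it does not contain.
positional = list(zip(archives, flat_members))

# Explicit pairing: every member name travels with its own archive.
pairs = [(a, name) for a, names in zip(archives, members) for name in names]


def extract(pair):
    """Read one member from the archive it was paired with."""
    archive_bytes, name = pair
    with ZipFile(BytesIO(archive_bytes)) as zf:
        return zf.read(name)


data = [extract(p) for p in pairs]
```

In a flow, a task returning such pairs per archive could be flattened and mapped as one argument; whether that fits this particular flow is an assumption.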
Kevin Kho
Kevin
10/25/2021, 4:57 PM
Kevin
10/25/2021, 4:58 PM
Kevin
10/25/2021, 4:58 PM
Kevin Kho
Kevin
10/25/2021, 4:59 PM
Kevin
10/25/2021, 4:59 PM
Kevin Kho
Kevin
10/25/2021, 4:59 PM
Kevin
10/25/2021, 5:00 PM
Kevin
10/25/2021, 5:00 PM
Kevin Kho
Kevin
10/25/2021, 5:30 PM
Kevin
10/25/2021, 5:31 PM
Kevin Kho
Kevin
11/09/2021, 3:24 PM
Unexpected error: TypeError("can't pickle _thread.RLock objects")
Kevin
11/09/2021, 3:25 PM
Kevin
11/09/2021, 3:25 PM
Kevin
11/09/2021, 3:27 PM
@task
def convert_to_zip(s3_obj):
    zipped_file = ZipFile(BytesIO(s3_obj), 'r')
    return zipped_file
Kevin Kho
cloudpickle on the ZipFile object?
import cloudpickle
cloudpickle.dumps(ZipFile)
Kevin Kho
Kevin Kho
Kevin
11/09/2021, 3:29 PM
Kevin
11/09/2021, 3:30 PM
Kevin
11/09/2021, 3:47 PM
TypeError: cannot serialize '_io.BufferedReader' object
Kevin Kho
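A minimal sketch of one way around the two pickling errors above, not taken from the thread. It assumes the failure comes from returning an open ZipFile from a task, and it uses the standard-library pickle instead of cloudpickle on the assumption that both stumble over the same internal lock. The helpers make_zip_bytes and read_members are hypothetical names. The idea is to pass raw bytes between tasks and open the archive only inside the function that reads it.

```python
import pickle
from io import BytesIO
from zipfile import ZipFile


def make_zip_bytes():
    """Build a small in-memory archive (stand-in for a downloaded S3 object)."""
    buf = BytesIO()
    with ZipFile(buf, "w") as zf:
        zf.writestr("hello.txt", b"hello")
    return buf.getvalue()


def read_members(archive_bytes):
    """Open the archive locally and return plain, picklable data."""
    with ZipFile(BytesIO(archive_bytes)) as zf:
        return {name: zf.read(name) for name in zf.namelist()}


raw = make_zip_bytes()

# An open ZipFile holds a thread lock, so pickling the instance fails.
try:
    pickle.dumps(ZipFile(BytesIO(raw), "r"))
    zipfile_pickles = True
except (TypeError, pickle.PicklingError):
    zipfile_pickles = False

# Bytes and dicts of bytes round-trip through pickle without trouble.
restored = pickle.loads(pickle.dumps(read_members(raw)))
```

Note that cloudpickle.dumps(ZipFile) serializes the class itself, not an instance, so reproducing the error needs an actual ZipFile object as in the try block here.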