Is it possible for the S3Upload feature to return ...
# ask-community
c
Is it possible for the S3Upload feature to return an s3 path to an unpickled file?
k
Hi @Charles Liu! Doesn’t the Task return the key already?
c
@Kevin Kho Hi! let me provide a little more context: we're using a "Stage" task step because we're feeding an s3_path into an existing pipeline to trigger downstream tasks.
At the moment, our Stage task serves to generate the s3 path: it takes the dataframe generated in our Transform task step, writes it to csv, and tosses it over to a bucket that I set up.
I was looking for a Prefect feature that would do this for me, but it seems that S3Upload only uploads/downloads stringified data and S3Result uses a pickled file (which would require further changes to our pipeline); either one would clash with a downstream pipeline expecting a standard delimited csv. Am I understanding these features correctly?
k
I see yes it looks like your understanding is right. Is the issue that you don’t know the file path you’ll save it in ahead of time, or do you know that?
Yes, the standard way to do this at the moment is to save to a location and pass the saved location to downstream tasks to ingest from.
c
So I'm actually able to assemble the path, because I know where it needs to go, and since we built our own file creation task, we have a spare bucket for that as well. In that instance I run into the issue where the file is pickled, and that is beyond the scope I was provided (we can add pickle handling later, if that's how it has to go).
k
Is your dataframe dask or pandas?
c
Just pandas! At the moment our own Stage task writes a normal pandas dataframe to an uncompressed/unpickled csv.
It writes it to S3 and returns the URI
k
Got it. Just a suggestion, but I think you may have an easier experience using dask.dataframe.read_csv combined with s3fs to load/save on top of S3.
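Roughly something like this (untested sketch; the bucket and file paths are just placeholders):

```python
import dask.dataframe as dd

# With s3fs installed, Dask understands s3:// paths directly
df = dd.read_csv("s3://my-example-bucket/raw/data.csv")

# ... transformations ...

# single_file=True writes one CSV instead of one file per partition
df.to_csv("s3://my-example-bucket/staged/data.csv", single_file=True, index=False)
```

Credentials can go through the storage_options argument if they aren't picked up from the environment.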
c
Presumably that requires switching over to dask as our dataframe handler correct?
k
Yes, but Dask uses Pandas under the hood. You can convert a Dask dataframe to Pandas (and vice versa) with no problem.
I assume it’s not that big? Since it still fits into Pandas
df.compute() converts a Dask DataFrame to Pandas.
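For example (quick sketch):

```python
import pandas as pd
import dask.dataframe as dd

pandas_df = pd.DataFrame({"a": [1, 2, 3]})

# Pandas -> Dask: split into partitions for parallel work
dask_df = dd.from_pandas(pandas_df, npartitions=2)

# Dask -> Pandas: compute() collects it back into one in-memory frame
round_tripped = dask_df.compute()
```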
🙌 1
c
Yeah not super huge (yet), and moving forward we may have to move from pandas to dask like you said re: size
all compatibility considered*
k
Are you using CSVs or Parquet?
c
Current downstream task expects an s3_path to a regular csv, nothing special.
k
Actually, pandas supports s3fs, so you can do this directly: https://stackoverflow.com/a/43838676
I guess no need for Dask yet 👍
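i.e. something along these lines (untested sketch; bucket and key are placeholders, and s3fs needs to be installed):

```python
import pandas as pd

# pandas hands s3:// paths off to s3fs under the hood
df = pd.read_csv("s3://my-example-bucket/raw/data.csv")

# ... transformations ...

s3_path = "s3://my-example-bucket/staged/data.csv"
df.to_csv(s3_path, index=False)

# s3_path now points to a plain CSV the downstream pipeline can consume as-is
```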
e
I don't know if it helps your use case, but here's a prefect feature I just discovered:
from prefect.engine.results import LocalResult
from prefect.engine.serializers import PandasSerializer

# Write task results to ./qparse_results as CSV instead of the default pickle
pandasresult = LocalResult(
    dir="./qparse_results",
    location="{flow_id}_{task_full_name}_{flow_run_name}.csv",
    serializer=PandasSerializer(file_type="csv"))
So it turns out result objects can specify custom serializers, and a pandas->csv serializer is supported out of the box. Although I didn't use it with an S3Result, it has been a real treat while developing. https://github.com/PrefectHQ/prefect/blob/e3b43402ac5e43c7ad4297ef36ef800360c7391b/src/prefect/engine/serializers.py#L153
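For S3 I'd guess the equivalent looks something like this (untested on my end; the bucket name is a placeholder):

```python
from prefect.engine.results import S3Result
from prefect.engine.serializers import PandasSerializer

# Same idea as the LocalResult above, but the task's returned dataframe
# lands in an S3 bucket as a CSV instead of the default pickled blob
csv_result = S3Result(
    bucket="my-example-bucket",
    location="{flow_id}_{task_full_name}_{flow_run_name}.csv",
    serializer=PandasSerializer(file_type="csv"),
)
```

and then you attach it to a task with @task(result=csv_result).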
👍 1
🙌 1
k
Nice suggestion @emre!
e
Thanks! Admittedly, I don't know how easy or hard it is to pass result locations to downstream tasks
c
Surprisingly easily! In a with Flow(): block, you can just move things around in variables local to the flow itself and pass them into the next task or tasks.
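Roughly like this (illustrative sketch; the task names are made up):

```python
from prefect import Flow, task


@task
def transform():
    # ... build the pandas dataframe ...
    ...


@task
def stage(df):
    # ... write df to S3 as a CSV and return the s3 path ...
    ...


@task
def trigger_downstream(s3_path):
    # ... hand the path to the existing pipeline ...
    ...


with Flow("stage-and-trigger") as flow:
    df = transform()
    s3_path = stage(df)          # stage returns the s3 path
    trigger_downstream(s3_path)  # which feeds straight into the next task
```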
@emre thanks for this huge find I'm going to take a look!
👀 1