I’m wondering if and how prefect could be used to ...
# prefect-community
m
I’m wondering if and how Prefect could be used to transfer large amounts of data between different servers/clouds. Basically, I’d need to move 10-100 gigabyte CSV/JSON files from an SFTP server to ADLS, and later on between other sources and sinks. Moving this amount of data as one gigantic in-memory string between tasks does not seem like a very sound approach, for many reasons. So how would you actually do that?
Moreover, I probably need to transform the CSV files to Parquet (or a similar format) before storing them in ADLS.
j
Hi Matias, you’re right, moving that much data in memory is always challenging. Especially if you need to convert to parquet, it sounds like this could be a good fit for a Spark job or Dask, which can both work with partitioned data. It’s been a while since I had to load anything from SFTP so I confess I don’t remember if it’s possible to read that remote data in chunks (I suspect not), but at least a cluster would give you adequate RAM. In that case, you could use Prefect to schedule, launch, monitor, and clean up the Spark job.
If you can access the remote data in chunks, then you could run Dask on a relatively small cluster (as in, less RAM than the amount of data), as it excels at out-of-core data manipulation.
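To make that a bit more concrete, here is a rough, untested sketch of the Dask approach. It assumes fsspec's `sftp://` backend (via `paramiko`) can do ranged reads against your server, and that `adlfs` and `pyarrow` are installed for the ADLS/Parquet side; all hosts, paths, and credentials are placeholders.
```python
# Sketch: read the remote CSVs in partitions with Dask and write Parquet to ADLS,
# with Prefect orchestrating the job. Assumes dask[distributed], paramiko (for
# fsspec's sftp:// protocol), adlfs (for abfs://), and pyarrow are installed.
import dask.dataframe as dd
from dask.distributed import Client
from prefect import Flow, task


@task
def sftp_csv_to_adls_parquet():
    # Connect to an existing Dask cluster so no single worker needs to hold
    # the whole dataset in memory.
    client = Client("tcp://dask-scheduler:8786")

    df = dd.read_csv(
        "sftp://sftp.example.com/exports/big_dataset_*.csv",
        blocksize="256MB",  # read in ~256 MB partitions
        storage_options={"username": "user", "password": "***"},
    )

    # Write partitioned Parquet directly to ADLS Gen2 via adlfs.
    df.to_parquet(
        "abfs://my-container/big_dataset/",
        storage_options={"account_name": "myaccount", "account_key": "***"},
    )
    client.close()


with Flow("sftp-to-adls") as flow:
    sftp_csv_to_adls_parquet()

flow.run()
```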
j
Hi @Matias, just to add to what Jeremiah mentioned, we've had a good experience with Embulk, an open-source tool for transferring large data sets: https://github.com/embulk/embulk From their GitHub page: "Embulk is a parallel bulk data loader that helps data transfer between various storages, databases, NoSQL and cloud services. Embulk supports plugins to add functions. You can share the plugins to keep your custom scripts readable, maintainable, and reusable."
It's a command line tool that's easy to configure and uses a plugin model to support lots of input and output systems, including SFTP. I'm not very familiar with Azure, but it looks like there's an Embulk output plugin for Azure Blob Storage: https://github.com/embulk/embulk-output-azure_blob_storage
Embulk is a JVM-based tool, so you'll need a Java JRE/JDK installed in whatever environment/Docker image you use with Prefect. Here's a snippet from one of our Dockerfiles that installs Embulk and its plugins:
RUN apt-get update \
  && apt-get install -yq --no-install-recommends openjdk-8-jre
RUN curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar" \
  && chmod +x ~/.embulk/bin/embulk \
  && echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc \
  && ~/.embulk/bin/embulk gem install embulk-input-sftp \
  && ~/.embulk/bin/embulk gem install embulk-input-mysql \
  && ~/.embulk/bin/embulk gem install embulk-input-redshift \
  && ~/.embulk/bin/embulk gem install embulk-output-mysql \
  && ~/.embulk/bin/embulk gem install embulk-output-redshift
We're planning to eventually create a Prefect task for Embulk, but we haven't gotten around to it yet. (Once we do, we'll be happy to contribute it to the library of Prefect tasks.) For now, you can use ShellTask to call the Embulk command line from a Prefect task.
And lastly, we see Prefect as very well suited to orchestrating large data transfers and all other aspects of data pipelines. We are long-time users of Airflow (almost 4 years) and are in the process of adopting Prefect, especially for data science workflows that manipulate very large data sets. (For some tasks, like bulk data transfers, you might want to run them on a Dask worker with substantial resources, i.e. lots of RAM and CPU cores, but Prefect plus dask-kubernetes makes that possible with task tagging and Dask worker resources.)
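As a sketch of that last point, assuming Prefect 1.x's convention of expressing Dask worker resources as task tags of the form `dask-resource:KEY=NUM`, and a cluster whose workers were started with matching `--resources`:
```python
# Sketch: pin a heavy bulk-transfer task to Dask workers that advertise enough
# memory. Assumes the workers were started with something like
#   dask-worker ... --resources "MEMORY=64e9"
# and that the flow runs on a DaskExecutor pointed at that cluster.
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor


@task(tags=["dask-resource:MEMORY=64e9"])
def bulk_transfer():
    # ... run the Embulk / Dask / Spark work here ...
    pass


with Flow("bulk-transfer") as flow:
    bulk_transfer()

flow.run(executor=DaskExecutor(address="tcp://dask-scheduler:8786"))
```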
m
That’s interesting, thank you. Would you somehow wire Embulk into Prefect to make them work together?
j
We are planning to have Prefect tasks that execute an Embulk command, e.g. transferring a file from SFTP to Amazon Redshift.
One nice thing about Embulk is that you can easily test it by running it from the command line interactively, get that working, and then take the YAML file you created during command-line testing and have Embulk use it in your production Flow, e.g. by using Prefect's ShellTask to execute something like
embulk run sftp_to_redshift_dataset1.yaml
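A minimal sketch of that wiring, assuming Prefect 1.x's ShellTask and the YAML file above, with the Embulk binary available on the PATH of the flow's execution environment:
```python
# Sketch: wrap the Embulk CLI in Prefect's ShellTask so the transfer becomes an
# ordinary task in a Flow. Assumes embulk and the YAML config are present in
# the environment the flow runs in.
from prefect import Flow
from prefect.tasks.shell import ShellTask

embulk = ShellTask(name="embulk-sftp-to-redshift")

with Flow("embulk-transfer") as flow:
    embulk(command="embulk run sftp_to_redshift_dataset1.yaml")

flow.run()
```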
j
Thanks for the comments @Joe Schmid, very helpful — I wasn’t familiar with Embulk
a
Thanks @Joe Schmid. We've been using Singer taps and targets for a similar workflow: running from the command line to get the connection right and then automating it with Prefect.
Singer is supposed to stream data from source to target, but I haven't tested it with a lot of data yet.
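For reference, the same ShellTask pattern works for a Singer pipe; the tap/target names and config files below are just placeholders, and both executables are assumed to be installed in the flow's environment (ShellTask's default bash shell handles the pipe).
```python
# Sketch: run a Singer tap piped into a target from a Prefect flow.
from prefect import Flow
from prefect.tasks.shell import ShellTask

sync = ShellTask(name="singer-sync")

with Flow("singer-elt") as flow:
    sync(
        command=(
            "tap-postgres --config tap_config.json "
            "| target-redshift --config target_config.json"
        )
    )

flow.run()
```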
j
Thanks @Adam Roderick I'd be curious to hear what your early experience with Singer has been like and what you'll use it for. I like that Singer is python-based and conceptually it looks a lot like Embulk, i.e. plugins/taps, etc. (Feel free to DM if you prefer.)
a
@Joe Schmid It's been great for a few reasons. I wanted flexibility and a community that contributes to plugins. We've been able to do local development in Postgres and production development in Redshift with just configuration changes; the consistency of the tap + target pattern is very nice. We've also found writing taps to be pretty straightforward and have written one and forked a couple of others. The streaming aspect is nice, although it looks like it depends on the implementation. For example, the Redshift target batches because of the nature of how Redshift loads data from S3. Not quite streaming, but the batch configuration settings get you some of the benefits. We create data platforms for our customers and wanted a repeatable way to do the ELT pattern, and Singer does extract and load quite nicely.
s
Another vote here for Singer
j
Thanks @Adam Roderick & @Sergi de Pablos for the comments on Singer -- much appreciated.