Max Lei

05/31/2020, 11:08 PM
Hi all, is there a recommended approach for objects that cannot be pickled? For example, in a machine learning pipeline you need to do some ETL, and some of the steps require tools whose objects cannot be pickled (SFrames, Spark, etc.), so I imagine I might put all of those items in one step. I could then pipe the results to a CSV file, and in the next step train a model from the CSV file's location. But I may want one more step that computes metrics and sends them to MLflow, and it's possible the model itself cannot be pickled easily. Is Prefect the wrong tool for this job? I would also like a cron job and a UI to look at the results. It seems I may need to choose between engineering the problem around Prefect and a single script with Linux crontab. Any suggestions?

Jeremiah

06/01/2020, 12:10 AM
That’s a good question @Max Lei, and all of your approaches could be viable. If your steps all require passing around an object that can’t easily be checkpointed (by pickling or otherwise), then your “unit of work” may in fact be a single, large task - that’s totally fine in Prefect. Many of our users take advantage of Prefect’s scheduler and UI history while kicking off a single task that represents an external script. If instead you can find a way to serialize / deserialize those objects between tasks, and that better reflects your units of work, then you could (for example) pass the parameters needed to create the unpicklable object from the upstream task, then use those parameters as the inputs to the next task, which recreates the object. You could also use the new mechanism to transparently write outputs to a filesystem. At the end of the day, tasks must be able to run independently (in the worst-case event of a node failure, for example), so they need to be able to serialize or recreate their inputs/environment. We recommend making your tasks as small as possible for greater observability, but not so small that you need to re-engineer your work.
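A minimal plain-Python sketch of the “pass parameters, recreate downstream” idea (all names here are hypothetical stand-ins; no Prefect API is used, since the pattern is the same regardless of the orchestrator):

```python
# Instead of passing an unpicklable object between tasks,
# pass the small, picklable parameters needed to rebuild it.

class SparkLikeSession:
    """Hypothetical stand-in for an unpicklable object (e.g. a Spark session)."""
    def __init__(self, app_name: str, master: str):
        self.app_name = app_name
        self.master = master

def upstream_task() -> dict:
    # Return only picklable parameters, never the heavy object itself.
    return {"app_name": "etl-job", "master": "local[2]"}

def downstream_task(params: dict) -> str:
    # Recreate the heavy object from its parameters inside this task.
    session = SparkLikeSession(**params)
    return f"{session.app_name}@{session.master}"

params = upstream_task()
print(downstream_task(params))  # etl-job@local[2]
```

Because only the dict crosses the task boundary, each task can be retried or re-run on another node and still reconstruct its environment.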

Max Lei

06/01/2020, 5:27 AM
@Jeremiah Ok, understood, thanks for your help! Is there still a use for ResultHandler if one can use Spark? Is it possible to stream reads and writes through a ResultHandler, so that one can store larger-than-memory objects like CSVs?

Avi A

06/01/2020, 6:43 AM
Regarding the non-picklable objects, that’s actually the way I work: I put the data in a partition on GCS and read it in the subsequent tasks. There’s a related issue on GitHub; you might want to add your thoughts there. Regarding the streaming, you could probably use the same solution, but maybe @Jeremiah has a better idea.
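A sketch of that write-partition / read-downstream pattern, using a local temp directory as a stand-in for GCS (the function names and the `part-0000.csv` filename are hypothetical):

```python
import csv
import tempfile
from pathlib import Path

def write_partition(rows: list, base_dir: Path) -> str:
    """Upstream task: persist the data, return only the (picklable) path."""
    out = base_dir / "part-0000.csv"
    with out.open("w", newline="") as f:
        csv.writer(f).writerows(rows)
    return str(out)

def read_partition(path: str) -> list:
    """Downstream task: re-read the data from the path it was handed."""
    with open(path, newline="") as f:
        return [row for row in csv.reader(f)]

base = Path(tempfile.mkdtemp())
path = write_partition([["a", "1"], ["b", "2"]], base)
print(read_partition(path))  # [['a', '1'], ['b', '2']]
```

With real GCS the path would be a `gs://bucket/partition/...` URI and the reads/writes would go through a client library, but the task boundary still only carries a string.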

Steve Taylor

06/01/2020, 12:37 PM
At the place I'm at right now, we're slightly, um, behind in infrastructure... so we do this out of necessity. We have a library of custom-made Tasks, to support a loose polling mechanism. I poll every N minutes for a parquet file (lose the CSV!). If it exists, carry on, else skip it all and end. Imagine a chain of these where one process posts a file, and subsequent decoupled flows poll for anything new. It's a little -- a lot -- heavy, and the paradigm using ifelse is being reviewed by some really smart people. e.g., @Jim Crist-Harif In BPMN the token would just end, but in Prefect, the non-making ifelse branch will just skip. Makes for heavy logs, but does in fact do the trick. We've played with using queuing -- for whatever reason we don't have k8s, but do have every-known-queuing-solution-ever-made. <shrug/> But in the end we have to poll/subscribe to that... so we've continued to refine the polling-parquet-post-parquet solution where we can't pickle. Aside, I imagine someone might look at that and say -- have you heard of NiFi? I'm told that's coming someday... but even so, we'd still have to poll for events and task triggers. Finally, re crontab. Yup. We use either a Jenkins to schedule flows directly -- it might be an anti-pattern, but it's extremely visible if things go wrong -- or crontab. And Ansible. Jenkins to update crontab using Ansible.