
Nicholas Crews

05/01/2023, 4:02 PM
Hi all! Thanks for the help. I’m coming from using pydoit and make, and looking for more of a lightweight, single-machine build system with caching than a full, cloud-based task orchestrator. I’m trying to determine if Prefect would be a good fit. [continued in thread]
There are three features that I would really like:
1. Auto task dependency solving via named assets, like Dagster has. For example, I annotate that task X produces an asset (e.g. a dataframe, an ML model, some JSON config, etc.) named “customers”, and I say that task Y requires as input some asset named “customers”. The orchestrator can link these two tasks up.
2. Custom cache save/load. As I understand it, Prefect uses pickle to serialize task results. I frequently use ibis tables, and pickling doesn’t make sense for those; I would like to be able to add custom save/load hooks. How easy would it be to extend/hack Prefect to support this? Could that become a built-in feature?
3. Custom cache up-to-date checks. Asset B depends on asset A, and I only want to recompute asset B if it is older than asset A. I can’t think of how to express this in terms of Prefect’s string-based exact-hash-key model. Any suggestions here?
I really appreciate the help! Thank you!

Nate

05/01/2023, 6:50 PM
hi @Nicholas Crews - glad you're checking out prefect! 1. in prefect, you can create a data dependency between 2 tasks by passing the result of task_x into task_y as an argument, something like
import pandas as pd
from prefect import flow, task

@task
def get_customers() -> pd.DataFrame:
   df = pd.DataFrame({"customer_id": [1, 2, 3]})  # build df or something
   return df

@task
def transform_customers(df: pd.DataFrame) -> pd.DataFrame:
   return df  # transform df or something

@flow
def customer_etl():
   customers = get_customers()

   transform_customers(customers)
where the prefect engine will now know that the result from get_customers is an upstream dependency of transform_customers
2. depending on your use-case, you may be interested in creating your own Serializer. you can check this implementation as a reference, although I'm not sure if that would cover your desire to "add custom save/load hooks" - could you explain more what you mean by that?
3. hmm, the first thing that comes to mind is to use a hash of A's result as the cache key for B - would something like that work? you could write your own cache key fn
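on 2, a custom serializer in prefect 2 is just a subclass of Serializer with bytes-in/bytes-out dumps/loads - here's a rough sketch, not a definitive implementation (ParquetSerializer is a made-up name, and it assumes pandas + pyarrow are installed):

import io
from typing import Any, Literal

import pandas as pd
from prefect.serializers import Serializer

class ParquetSerializer(Serializer):
    # dispatch key prefect uses to identify this serializer type
    type: Literal["parquet"] = "parquet"

    def dumps(self, obj: Any) -> bytes:
        # write the dataframe to an in-memory parquet blob
        buf = io.BytesIO()
        obj.to_parquet(buf)
        return buf.getvalue()

    def loads(self, blob: bytes) -> Any:
        # rebuild the dataframe from the stored bytes
        return pd.read_parquet(io.BytesIO(blob))

you'd then opt a task into it with something like @task(persist_result=True, result_serializer=ParquetSerializer())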
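and on 3, the cache_key_fn hook receives the run context plus the call's parameters and returns a string key - e.g. the built-in task_input_hash, or your own fingerprint (a sketch; the repr-based hash below is just a placeholder, swap in whatever is cheap for your data):

import hashlib

from prefect import task
from prefect.context import TaskRunContext
from prefect.tasks import task_input_hash

# built-in: cache key is a stable hash of the task's inputs
@task(cache_key_fn=task_input_hash)
def transform_customers(df):
    ...

# or roll your own - same signature, any string you like as the key
def my_cache_key(context: TaskRunContext, parameters: dict) -> str:
    # placeholder fingerprint over the call's parameters
    return hashlib.sha256(repr(sorted(parameters.items())).encode()).hexdigest()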

Nicholas Crews

05/02/2023, 3:35 AM
1. I was hoping to do this declaratively, outside of a flow, the way Dagster does. Sounds like that's not built in, but I could maybe hack something together.
2. ser/deser looks almost right, but does not quite cover what I need, because I also need control over the actual “writing to disk” part. The stored data won’t end up in the prefect metadata database. I need something like
import ibis
from prefect.runtime import task_run

def save(table: ibis.expr.types.Table):
    # persist the table into duckdb under the task run's name
    conn = ibis.connect("mydb.duckdb")
    conn.create_table(task_run.name, table)
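and the matching load hook would be something like (same sketch; same hypothetical duckdb file, with the table name being whatever save() used):

def load(table_name: str) -> ibis.expr.types.Table:
    # read a previously saved table back out of duckdb
    conn = ibis.connect("mydb.duckdb")
    return conn.table(table_name)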
3. Hmmm, I don’t think so, but maybe I don’t quite know all the options available to me. The ibis Tables that I am passing around a) have no builtin hash method, so it’s not easy, and b) represent gigabytes of data, so I’d love to (need to?) avoid actually hashing the whole thing. I would rather have the staleness determined by “last modified time”, separate metadata from the actual contents. And comparing hashes by equality doesn’t work, because as long as asset B was written anytime after asset A, we don’t need to recompute B.
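To make the up-to-date rule concrete, the make-style check I’m after is just this (a sketch; the paths are hypothetical stand-ins for wherever the asset metadata lives):

import os

def b_is_stale(a_path: str, b_path: str) -> bool:
    # rebuild B whenever A was modified more recently than B,
    # or B doesn't exist yet - no content hashing required
    if not os.path.exists(b_path):
        return True
    return os.path.getmtime(a_path) > os.path.getmtime(b_path)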