
Bryan Whiting

02/03/2023, 5:42 PM
I have a three-step workflow: it takes me 10 min to build an xgboost model, then I run simulations that also take 10 min, then I make plots. Each step saves out artifacts, which load into the next step. I’m currently using click and doing some logic to manage the “start from X” step. What’s the “prefect” way of thinking about “okay, I’ve already run the xgboost step today, just run from simulations on”? What’s the prefect way to say in a flow, “start from step 3 and do all downstream steps”?
seems like https://docs.prefect.io/concepts/tasks/#caching is the idea i’m looking for?
I’m more asking “how do I start the flow after step 3”?
ChatGPT gave me this:
import prefect

# define your pipeline
flow = prefect.Flow("My Flow")

# add tasks to your pipeline
task_1 = prefect.Task(name="Task 1")
task_2 = prefect.Task(name="Task 2")
task_3 = prefect.Task(name="Task 3")

flow.add_task(task_1)
flow.add_task(task_2)
flow.add_task(task_3)

# start the flow on the third task
flow.run(tasks=[task_3])
but I want to manage this with a CLI like
python -m my_script.py --task3

jcozar

02/03/2023, 5:50 PM
Hi @Bryan Whiting! I ran into the same problem. What I did was parametrize at the flow level (a boolean for each task) and add an if-else for each task. I recommend using the cache for real caching purposes only, otherwise it might have unwanted behaviours. But if you have data dependencies between tasks, then you should use (or combine it with) caching.

Bryan Whiting

02/03/2023, 5:52 PM
thanks @jcozar! can the UI be used to just run from a particular step?
i think airflow has this, right?
“boolean for each task” is what i was thinking, thanks for confirming

jcozar

02/03/2023, 5:53 PM
Since you parametrize the flow, you can use the UI to set the parameter values as you want. For example:
import sys
import prefect
from prefect import flow, task, get_run_logger
from utilities import AN_IMPORTED_MESSAGE


@task
def log_task(name):
    logger = get_run_logger()
    logger.info("Hello %s!", name)
    logger.info("Prefect Version = %s 🚀", prefect.__version__)
    logger.debug(AN_IMPORTED_MESSAGE)


@flow()
def log_flow(name: str):
    log_task(name)


if __name__ == "__main__":
    name = sys.argv[1]
    log_flow(name)

jcozar

02/03/2023, 5:54 PM
But keep in mind what I said about dependencies. If tasks depend on other tasks’ results, you should use caches and probably a custom cache_key_fn.
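For reference, a custom cache_key_fn in Prefect 2 is just a callable that receives the task run context and the call’s parameters and returns a string key. A minimal sketch, using placeholder names (daily_key, train_xgboost) that are not from this thread:

from datetime import date
from prefect import task


# Reuse a cached result if the same model was already built today;
# the key changes whenever the date or the model_name changes.
def daily_key(context, parameters):
    return f"{parameters['model_name']}-{date.today().isoformat()}"


@task(cache_key_fn=daily_key)
def train_xgboost(model_name: str):
    ...  # ~10 min of training; writes the model artifact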

Bryan Whiting

02/03/2023, 5:54 PM
or what do you mean by “parametrize flow”?
yea, i won’t use cache
thanks for the heads up. i want the artifacts written out

jcozar

02/03/2023, 5:55 PM
Yeah, in Prefect 2.0 flow parametrization is just using parameters in the flow function
👍 1
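A rough sketch of that boolean-per-task pattern, wired up to a CLI so the flow can be started from a later step; all names here (build_model, --skip-model, etc.) are placeholders, not from the thread:

import argparse

from prefect import flow, task


@task
def build_model():
    ...  # ~10 min; writes the xgboost artifact to disk


@task
def run_simulations():
    ...  # ~10 min; loads the model artifact, writes simulation artifacts


@task
def make_plots():
    ...  # loads simulation artifacts, writes plots


@flow
def pipeline(run_model: bool = True, run_sims: bool = True, run_plots: bool = True):
    # Each boolean parameter can also be flipped from the Prefect UI.
    if run_model:
        build_model()
    if run_sims:
        run_simulations()
    if run_plots:
        make_plots()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--skip-model", action="store_true")
    parser.add_argument("--skip-sims", action="store_true")
    args = parser.parse_args()
    # e.g. `python my_script.py --skip-model` starts from the simulation step
    pipeline(run_model=not args.skip_model, run_sims=not args.skip_sims)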

Jeff Hale

02/04/2023, 4:06 AM
I'll also just note that we have some additional functionality around artifacts being worked on now. 🙂

Evan Curtin

02/06/2023, 7:12 PM
Are you talking about Makefile-style target=task4, which would ignore steps in the DAG that don’t contribute to the target task (either because they are downstream or completely unrelated)?

Bryan Whiting

02/16/2023, 10:01 PM
@Evan Curtin yes. Similar to Luigi → Skip step A, B, run step C through D.

Evan Curtin

02/16/2023, 10:03 PM
Yeah, normally I have tasks exit early if the output exists. It used to be trivial in Prefect 1 to do this with the Result type, but Prefect 2 made that extremely awkward. I wrote a custom decorator to persist task outputs that skips the task if the output already exists.

Bryan Whiting

02/16/2023, 10:05 PM
okay, so you don’t use caching but have your own artifact management wrapper that checks if the artifact exists for that function, and if it does, you move on? How do you save the artifact? (csv, pickle, etc.)

Evan Curtin

02/16/2023, 10:06 PM
essentially yes. Most of my artifacts are persisted using Spark I/O methods (e.g. Parquet / Delta Lake)
You can kinda hack it with a custom serializer and result types but it just gets in the way IMO

Bryan Whiting

02/16/2023, 10:07 PM
yea i need my functions to run both locally and on GCP, so I’m trying to decide between Prefect and Luigi
@Jeff Hale have any suggestions? What artifact functionality are you creating?

Evan Curtin

02/16/2023, 10:09 PM
the old result type had an interface that was something like
class Result:
    def __init__(self, uri):
        self.uri = uri

    def exists(self) -> bool:
        ...

    def save(self):
        ...

    def load(self):
        ...
which is basically all you need. The Serializer abstraction in 2.0 really makes it complicated when you’re not dealing with something that can be converted to bytes in memory. I also prefer using the output state of the world as my source of truth, rather than some nebulous cache state in the prefect db

Jeff Hale

02/16/2023, 10:10 PM
lol. Just shared this thread with our product management team so I’ll let them chime in.

Evan Curtin

02/16/2023, 10:10 PM
@Bryan Whiting did we work together at that large CC company? 🙂

Bryan Whiting

02/16/2023, 10:11 PM
ha! we did!
i was there 2017-2020. just checked your LinkedIn, seems like we overlapped. Well, i guess we’re both still coding like we did back then 🙂

Evan Curtin

02/16/2023, 10:12 PM
haha yes, a lot of my workflow authoring comes from that work. This concept in particular is one of the most important for me, as it helps the workflow be efficient and extremely reliable

Bryan Whiting

02/16/2023, 10:14 PM
Agreed. Nothing’s more important to me than loading in the data from step 5 to debug.

Evan Curtin

02/16/2023, 10:14 PM
from dataclasses import dataclass
from functools import wraps
from inspect import signature
from typing import Any, Callable, Optional

# Schema, Repository, get_flow_param, and logger are defined elsewhere
# in this codebase (not shown).


@dataclass
class RepositoryCache:
    schema: Schema
    partitions: Optional[list[str]] = None

    @staticmethod
    def get_root() -> str:
        return str(get_flow_param("root"))

    @property
    def repo(self) -> Repository:
        return Repository(self.get_root())

    @staticmethod
    def get_dataset() -> str:
        return str(get_flow_param("dataset"))

    @property
    def dataset(self) -> str:
        # for backwards compat should remove this prop
        return self.get_dataset()

    def exists(self, partition: dict[str, Any]) -> bool:
        return self.repo.exists(self.schema, self.dataset, partitions=partition)

    def __call__(self, f: Callable) -> Callable:
        @wraps(f)
        def wrapped(*args, **kwargs):
            # Get the function's parameters as a dictionary of param: value
            # this way we can check the data for presence of this partition
            # regardless of if it was arg or kwarg
            params: dict[str, Any] = signature(f).bind(*args, **kwargs).arguments
            partition_dict = {k: params[k] for k in self.partitions or []}
            if self.exists(partition=partition_dict):
                logger.info(f"Loading from cache: {self.repo.root} | {self.dataset} | {self.schema.name}")
                return self.repo.load(self.schema, self.dataset)

            logger.info("Not found, computing")
            result = f(*args, **kwargs)
            logger.info(f"Saving to cache {self.repo.root} | {self.dataset} | {self.schema.name}")
            self.repo.save(result, self.schema, self.dataset)
            return result

        return wrapped
well idk if this helps but this is a way to implement the repository pattern and use it as a task cache decorator
the partition thing maybe is best done another way but you can ignore that

Bryan Whiting

02/16/2023, 10:22 PM
@Evan Curtin what was the main reason you chose prefect over other systems? I hate Airflow because of the db/backfill notation. Luigi seems too verbose when I could just use tasks and maps (but it handles state how I like it). Then there’s prefect, which has most of what I’m looking for, but I’d have to use your function above (not opposed to it, just curious). If you have any tips on whether prefect is still worth it to you after being deep in the weeds, i’m all ears.

Evan Curtin

02/16/2023, 10:24 PM
because our company paid for the managed version, basically. I’d suggest looking at dagster
😆 1

Bryan Whiting

02/16/2023, 10:25 PM
“well idk if this helps but this is a way to implement the repository pattern and use it as a task cache decorator”
This is really nice! Thanks for sharing. So do you just do
@RepositoryCache(schema='', partitions=4)
@task
def function():
?
lol my coworker was just bashing dagster an hour ago for reasons i’ve already forgotten. everyone’s got their thing. will look into it!

Evan Curtin

02/16/2023, 10:41 PM
yeah i never gave it a fair shake. I spent 2 years writing Argo workflows, but writing flows in YAML is kind of a bad idea

Bryan Whiting

02/16/2023, 10:42 PM
dagster seems really nice. doing the tutorial rn

Evan Curtin

02/16/2023, 10:42 PM
in my repository impl I would say something like
@task
@RepositoryCache(schema=model_score_schema, partitions=["model_version"])
def score_model(model_version):
    ...
👍 1
ima logoff for the night, always ready to chat about this
so i will check tomorrow

Bryan Whiting

02/16/2023, 10:43 PM
thanks, huge help! appreciate it

justabill

02/16/2023, 10:46 PM
@Evan Curtin Thanks for the feedback about serializers. We don't get that feedback often, but I totally see what you mean. As you said, the Prefect 1 result class had simple interfaces for result types, at the expense of being less general. Regarding the forthcoming results and artifacts enhancements to Prefect 2: we'll give you the ability to specify caching rules that prevent a task from unnecessarily running if certain data already exists and is not “too stale”. Additionally, you'll be able to see when and by what task a result was used via the cache. We'll have more to share in a few weeks.

Bryan Whiting

02/16/2023, 10:48 PM
what does “too stale” mean? what if I want to just re-run the pipeline right now but only step 5? dagster seems to make it easy to click on a task and re-run from that task

justabill

02/16/2023, 10:57 PM
"too stale" just means that the cache has expired according to the rules that have been specified. If you want to re-run a flow from a specific step onwards, you'll be able to do that if the results are persisted.

Bryan Whiting

02/16/2023, 10:58 PM
do you have an example on how to do that? I’m happy to write to disk in each of my functions. Just can’t figure out how to trigger a run from a particular step in the pipeline.

Evan Curtin

02/17/2023, 7:22 PM
I’m not sure prefect has the ability to sort of backwards-crawl the result graph and only run necessary tasks in the style of make
you can probably dynamically parametrize your flow to early exit. That with caching should get you close
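A hedged sketch of what that combination might look like in Prefect 2: an input-hash cache key plus persisted results, so a re-run (or retry) with the same inputs skips steps that already completed. Names here are illustrative only:

from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash


@task(cache_key_fn=task_input_hash,
      cache_expiration=timedelta(days=1),
      persist_result=True)
def simulate(model_path: str):
    ...  # ~10 min of simulations; writes/returns the simulation artifacts


@flow
def pipeline(model_path: str = "model.json"):
    simulate(model_path)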

justabill

02/17/2023, 8:53 PM
That's correct, Prefect cannot do that right now. The closest we come to that ability is retrying a flow, which will use the cached results from completed task runs. We're exploring rerunning from a particular task onwards as a future feature.

Bryan Whiting

02/17/2023, 8:55 PM
thanks all, that’s the one thing I need right now that prefect can’t do yet, so i’ll have to use either Luigi or Dagster. appreciate the insight
@Evan Curtin yea that’s what i’m looking for, thanks for your guidance! really appreciate it

Evan Curtin

02/17/2023, 9:23 PM
I may or may not have also simply commented out the portions of the workflow i wanna skip 😉

Bryan Whiting

02/17/2023, 9:24 PM
haha when all else fails…