Bryan Whiting
02/03/2023, 5:42 PM
click and doing some logic to manage the “start from X” step. What’s the “prefect” way of thinking about “okay, i’ve already run the xgboost step today, just run from simulations on.”
What’s the prefect way to say in a flow, “start from step 3 and do all downstream steps”?
import prefect
# define your pipeline
flow = prefect.Flow("My Flow")
# add tasks to your pipeline
task_1 = prefect.Task(name="Task 1")
task_2 = prefect.Task(name="Task 2")
task_3 = prefect.Task(name="Task 3")
flow.add_task(task_1)
flow.add_task(task_2)
flow.add_task(task_3)
# start the flow on the third task
flow.run(tasks=[task_3])
python -m my_script.py --task3
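One way to read “start from step 3 and do all downstream steps” is as a graph question: given the step dependencies, which steps are reachable from the chosen start? The sketch below is plain Python and does not use any Prefect API. The step names and the `DEPENDS_ON` mapping are hypothetical, and it assumes the outputs of skipped upstream steps were already persisted somewhere the later steps can read them.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each step maps to the steps it depends on.
DEPENDS_ON = {
    "load_data": [],
    "xgboost": ["load_data"],
    "simulations": ["xgboost"],
    "report": ["simulations"],
    "audit": ["load_data"],
}


def steps_from(start: str, depends_on: dict[str, list[str]]) -> list[str]:
    """Return `start` plus everything downstream of it, in runnable order."""
    # Invert the edges so we can walk from a step to its dependents.
    dependents: dict[str, list[str]] = {name: [] for name in depends_on}
    for name, upstream in depends_on.items():
        for parent in upstream:
            dependents[parent].append(name)

    # Depth-first walk collecting every step reachable from `start`.
    selected, stack = set(), [start]
    while stack:
        step = stack.pop()
        if step not in selected:
            selected.add(step)
            stack.extend(dependents[step])

    # Keep the full pipeline's topological order, filtered to the selection.
    order = TopologicalSorter(depends_on).static_order()
    return [step for step in order if step in selected]


print(steps_from("simulations", DEPENDS_ON))
```

The returned list could then drive which steps actually get called; unrelated branches such as the hypothetical `audit` step are left out because they are not downstream of the start.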
jcozar
02/03/2023, 5:50 PM
if-else for each task.
I recommend using cache for real caching purposes, otherwise it might have unwanted behaviours.
Bryan Whiting
02/03/2023, 5:52 PM
boolean for each task is what i was thinking, thanks for confirming
jcozar
02/03/2023, 5:53 PM
Bryan Whiting
02/03/2023, 5:54 PM
import sys
import prefect
from prefect import flow, task, get_run_logger
from utilities import AN_IMPORTED_MESSAGE

@task
def log_task(name):
    logger = get_run_logger()
    logger.info("Hello %s!", name)
    logger.info("Prefect Version = %s 🚀", prefect.__version__)
    logger.debug(AN_IMPORTED_MESSAGE)

@flow()
def log_flow(name: str):
    log_task(name)

if __name__ == "__main__":
    name = sys.argv[1]
    log_flow(name)
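The “if-else for each task” idea discussed above, combined with a command-line argument like the one in this script, might look roughly like the following. This is a minimal sketch, not a Prefect feature: the step functions, the `STEPS` mapping and the `--start-from` flag are all hypothetical, and the `@task` and `@flow` decorators are left off so the example runs without Prefect installed. In a real script the step functions would carry `@task` and `run_pipeline` would carry `@flow`.

```python
import argparse
from typing import Callable


def load_data() -> None:
    pass  # placeholder for the real work


def train_xgboost() -> None:
    pass  # placeholder for the real work


def run_simulations() -> None:
    pass  # placeholder for the real work


# Hypothetical steps in execution order (dicts preserve insertion order).
STEPS: dict[str, Callable[[], None]] = {
    "load_data": load_data,
    "xgboost": train_xgboost,
    "simulations": run_simulations,
}


def run_pipeline(start_from: str = "load_data") -> list[str]:
    """Run every step at or after `start_from` and return the names that ran."""
    names = list(STEPS)
    first = names.index(start_from)  # raises ValueError for an unknown step
    ran = []
    for index, name in enumerate(names):
        # The per-step check: skip anything that comes before the start step.
        if index < first:
            continue
        STEPS[name]()
        ran.append(name)
    return ran


def main(argv: list[str]) -> list[str]:
    parser = argparse.ArgumentParser()
    parser.add_argument("--start-from", choices=list(STEPS), default="load_data")
    args = parser.parse_args(argv)
    return run_pipeline(args.start_from)


print(main(["--start-from", "simulations"]))
```

Skipping a step this way only works if the later steps can load what the skipped step produced on an earlier run, which is where the caching discussion later in the thread comes in.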
jcozar
02/03/2023, 5:54 PM
Bryan Whiting
02/03/2023, 5:54 PM
jcozar
02/03/2023, 5:55 PM
Jeff Hale
02/04/2023, 4:06 AM
Evan Curtin
02/06/2023, 7:12 PM
Makefile style target=task4 which would ignore steps in the DAG that don’t contribute to the target task (either because they are downstream or completely unrelated)?
Bryan Whiting
02/16/2023, 10:01 PM
Evan Curtin
02/16/2023, 10:03 PM
Bryan Whiting
02/16/2023, 10:05 PM
Evan Curtin
02/16/2023, 10:06 PM
Bryan Whiting
02/16/2023, 10:07 PM
Evan Curtin
02/16/2023, 10:09 PM
class Result:
    def __init__(self, uri):
        self.uri = uri

    def exists(self) -> bool:
        ...

    def save(self):
        ...

    def load(self):
        ...
which is basically all you need. The Serializer abstraction in 2.0 really makes it complicated when you’re not dealing with something that can be converted to bytes in memory. I also prefer using the output state of the world as my source of truth, rather than some nebulous cache state in prefect db
Jeff Hale
02/16/2023, 10:10 PM
Evan Curtin
02/16/2023, 10:10 PM
Bryan Whiting
02/16/2023, 10:11 PM
Evan Curtin
02/16/2023, 10:12 PM
Bryan Whiting
02/16/2023, 10:14 PM
Evan Curtin
02/16/2023, 10:14 PM
from dataclasses import dataclass
from functools import wraps
from inspect import signature
from typing import Any, Callable, Optional

@dataclass
class RepositoryCache:
    schema: Schema
    partitions: Optional[list[str]] = None

    @staticmethod
    def get_root() -> str:
        return str(get_flow_param("root"))

    @property
    def repo(self) -> Repository:
        return Repository(self.get_root())

    @staticmethod
    def get_dataset() -> str:
        return str(get_flow_param("dataset"))

    @property
    def dataset(self) -> str:
        # for backwards compat should remove this prop
        return self.get_dataset()

    def exists(self, partition: dict[str, Any]) -> bool:
        return self.repo.exists(self.schema, self.dataset, partitions=partition)

    def __call__(self, f: Callable) -> Callable:
        @wraps(f)
        def wrapped(*args, **kwargs):
            # Get the function's parameters as a dictionary of param: value
            # this way we can check the data for presence of this partition
            # regardless of if it was arg or kwarg
            params: dict[str, Any] = signature(f).bind(*args, **kwargs).arguments
            partition_dict = {k: params[k] for k in self.partitions or []}
            if self.exists(partition=partition_dict):
                logger.info(f"Loading from cache: {self.repo.root} | {self.dataset} | {self.schema.name}")
                return self.repo.load(self.schema, self.dataset)
            logger.info("Not found, computing")
            result = f(*args, **kwargs)
            logger.info(f"Saving to cache {self.repo.root} | {self.dataset} | {self.schema.name}")
            self.repo.save(result, self.schema, self.dataset)
            return result
        return wrapped
well idk if this helps but this is a way to implement the repository pattern and use it as a task cache decorator
Bryan Whiting
02/16/2023, 10:22 PM
Evan Curtin
02/16/2023, 10:24 PM
Bryan Whiting
02/16/2023, 10:25 PM
“well idk if this helps but this is a way to implement the repository pattern and use it as a task cache decorator”
This is really nice! Thanks for sharing. So do you just do
@RepositoryCache(schema='', partitions=4)
@task
def function():
?
Evan Curtin
02/16/2023, 10:41 PM
Bryan Whiting
02/16/2023, 10:42 PM
Evan Curtin
02/16/2023, 10:42 PM
@task
@RepositoryCache(schema=model_score_schema, partitions=["model_version"])
def score_model(model_version):
    ...
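To make the exists / load / save flow of that decorator concrete, here is a self-contained sketch of the same pattern backed by JSON files instead of the `Repository` and `Schema` classes from the thread, which are not shown. `JsonFileCache`, its `root` argument and the file naming are hypothetical stand-ins, and the `@task` decorator is omitted so it runs without Prefect; in a flow it would sit outermost, as in the snippet above.

```python
import json
import tempfile
from functools import wraps
from inspect import signature
from pathlib import Path
from typing import Any, Callable


class JsonFileCache:
    """Hypothetical stand-in for RepositoryCache: one JSON file per partition."""

    def __init__(self, root: Path, partitions: list[str]):
        self.root = root
        self.partitions = partitions

    def _path(self, partition: dict[str, Any]) -> Path:
        # Build a file name from the partition values, e.g. model_version=v1.json
        key = "_".join(f"{k}={v}" for k, v in sorted(partition.items()))
        return self.root / f"{key or 'all'}.json"

    def __call__(self, f: Callable) -> Callable:
        @wraps(f)
        def wrapped(*args, **kwargs):
            # Bind args and kwargs to parameter names so the partition key is
            # found whether it was passed positionally or by keyword.
            params = signature(f).bind(*args, **kwargs).arguments
            path = self._path({k: params[k] for k in self.partitions})
            if path.exists():
                return json.loads(path.read_text())  # cache hit: skip f
            result = f(*args, **kwargs)
            path.write_text(json.dumps(result))  # cache miss: compute and save
            return result

        return wrapped


calls = []  # records which versions were actually computed
cache_dir = Path(tempfile.mkdtemp())


@JsonFileCache(root=cache_dir, partitions=["model_version"])
def score_model(model_version: str) -> dict:
    calls.append(model_version)
    return {"model_version": model_version, "score": 0.5}


first = score_model("v1")                 # computed and written to disk
second = score_model(model_version="v1")  # loaded from disk, not recomputed
```

Because the check is against files on disk, the stored output itself decides whether a step reruns, which matches the “output state of the world as my source of truth” preference stated earlier in the thread.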
Bryan Whiting
02/16/2023, 10:43 PM
justabill
02/16/2023, 10:46 PM
Bryan Whiting
02/16/2023, 10:48 PM
justabill
02/16/2023, 10:57 PM
Bryan Whiting
02/16/2023, 10:58 PM
Evan Curtin
02/17/2023, 7:22 PM
justabill
02/17/2023, 8:53 PM
Bryan Whiting
02/17/2023, 8:55 PM
Evan Curtin
02/17/2023, 9:23 PM
Bryan Whiting
02/17/2023, 9:24 PM