Derek Heyman
11/06/2023, 8:33 PM
Justin Trautmann
11/07/2023, 9:46 AM
Derek Heyman
11/07/2023, 1:12 PM
Derek Heyman
11/07/2023, 1:13 PM
Justin Trautmann
11/07/2023, 1:17 PM
Derek Heyman
11/07/2023, 1:17 PM
Derek Heyman
11/14/2023, 5:29 PM
Derek Heyman
11/14/2023, 5:30 PM
Justin Trautmann
11/15/2023, 1:25 PM
from prefect import flow, task
from prefect.tasks import task_input_hash
@task(cache_key_fn=task_input_hash)
def this_task_succeeds(x=None):
    """Return the constant string "foo".

    Registered as a task with ``task_input_hash`` as its cache key function.
    The argument ``x`` is accepted but never read by the body.
    """
    return "foo"
@task(cache_key_fn=task_input_hash)
def this_task_fails(x=None):
    """Always raise a bare ``Exception``.

    Registered as a task with ``task_input_hash`` as its cache key function.
    The argument ``x`` is accepted but never read by the body.

    Raises:
        Exception: unconditionally, on every call.
    """
    raise Exception
@flow()
def main():
    """Submit ``this_task_fails`` twice, each fed by its own upstream submission.

    Each call passes the value returned by ``this_task_succeeds.submit(...)``
    straight into ``this_task_fails.submit(...)``.
    """
    # The two upstream submissions differ only in their input: "foo" vs "bar".
    this_task_fails.submit(this_task_succeeds.submit("foo"))
    this_task_fails.submit(this_task_succeeds.submit("bar"))
# Run the flow only when this snippet is executed as a script.
if __name__ == "__main__":
    main()
Derek Heyman
11/15/2023, 2:26 PM
Derek Heyman
11/15/2023, 3:29 PM
from prefect import flow, task
from prefect.tasks import task_input_hash
@task(cache_key_fn=task_input_hash)
def reused_task(x=None, param1=None, param2=None):
    """Do nothing; a no-op task that accepts up to three arguments.

    Registered as a task with ``task_input_hash`` as its cache key function.

    ``param1`` and ``param2`` default to ``None`` because Python rejects
    parameters without defaults after a defaulted one (``x=None``) with a
    SyntaxError. The defaults also keep single-argument calls such as
    ``reused_task.submit("foo")`` valid.
    """
@flow()
def main():
    """Submit ``reused_task`` twice, once with "foo" and once with "bar"."""
    # Only the first positional argument is supplied in each submission.
    reused_task.submit("foo")
    reused_task.submit("bar")
# Run the flow only when this snippet is executed as a script.
if __name__ == "__main__":
    main()
Derek Heyman
11/15/2023, 3:30 PM
Derek Heyman
11/15/2023, 3:31 PM
Derek Heyman
11/15/2023, 3:32 PM
Derek Heyman
11/15/2023, 3:33 PM
Justin Trautmann
11/15/2023, 3:34 PM
Derek Heyman
11/15/2023, 3:34 PM
Justin Trautmann
11/15/2023, 4:07 PM
Derek Heyman
11/15/2023, 4:09 PM
Justin Trautmann
11/15/2023, 4:11 PM
from prefect import flow, task
from prefect.tasks import task_input_hash
@task(cache_key_fn=task_input_hash)
def reused_task(x, param1, param2):
    """Do nothing; a no-op task that takes three required arguments.

    Registered as a task with ``task_input_hash`` as its cache key function.
    None of ``x``, ``param1`` or ``param2`` is read by the body.
    """
@flow()
def main():
    """Submit ``reused_task`` twice with two different argument triples."""
    # All three positional arguments differ between the two submissions.
    reused_task.submit("foo", "x", "y")
    reused_task.submit("bar", "p", "q")
# Run the flow only when this snippet is executed as a script.
if __name__ == "__main__":
    main()
Derek Heyman
11/15/2023, 4:14 PM
expected_sources = read_company_config(s3_bucket, s3_path_prefix)
# Submit one run_module_on_emr_serverless_task call per expected source and
# keep whatever each .submit(...) returns, keyed by its source.
ingestion_tasks = {
    source: run_module_on_emr_serverless_task.submit(
        source, spe_config, wait_for=[expected_sources]
    )
    for source in expected_sources
}
This is how it is called.
Derek Heyman
11/15/2023, 4:15 PM
Derek Heyman
11/15/2023, 4:15 PM