Robert Esteves06/15/2022, 9:19 PM
Victoria Alvarez06/15/2022, 10:10 PM
Shaoyi Zhang06/15/2022, 10:22 PM
CA Lee06/15/2022, 11:48 PM
Faheem Khan06/16/2022, 2:44 AM
Faheem Khan06/16/2022, 5:56 AM
Amro06/16/2022, 6:27 AM
GuangSheng Liu06/16/2022, 7:16 AM
export PREFECT__SERVICES__SERVICES__TOWEL__MAX__SCHEDULED__RUNS__PER__FLOW=1 or export PREFECT__MAX__SCHEDULED__RUNS__PER__FLOW=1
Surya06/16/2022, 10:27 AM
Roger Webb06/16/2022, 1:30 PM
Ed Burroughes06/16/2022, 2:36 PM
def build_full_refresh_base(name, s3_dir_prefix, **flow_kwargs): with Flow(name, **flow_kwargs) as flow: repeat_task_output = repeat_task() return flow, repeat_task_output @contextmanager def FullRefreshFlow(name, s3_dir_prefix, **flow_kwargs): flow = build_full_refresh_base(name, s3_dir_prefix, **flow_kwargs) try: yield flow finally: print("do something") if __name__ == "__main__": @task(log_stdout=True) def some_task(repeat_task_output): print(repeat_task_output) with FullRefreshFlow("hello", "some_dir") as (flow, repeat_task_output): some_task(repeat_task_output) flow.run()
Joshua Allen06/16/2022, 2:36 PM
module for Python instead? Or is there a better way?
Matthew Millendorf06/16/2022, 3:51 PM
David Yak06/16/2022, 4:16 PM
Slackbot06/16/2022, 4:39 PM
Josh06/16/2022, 5:44 PM
David Yak06/16/2022, 5:52 PM
Fina Silva-Santisteban06/16/2022, 5:59 PM
Xavier Babu06/16/2022, 7:55 PM
David Anderson06/16/2022, 9:02 PM
seeing a new error starting ~last week? all of my runs are failing with
. haven't changed a thing in my prefect flow configuration. thought maybe it was related to an airbyte upgrade (im running the self-hosted version), but im struggling to troubleshoot. any ideas?
Error during execution of task: KeyError('schedule')
Matt Alhonte06/16/2022, 10:47 PM
tab in the GUI for a given flow always display the actual date instead of "Last Sunday" or whatever?
Matt Alhonte06/16/2022, 11:02 PM
Ben Muller06/16/2022, 11:59 PM
error trace in 🧵 Any ideas?
prefect run -p flows/harness_racing_victoria/harness_racing_victoria_results.py
George Shishorin06/17/2022, 2:05 AM
occurred. Flow B is ok. So the question is: what is the best practice to register flows with different environments and dependencies? Hope for your support and thank you!
No module named 'pandas'
Marius Haberstock06/17/2022, 8:20 AM
marque06/17/2022, 9:09 AM
Michael Maletich06/17/2022, 10:55 AM
Dung Khuc06/17/2022, 12:05 PM
if task B fails, I want to run:
TaskA: resultA -> TaskB: resultB -> TaskC: resultC
if task C fails, I want to run:
UndoTaskB (resultB) -> UndoTaskA (result A)
Michiel Verburg06/17/2022, 12:53 PM
Doing the above seemed wrong, because also (by default at least) tasks or flows would fail just because one internal step failed. Additionally, the processing of the categories is fully independent. So I thought,
@task def for_loop_B(category): for item_i in category.items: load_item(item_i) store_item(item_i) @flow def for_loop_A() categories = retrieve_all_categories() for category_i in categories: for_loop_B(category_i)
should also be a flow, and
should be tasks for example. However, I got confused because of what the docs mention: “Unlike tasks, subflows will block until completion with all task runners.“. I want the processing of items within a category to happen sequentially, but multiple categories can be processed in parallel, how can I make that happen? Also, can tasks be nested within tasks for that matter?