Ritvik Mishra (10/20/2021, 10:00 PM)
Kevin Kho (10/20/2021, 10:02 PM)
Ritvik Mishra (10/20/2021, 10:08 PM)
Kevin Kho (10/20/2021, 10:09 PM)
Ritvik Mishra (10/20/2021, 10:11 PM)
Kevin Kho (10/20/2021, 10:13 PM)
Ritvik Mishra (10/20/2021, 10:18 PM)
Kevin Kho (10/20/2021, 10:19 PM)
DatabricksRunNow().map() to parallelize?
Ritvik Mishra (10/20/2021, 10:21 PM)
Kevin Kho (10/20/2021, 10:23 PM)
databricks_task = DatabricksRunNow()
with Flow() as flow:
    databricks_task(...)
    databricks_task(...)
Is that right?
Ritvik Mishra (10/20/2021, 10:25 PM)
with Flow('..') as flow:
    flow.storage = GitHub(
        repo="...",
        path="...",
        access_token_secret="..."
    )
    RunNowTask = DatabricksRunNow(databricks_conn_secret=json.loads(os.environ['<some auth related stuff>']))
    RunNowTask.copy(
        name=name,
        job_id=job_id,
        max_retries=max_retries,
        retry_delay=retry_delay,
        notebook_params=notebook_params,
    )
    RunNowTask()
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)
Kevin Kho (10/20/2021, 10:28 PM)
Calling run() edited self. This means that invoking it multiple times and calling run inside the Flow was causing self to be altered and lose info from the previous calls.
Ritvik Mishra
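Kevin's diagnosis can be sketched in plain Python, independent of Prefect. `JobTask` below is a hypothetical stand-in for the Databricks task, and `copy.copy` plays roughly the role Prefect's `Task.copy()` plays in the snippet above; this is only an illustration of the object behavior, not of Prefect internals:

```python
import copy

class JobTask:
    """Hypothetical task whose configuration lives on the instance."""
    def __init__(self):
        self.job_id = None

    def configure(self, job_id):
        # Mutates shared state on self -- the bug pattern in question.
        self.job_id = job_id
        return self

# Reusing one instance: the second call overwrites the first call's config.
shared = JobTask()
first = shared.configure(1)
second = shared.configure(2)
assert first is second                          # both names point at the same object
assert (first.job_id, second.job_id) == (2, 2)  # info from the first call is lost

# Copying before each configuration keeps the calls independent,
# analogous to calling Task.copy() per invocation.
base = JobTask()
first = copy.copy(base).configure(1)
second = copy.copy(base).configure(2)
assert (first.job_id, second.job_id) == (1, 2)
```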
(10/20/2021, 10:31 PM)
Kevin Kho (10/20/2021, 10:31 PM)
Ritvik Mishra (10/27/2021, 4:42 PM)
Kevin Kho (10/27/2021, 5:15 PM)
Ritvik Mishra (10/27/2021, 5:33 PM)
Kevin Kho (10/27/2021, 5:34 PM)
Ritvik Mishra (10/27/2021, 5:46 PM)
Kevin Kho (10/27/2021, 5:50 PM)
Ritvik Mishra (10/27/2021, 5:56 PM)
Kevin Kho (10/27/2021, 6:05 PM)
Ritvik Mishra (10/27/2021, 6:56 PM)
Kevin Kho (10/27/2021, 7:01 PM)
Ritvik Mishra (10/27/2021, 10:21 PM)
Kevin Kho (10/27/2021, 10:31 PM)
Ritvik Mishra (10/27/2021, 11:37 PM)
Kevin Kho (10/28/2021, 12:59 AM)
Ritvik Mishra (10/28/2021, 4:11 PM)
Kevin Kho (10/28/2021, 4:16 PM)
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=4)
but I am still confused why this is happening
Ritvik Mishra (10/28/2021, 4:17 PM)
Kevin Kho (10/28/2021, 4:18 PM)
Ritvik Mishra (10/28/2021, 10:02 PM)
Kevin Kho (10/30/2021, 12:59 AM)
Anna Geller (10/30/2021, 12:27 PM)
Ritvik Mishra (11/01/2021, 7:21 PM)
Anna Geller (11/03/2021, 11:40 AM)
DatabricksRunNow
and DatabricksSubmitRun run until completion without any issues. I tested both tasks with a Databricks job that took an hour, and it worked well.
You can set this thread-based heartbeat mode using a single environment variable in your run configuration:
run_config=UniversalRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})
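A minimal sketch of where that override would sit, assuming Prefect 1.x (`UniversalRun` comes from `prefect.run_configs`; the flow name is hypothetical and the flow body is elided):

```python
from prefect import Flow
from prefect.run_configs import UniversalRun

with Flow("databricks-flow") as flow:  # hypothetical flow name
    ...

# Thread-based heartbeats, per Anna's suggestion above:
flow.run_config = UniversalRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})
```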
Ritvik Mishra (11/03/2021, 7:46 PM)
Kevin Kho (11/03/2021, 7:56 PM)
Ritvik Mishra (11/03/2021, 8:58 PM)
Anna Geller (11/04/2021, 6:31 PM)
Ritvik Mishra (11/04/2021, 7:15 PM)
Kevin Kho (11/04/2021, 7:25 PM)
You can run prefect version in the CLI for the second question. For the run configuration, you must have something like:
flow.run_config = KubernetesRun(..)
I think Anna wants to see how you included the env variable that she suggested.
Ritvik Mishra (11/04/2021, 7:29 PM)
Kevin Kho (11/04/2021, 7:34 PM)
Ritvik Mishra (11/04/2021, 7:35 PM)
Kevin Kho (11/04/2021, 7:37 PM)
Ritvik Mishra (11/04/2021, 8:14 PM)