Ritvik Mishra
10/20/2021, 10:00 PM
Kevin Kho
Ritvik Mishra
10/20/2021, 10:08 PM
Kevin Kho
Kevin Kho
Kevin Kho
Ritvik Mishra
10/20/2021, 10:11 PM
Ritvik Mishra
10/20/2021, 10:11 PM
Kevin Kho
Ritvik Mishra
10/20/2021, 10:18 PM
Kevin Kho
DatabricksRunNow().map() to parallelize?
Kevin Kho
Ritvik Mishra
10/20/2021, 10:21 PM
Ritvik Mishra
10/20/2021, 10:22 PM
Kevin Kho
databricks_task = DatabricksRunNow()
with Flow("...") as flow:
    databricks_task(...)
    databricks_task(...)
Is that right?
Ritvik Mishra
10/20/2021, 10:25 PM
with Flow('..') as flow:
    flow.storage = GitHub(
        repo="...",
        path="...",
        access_token_secret="..."
    )
    RunNowTask = DatabricksRunNow(databricks_conn_secret=json.loads(os.environ['<some auth related stuff>']))
    RunNowTask.copy(
        name=name,
        job_id=job_id,
        max_retries=max_retries,
        retry_delay=retry_delay,
        notebook_params=notebook_params,
    )
    RunNowTask()
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)
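A minimal sketch of why reusing one task object for several calls can be fragile, assuming a task whose run() writes its per-call arguments onto self. MutatingRunNow and StatelessRunNow are hypothetical stand-ins written for illustration; they are not the real DatabricksRunNow and make no Databricks calls.

```python
class MutatingRunNow:
    """Hypothetical stand-in, NOT the real DatabricksRunNow."""

    def __init__(self, job_id=None):
        self.job_id = job_id

    def run(self, job_id=None):
        # Stores the per-call argument on self, so the instance only
        # remembers the most recent call.
        if job_id is not None:
            self.job_id = job_id
        return self.job_id


class StatelessRunNow:
    """Hypothetical variant that keeps per-call values in local variables."""

    def __init__(self, job_id=None):
        self.job_id = job_id

    def run(self, job_id=None):
        # Falls back to the init-time default without writing to self.
        return job_id if job_id is not None else self.job_id


shared = MutatingRunNow(job_id=1)
shared.run(job_id=2)
after_mutation = shared.run()    # the init-time default was overwritten

safe = StatelessRunNow(job_id=1)
safe.run(job_id=2)
after_stateless = safe.run()     # the init-time default is intact

print(after_mutation, after_stateless)
```

With the mutating variant, any call that relies on the instance's stored settings sees whatever the previous call left behind, which becomes harder to reason about once several calls run in parallel threads.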
Ritvik Mishra
10/20/2021, 10:25 PM
Kevin Kho
Kevin Kho
Kevin Kho
self and calling run() edited self. This means that invoking it multiple times and calling run inside the Flow was causing self to be altered, so it lost the info from the previous calls.
Ritvik Mishra
10/20/2021, 10:31 PM
Kevin Kho
Ritvik Mishra
10/27/2021, 4:42 PM
Ritvik Mishra
10/27/2021, 4:45 PM
Kevin Kho
Ritvik Mishra
10/27/2021, 5:33 PM
Kevin Kho
Ritvik Mishra
10/27/2021, 5:46 PM
Kevin Kho
Kevin Kho
Ritvik Mishra
10/27/2021, 5:56 PM
Kevin Kho
Ritvik Mishra
10/27/2021, 6:56 PM
Ritvik Mishra
10/27/2021, 6:56 PM
Kevin Kho
Ritvik Mishra
10/27/2021, 10:21 PM
Kevin Kho
Ritvik Mishra
10/27/2021, 11:37 PM
Kevin Kho
Ritvik Mishra
10/28/2021, 4:11 PM
Kevin Kho
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=4), but am still confused why this is happening
Ritvik Mishra
10/28/2021, 4:17 PM
Kevin Kho
Ritvik Mishra
10/28/2021, 10:02 PM
Ritvik Mishra
10/29/2021, 11:28 PM
Kevin Kho
Anna Geller
Ritvik Mishra
11/01/2021, 7:21 PM
Anna Geller
DatabricksRunNow and DatabricksSubmitRun run until completion without any issues. I tested both tasks with a Databricks job that took an hour, and it worked well.
You can set this mode using a single environment variable in your run configuration:
run_config=UniversalRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})
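For context on the variable name: Prefect 1.x reads configuration overrides from environment variables that start with PREFECT__, with double underscores separating nested keys, so PREFECT__CLOUD__HEARTBEAT_MODE corresponds to the cloud.heartbeat_mode setting. The helper below, config_path, is a hypothetical illustration of that naming convention only; it is not Prefect's own parser.

```python
from typing import List


def config_path(env_var: str, prefix: str = "PREFECT") -> List[str]:
    """Split an environment variable name into a nested config path.

    Hypothetical helper for illustration; Prefect does its own parsing.
    """
    parts = env_var.split("__")
    if len(parts) < 2 or parts[0] != prefix:
        raise ValueError(f"{env_var!r} does not start with {prefix}__")
    # Everything after the prefix is a config key, lowercased.
    return [part.lower() for part in parts[1:]]


path = config_path("PREFECT__CLOUD__HEARTBEAT_MODE")
print(".".join(path))  # cloud.heartbeat_mode
```

Because the setting travels as an environment variable, it applies to the flow run's process wherever the run configuration launches it.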
Ritvik Mishra
11/03/2021, 7:46 PM
Ritvik Mishra
11/03/2021, 7:47 PM
Kevin Kho
Ritvik Mishra
11/03/2021, 8:58 PM
Ritvik Mishra
11/04/2021, 6:26 PM
Ritvik Mishra
11/04/2021, 6:26 PM
Anna Geller
Anna Geller
Ritvik Mishra
11/04/2021, 7:15 PM
Ritvik Mishra
11/04/2021, 7:15 PM
Kevin Kho
prefect version in the CLI for the second question. For the run configuration, you must have something like:
flow.run_config = KubernetesRun(..)
I think Anna wants to see how you included the env variable that she suggested.
Ritvik Mishra
11/04/2021, 7:29 PM
Ritvik Mishra
11/04/2021, 7:33 PM
Kevin Kho
Ritvik Mishra
11/04/2021, 7:35 PM
Kevin Kho
Ritvik Mishra
11/04/2021, 8:14 PM