Ritvik Mishra
10/20/2021, 10:00 PM
Kevin Kho
Ritvik Mishra
10/20/2021, 10:08 PM
Kevin Kho
Kevin Kho
Kevin Kho
Ritvik Mishra
10/20/2021, 10:11 PM
Ritvik Mishra
10/20/2021, 10:11 PM
Kevin Kho
Ritvik Mishra
10/20/2021, 10:18 PM
Kevin Kho
DatabricksRunNow().map() to parallelize?
Kevin Kho
Ritvik Mishra
10/20/2021, 10:21 PM
Ritvik Mishra
10/20/2021, 10:22 PM
Kevin Kho
databricks_task = DatabricksRunNow()
with Flow("...") as flow:
    databricks_task(...)
    databricks_task(...)
Is that right?
Ritvik Mishra
10/20/2021, 10:25 PM
with Flow('..') as flow:
    flow.storage = GitHub(
        repo="...",
        path="...",
        access_token_secret="..."
    )
    RunNowTask = DatabricksRunNow(databricks_conn_secret=json.loads(os.environ['<some auth related stuff>']))
    RunNowTask.copy(
        name=name,
        job_id=job_id,
        max_retries=max_retries,
        retry_delay=retry_delay,
        notebook_params=notebook_params,
    )
    RunNowTask()
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)
Ritvik Mishra
10/20/2021, 10:25 PM
Kevin Kho
Kevin Kho
Kevin Kho
self and calling run() edited self. This means that invoking it multiple times and calling run() inside the Flow altered self, so the information from the previous calls was lost.
Ritvik Mishra
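A minimal sketch of the problem described above, in plain Python rather than Prefect. The classes `MutatingTask` and `StatelessTask` are hypothetical stand-ins, not the actual `DatabricksRunNow` implementation; they only show why a `run()` that writes to `self` loses earlier settings, and how keeping values in local variables avoids that.

```python
class MutatingTask:
    """Hypothetical task whose run() writes its arguments back onto self."""

    def __init__(self, job_id=None):
        self.job_id = job_id

    def run(self, job_id=None):
        # Overwrites instance state: every later call sees this value.
        self.job_id = job_id or self.job_id
        return self.job_id


class StatelessTask:
    """Same interface, but run() only reads self and works on locals."""

    def __init__(self, job_id=None):
        self.job_id = job_id

    def run(self, job_id=None):
        # Local variable only; self is left untouched.
        effective_job_id = job_id or self.job_id
        return effective_job_id


mutating = MutatingTask(job_id=1)
mutating.run(job_id=2)               # replaces the configured job_id
mutating_default = mutating.run()    # falls back to 2, not the original 1

stateless = StatelessTask(job_id=1)
stateless.run(job_id=2)
stateless_default = stateless.run()  # still falls back to the original 1
```

With a threaded executor, several runs may share one such object, so a mutating `run()` can also race between calls; the stateless variant does not have that exposure.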
10/20/2021, 10:31 PM
Kevin Kho
Ritvik Mishra
10/27/2021, 4:42 PM
Ritvik Mishra
10/27/2021, 4:45 PM
Kevin Kho
Ritvik Mishra
10/27/2021, 5:33 PM
Kevin Kho
Ritvik Mishra
10/27/2021, 5:46 PM
Kevin Kho
Kevin Kho
Ritvik Mishra
10/27/2021, 5:56 PM
Kevin Kho
Ritvik Mishra
10/27/2021, 6:56 PM
Ritvik Mishra
10/27/2021, 6:56 PM
Kevin Kho
Ritvik Mishra
10/27/2021, 10:21 PM
Kevin Kho
Ritvik Mishra
10/27/2021, 11:37 PM
Kevin Kho
Ritvik Mishra
10/28/2021, 4:11 PM
Kevin Kho
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=4), but I am still confused about why this is happening.
Ritvik Mishra
10/28/2021, 4:17 PM
Kevin Kho
Ritvik Mishra
10/28/2021, 10:02 PM
Ritvik Mishra
10/29/2021, 11:28 PM
Kevin Kho
Anna Geller
Ritvik Mishra
11/01/2021, 7:21 PM
Anna Geller
DatabricksRunNow and DatabricksSubmitRun run until completion without any issues. I tested both tasks with a Databricks job that took an hour, and it worked well.
You can set this mode using a single environment variable in your run configuration:
run_config=UniversalRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})
Ritvik Mishra
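The variable name in the run configuration above follows Prefect's double-underscore convention, where `PREFECT__CLOUD__HEARTBEAT_MODE` addresses the `cloud.heartbeat_mode` setting. The sketch below only illustrates that naming convention in plain Python; `env_var_to_config_path` and `apply_env_overrides` are hypothetical helpers, not Prefect's own loader, and the starting value `"process"` is an assumption about the default.

```python
def env_var_to_config_path(name, prefix="PREFECT"):
    """Split PREFECT__SECTION__KEY into a lower-cased config path."""
    parts = name.split("__")
    if len(parts) < 2 or parts[0] != prefix:
        raise ValueError(f"{name!r} does not start with {prefix}__")
    return [part.lower() for part in parts[1:]]


def apply_env_overrides(config, env):
    """Write each PREFECT__... variable into a nested dict of settings."""
    for name, value in env.items():
        if not name.startswith("PREFECT__"):
            continue  # unrelated environment variables are ignored
        path = env_var_to_config_path(name)
        node = config
        for key in path[:-1]:
            node = node.setdefault(key, {})
        node[path[-1]] = value
    return config


# Assumed starting point: heartbeats run in a separate process.
settings = {"cloud": {"heartbeat_mode": "process"}}
settings = apply_env_overrides(
    settings, {"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"}
)
heartbeat_path = env_var_to_config_path("PREFECT__CLOUD__HEARTBEAT_MODE")
```

After the override, `settings["cloud"]["heartbeat_mode"]` holds `"thread"`, which is the effect the environment variable is meant to have on the flow run.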
11/03/2021, 7:46 PM
Ritvik Mishra
11/03/2021, 7:47 PM
Kevin Kho
Ritvik Mishra
11/03/2021, 8:58 PM
Ritvik Mishra
11/04/2021, 6:26 PM
Ritvik Mishra
11/04/2021, 6:26 PM
Anna Geller
Anna Geller
Ritvik Mishra
11/04/2021, 7:15 PM
Ritvik Mishra
11/04/2021, 7:15 PM
Kevin Kho
prefect version in the CLI for the second question. For the run configuration, you must have something like:
flow.run_config = KubernetesRun(..)
I think Anna wants to see how you included the env variable that she suggested.
Ritvik Mishra
11/04/2021, 7:29 PM
Ritvik Mishra
11/04/2021, 7:33 PM
Kevin Kho
Ritvik Mishra
11/04/2021, 7:35 PM
Kevin Kho
Ritvik Mishra
11/04/2021, 8:14 PM