Ritvik Mishra

10/20/2021, 10:00 PM
Hi! We use Prefect to run Databricks tasks, and we've been noticing lately that the status Prefect shows for a task doesn't always update in time - Prefect frequently thinks something is still running, or hasn't even started, when it has already started and finished. So the flow doesn't continue on to the next tasks in the DAG and gets stuck. Eventually it gets the updated status of the Databricks tasks and continues. This seems like some sort of communication issue between Prefect and Databricks, but I can't figure out how to debug it. Would appreciate some help here! Thanks!

Kevin Kho

10/20/2021, 10:02 PM
Are you mapping with the Databricks task by chance?

Ritvik Mishra

10/20/2021, 10:08 PM
Sorry, not sure I understand what you mean

Kevin Kho

10/20/2021, 10:09 PM
Like DatabricksSubmit.map(…) in Prefect
Are you using the Databricks tasks from the task library?
There is a polling interval there of 30 seconds by default. Do you think that can be the issue? Maybe you want to decrease it?
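For reference, a minimal sketch of lowering that interval - assuming the polling_period_seconds argument on the task-library Databricks tasks, with illustrative connection values:

from prefect.tasks.databricks import DatabricksRunNow

run_now = DatabricksRunNow(
    databricks_conn_secret={"host": "https://<workspace>.cloud.databricks.com", "token": "<token>"},  # illustrative keys
    polling_period_seconds=10,  # default is 30; how often Prefect polls Databricks for run status
)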

Ritvik Mishra

10/20/2021, 10:11 PM
Got it - we're using DatabricksRunNow from the Prefect task library
It's off by hours, so I'm not sure the 30-second polling interval could be the cause, and it's also only happening for some tasks, not all of them

Kevin Kho

10/20/2021, 10:13 PM
Ah ok. What executor are you using?

Ritvik Mishra

10/20/2021, 10:18 PM
LocalDaskExecutor

Kevin Kho

10/20/2021, 10:19 PM
Ah, so do you do DatabricksRunNow().map() to parallelize?
Or could you show me the code? I have ideas. Just hide the sensitive info

Ritvik Mishra

10/20/2021, 10:21 PM
we just do DatabricksRunNow() to run
I'll send a code snippet, one minute

Kevin Kho

10/20/2021, 10:23 PM
I suspect you are doing something like this:
databricks_task = DatabricksRunNow()
with Flow("...") as flow:
    databricks_task(...)
    databricks_task(...)
Is that right?

Ritvik Mishra

10/20/2021, 10:25 PM
with Flow('..') as flow:
    flow.storage = GitHub(
        repo="...",
        path="...",
        access_token_secret="..."
    )
    RunNowTask = DatabricksRunNow(databricks_conn_secret=json.loads(os.environ['<some auth related stuff>']))
    RunNowTask.copy(
        name=name,
        job_id=job_id,
        max_retries=max_retries,
        retry_delay=retry_delay,
        notebook_params=notebook_params,
    )
    RunNowTask()

flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)
yep same as what you sent

Kevin Kho

10/20/2021, 10:28 PM
So there was this issue that the Databricks task was not thread safe
And Anna fixed it here. But we haven't had a release. I think we'll have one today, so it might be updated
The previous implementation was holding state in self, and calling run() altered self. This means that invoking it multiple times and calling run inside the Flow was causing self to be altered and to lose info from the previous calls.
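A toy sketch (not the actual task code) of why holding per-call state on self breaks when one instance is invoked more than once, especially under a threaded executor:

class NotThreadSafeTask:
    def __init__(self):
        self.job_id = None  # mutable state shared by every call

    def run(self, job_id):
        self.job_id = job_id  # a second call overwrites this while the first is still running...
        # ... long-running work that keeps reading self.job_id ...
        return self.job_id    # ...so the first call can end up seeing the second call's value

task = NotThreadSafeTask()
task.run(1)  # with a threaded executor, both invocations share the same instance
task.run(2)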

Ritvik Mishra

10/20/2021, 10:31 PM
Cool - thanks for explaining! Will let you know if I'm still having this issue after the release. Appreciate the really fast responses here.

Kevin Kho

10/20/2021, 10:31 PM
Of course!
❤️ 1

Ritvik Mishra

10/27/2021, 4:42 PM
Hi! Not sure if this is related, but now we are seeing tasks that fail with the message 'Trigger was "all_successful" but some of the upstream tasks failed.' And when I look at the task runs, all upstream tasks have succeeded and Prefect is aware of that.
Here is a screenshot of what Prefect shows, in case my description is hard to follow

Kevin Kho

10/27/2021, 5:15 PM
Are you using Dask on Kubernetes?

Ritvik Mishra

10/27/2021, 5:33 PM
Kubernetes

Kevin Kho

10/27/2021, 5:34 PM
Is there a chance that your workers are pointed to Cloud? I've seen this before, where workers were reporting to Cloud instead of Server, causing the failures in the tasks

Ritvik Mishra

10/27/2021, 5:46 PM
I'm not sure what exactly that means - how can I check?

Kevin Kho

10/27/2021, 5:50 PM
Looking for a thread. Just to be clear, you are on a Dask executor, right? And these are mapped tasks?
I can’t find the thread but maybe you can check the worker logs on the Dask dashboard?

Ritvik Mishra

10/27/2021, 5:56 PM
Yes dask executor, but not mapped. Will check worker logs. Thanks!

Kevin Kho

10/27/2021, 6:05 PM
I can’t find the thread I’m thinking of anymore. Let me just ask some questions before we go there. Does this happen all the time for this flow? Or does it succeed? Are you using your own image for Dask workers?

Ritvik Mishra

10/27/2021, 6:56 PM
It sometimes works, sometimes fails - that's the confusing part for us
"Are you using your own image for Dask workers?" - sorry, I don't know how to answer that question; I'm new to our setup. Let me know how I can check

Kevin Kho

10/27/2021, 7:01 PM
You can give me the code for how you spin up your DaskExecutor

Ritvik Mishra

10/27/2021, 10:21 PM
We run the Kubernetes Prefect agent - doesn't that spin up the Dask executor?

Kevin Kho

10/27/2021, 10:31 PM
Not necessarily. The agent will run the Flow as a Kubernetes job, but it's the executor definition that spins up the Dask executor. A flow with KubernetesRun can use a LocalExecutor, which will just run inside the job
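A minimal sketch of that split, in Prefect 1.x terms (flow body omitted):

from prefect import Flow
from prefect.run_configs import KubernetesRun
from prefect.executors import LocalDaskExecutor

with Flow("example") as flow:
    ...  # tasks go here

flow.run_config = KubernetesRun()                       # the agent runs the flow as a Kubernetes job
flow.executor = LocalDaskExecutor(scheduler="threads")  # how tasks run inside that job
# With no executor set, the default LocalExecutor just runs tasks sequentially inside the job.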

Ritvik Mishra

10/27/2021, 11:37 PM
I see. We don't have our own logic for spinning up the dask executor as far as I know

Kevin Kho

10/28/2021, 12:59 AM
I understand what you are saying, but I am all the more convinced now. Could I see the flow snippet?

Ritvik Mishra

10/28/2021, 4:11 PM
@Kevin Kho sorry for the delay - snippet is a little bit earlier in this thread history https://prefect-community.slack.com/archives/C014Z8DPDSR/p1634768730467400?thread_ts=1634767253.464300&cid=C014Z8DPDSR

Kevin Kho

10/28/2021, 4:16 PM
Oh my bad. You might get a bit more stability with flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=4), but I am still confused why this is happening

Ritvik Mishra

10/28/2021, 4:17 PM
No worries! Can you please explain the difference between threads and processes for the scheduler?

Kevin Kho

10/28/2021, 4:18 PM
That is the scheduler for the local Dask cluster. This is a general concept rather than a Prefect/Dask-specific one. Processes are more isolated, while threads can share memory.
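For reference, a minimal sketch of the two options being compared:

from prefect.executors import LocalDaskExecutor

threaded = LocalDaskExecutor(scheduler="threads", num_workers=4)    # workers share one process and its memory
isolated = LocalDaskExecutor(scheduler="processes", num_workers=4)  # each worker is a separate, isolated process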

Ritvik Mishra

10/28/2021, 10:02 PM
Thanks, I'll try and let you know how it goes
Today we started seeing something - not sure if it's caused by this change we made - but tasks are failing in Prefect with "No heartbeat detected from the remote task; marking the run as failed." Then it does a retry, but in Databricks everything is succeeding

Kevin Kho

10/30/2021, 12:59 AM
Ah I see. About heartbeats, read this

Anna Geller

10/30/2021, 12:27 PM
@Ritvik Mishra to reproduce the issue: 1. How long does your Databricks job run? 2. Which Prefect version do you use? 3. Which Spark version do you use?

Ritvik Mishra

11/01/2021, 7:21 PM
1. It wasn't running for very long when that happened, maybe an hour or so. 2. Not sure, will check and get back to you. 3. Version 3.1.1.

Anna Geller

11/03/2021, 11:40 AM
@Ritvik Mishra I investigated the issue in detail and I found a solution. In a default configuration, a flow triggering a Databricks job that runs for longer than 30 min keeps dying, since the Flow's heartbeat is no longer detected with such long-running external jobs. For a Spark job that takes 30 min or less, the default configuration works fine. If you set the Flow's heartbeat mode to "thread", both DatabricksRunNow and DatabricksSubmitRun run until completion without any issues. I tested both tasks with a Databricks job that took an hour, and it worked well. You can set this mode using a single environment variable in your run configuration:
run_config=UniversalRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})
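For context, a minimal sketch of attaching that run configuration to a flow (the flow name and body here are illustrative):

from prefect import Flow
from prefect.run_configs import UniversalRun

with Flow("databricks-flow") as flow:
    ...  # Databricks tasks go here

# Heartbeats are sent from a thread instead of a subprocess, so long-running
# external jobs no longer trip the "no heartbeat detected" check.
flow.run_config = UniversalRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})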

Ritvik Mishra

11/03/2021, 7:46 PM
I'm currently using KubernetesRun - will that env variable still work?
And should I change the scheduler back to threads if I'm doing this, or is that unrelated?

Kevin Kho

11/03/2021, 7:56 PM
Yes, the env var will work, and then this may help you with the long-running Databricks job
:upvote: 1

Ritvik Mishra

11/03/2021, 8:58 PM
Thanks will try it out!
I made that change, as we were then consistently having heartbeat issues even for things that only took 5 minutes
Reverted that, and we are also seeing a lot of what I had before: 'Trigger was "all_successful" but some of the upstream tasks failed.' when no upstream tasks actually failed

Anna Geller

11/04/2021, 6:31 PM
can you share your run configuration and your Prefect version?
Can you provide more details about your setup? I want to understand why it didn’t work for you, since I tested this setup before with the duration and Spark job version you mentioned

Ritvik Mishra

11/04/2021, 7:15 PM
What do you mean by run configuration? I sent a lot of code snippets earlier in this thread - let me know if there is any info missing from there
Do you know how I can check which version of Prefect it is?

Kevin Kho

11/04/2021, 7:25 PM
prefect version in the CLI for the second question. For the run configuration, you must have something like:
flow.run_config = KubernetesRun(..)
I think Anna wants to see how you included the env variable that she suggested.

Ritvik Mishra

11/04/2021, 7:29 PM
flow.run_config = KubernetesRun(
    env={
        "DATABRICKS_TOKEN": token,
        "PREFECT__CLOUD__HEARTBEAT_MODE": "thread",
    }
)
prefect version: 0.15.6

Kevin Kho

11/04/2021, 7:34 PM
Is that the version on the agent? Or for flow registration?

Ritvik Mishra

11/04/2021, 7:35 PM
On the agent

Kevin Kho

11/04/2021, 7:37 PM
For the Databricks task specifically, there were changes in 0.15.7 to help it become more stable. Changelog here. It might be worth upgrading to 0.15.7.

Ritvik Mishra

11/04/2021, 8:14 PM
Will try updating to 0.15.7
:upvote: 1