# prefect-server
r
Hi! We use Prefect to run Databricks tasks, and we've been noticing lately that the status Prefect shows for a task doesn't always update in time - Prefect frequently thinks something is still running, or hasn't even started, when it has already started and finished. So the flow doesn't continue on to the next tasks in the DAG and gets stuck. Eventually it picks up the updated status of the Databricks tasks and continues. This seems like some sort of communication issue between Prefect and Databricks, but I can't figure out how to debug it. Would appreciate some help here! Thanks!
k
Are you mapping with the Databricks task by chance?
r
Sorry, not sure I understand what you mean
k
Like DatabricksSubmit.map(…) in Prefect
Are you using the Databricks tasks from the task library?
There is a polling interval there of 30 seconds by default. Do you think that can be the issue? Maybe you want to decrease it?
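For reference, a minimal sketch of lowering it on one of the Databricks tasks (the polling_period_seconds parameter and the job id below are assumptions, not taken from your setup):
Copy code
from prefect.tasks.databricks import DatabricksRunNow

# Poll Databricks for run status every 10s instead of the default 30s
# (polling_period_seconds is assumed from the task library signature).
databricks_task = DatabricksRunNow(
    job_id="123",              # hypothetical job id
    polling_period_seconds=10,
)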
r
Got it - we're using DatabricksRunNow from the prefect tasks library
It's off by hours, so I'm not sure the 30-second polling interval could be the cause, and it's also only happening for some tasks, not all
k
Ah ok. What executor are you using?
r
LocalDaskExecutor
k
Ah so do you do DatabricksRunNow().map() to parallelize?
Or could you show me the code? I have ideas. Just hide the sensitive info
r
we just do DatabricksRunNow() to run
I'll send a code snippet, one minute
k
I suspect you are doing something like this:
Copy code
databricks_task = DatabricksRunNow()

with Flow("...") as flow:
    databricks_task(...)
    databricks_task(...)
Is that right?
r
with Flow('..') as flow:
    flow.storage = GitHub(
        repo="...",
        path="...",
        access_token_secret="..."
    )
    RunNowTask = DatabricksRunNow(databricks_conn_secret=json.loads(os.environ['<some auth related stuff>']))
    RunNowTask.copy(
        name=name,
        job_id=job_id,
        max_retries=max_retries,
        retry_delay=retry_delay,
        notebook_params=notebook_params,
    )
    RunNowTask()
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)
yep same as what you sent
k
So there was this issue that the Databricks task was not thread safe
And Anna fixed it here. But we haven't had a release. I think we'll have one today so it might be updated
The previous implementation was holding state in self and calling run() on self. This means that invoking it multiple times and calling run inside the Flow was causing self to be altered and lose info from the previous calls.
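To see why that matters, here is a simplified sketch (not the actual task code) of how storing run state on self breaks when one instance is called from several threads:
Copy code
import threading
import time

class NotThreadSafeTask:
    # Simplified stand-in for a task that keeps per-run state on self.
    def run(self, job_id):
        self.job_id = job_id  # one shared attribute, written by every concurrent call
        time.sleep(0.1)       # pretend to submit the job and poll Databricks
        return self.job_id    # may now hold a different call's job_id

task = NotThreadSafeTask()
results = []
threads = [
    threading.Thread(target=lambda i=i: results.append(task.run(i)))
    for i in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)  # e.g. [3, 3, 3, 3] - the calls clobber each other's state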
r
Cool - thanks for explaining! Will let you know if I'm still having this issue after the release. Appreciate the really fast responses here.
k
Of course!
❤️ 1
r
Hi! Not sure if this is related, but now we are seeing tasks that fail with the message "Trigger was "all_successful" but some of the upstream tasks failed.". And when I look at the task runs, all upstream tasks have succeeded and prefect is aware of that.
Here is a screenshot of what prefect shows in case my description is hard to follow
k
Are you using Dask on Kubernetes?
r
Kubernetes
k
Is there a chance that your workers are pointed to Cloud? I've seen this before where workers were reporting to Cloud instead of Server, causing the failure in the tasks
r
I'm not sure what exactly that means - how can I check?
k
Looking for a thread. Just to be clear, you are on a Dask Executor right? And these are mapped tasks?
I can’t find the thread but maybe you can check the worker logs on the Dask dashboard?
r
Yes dask executor, but not mapped. Will check worker logs. Thanks!
k
I can’t find the thread I’m thinking of anymore. Let me just ask some questions before we go there. Does this happen all the time for this flow? Or does it succeed? Are you using your own image for Dask workers?
r
It sometimes works, sometimes fails - that's the confusing part for us
"Are you using your own image for Dask workers?" - sorry don't know how to answer that question I'm new to our setup, let me know how I can check
k
You can give me the code to how you spin up your DaskExecutor
r
We run the kubernetes prefect agent, doesn't that spin up the dask executor?
k
Not necessarily. It will run the Flow as a Kubernetes job, but it's the executor definition that spins up the Dask executor. A flow with KubernetesRun can use the LocalExecutor, which will just run inside the job
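Roughly, the two pieces look like this (a sketch; the flow name is illustrative):
Copy code
from prefect import Flow
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import KubernetesRun

with Flow("databricks-flow") as flow:  # illustrative flow name
    ...

# Where the flow runs: the agent launches it as a Kubernetes job.
flow.run_config = KubernetesRun()

# How tasks inside that job run: without this it falls back to a LocalExecutor.
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)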
r
I see. We don't have our own logic for spinning up the dask executor as far as I know
k
I understand what you are saying, but I am all the more convinced now. Could I see the flow snippet?
r
@Kevin Kho sorry for the delay - snippet is a little bit earlier in this thread history https://prefect-community.slack.com/archives/C014Z8DPDSR/p1634768730467400?thread_ts=1634767253.464300&cid=C014Z8DPDSR
k
Oh my bad. You might get a bit more stability with flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=4), but am still confused why this is happening
r
No worries! Can you please explain the difference between threads and processes for the scheduler?
k
That is the scheduler for the local dask cluster. This is a general concept rather than a Prefect/Dask concept. The processes are more isolated while the threads can share memory.
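For example, a minimal sketch of the two options on the same flow (the flow itself is just a placeholder):
Copy code
from prefect import Flow
from prefect.executors import LocalDaskExecutor

with Flow("example") as flow:  # placeholder flow
    ...

# threads: lighter weight, workers share memory, but tasks must be thread safe
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)

# processes: each worker gets its own memory - more isolation, more overhead
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=4)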
r
Thanks, I'll try and let you know how it goes
Today we started seeing something, not sure if it's caused by this change we made - tasks are failing in Prefect with "No heartbeat detected from the remote task; marking the run as failed.", then it does a retry, but in Databricks everything is succeeding
k
Ah I see. About heartbeats, read this
a
@Ritvik Mishra to reproduce the issue:
1. How long is your Databricks job running?
2. Which Prefect version do you use?
3. Which Spark version do you use?
r
1. It wasn't running for very long when that happened, maybe an hour or so
2. Not sure, will check and get back to you
3. Version 3.1.1
a
@Ritvik Mishra I investigated the issue in detail and I found a solution. In a default configuration, a flow triggering a Databricks job that runs for longer than 30 min keeps dying, since the Flow's heartbeat is no longer detected with such long-running external jobs. For a Spark job that takes 30 min or less, the default configuration works fine. If you set the Flow's heartbeat mode to "thread", both DatabricksRunNow and DatabricksSubmitRun run until completion without any issues. I tested both tasks with a Databricks job that took an hour, and it worked well. You can set this mode using a single environment variable in your run configuration:
Copy code
run_config=UniversalRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})
r
I'm currently using KubernetesRun - will that env variable work still?
And should I change the scheduler back to threads if I'm doing this, or is that unrelated?
k
Yes the env var will work and then this may help you with the long running Databricks Job
upvote 1
r
Thanks will try it out!
I made that change, and then we consistently had heartbeat issues even for things that only took 5 minutes
Reverted that, and we are also seeing a lot of what I had before: 'Trigger was "all_successful" but some of the upstream tasks failed.' when no upstream tasks failed
a
can you share your run configuration and your Prefect version?
Can you provide more details about your setup? I want to understand why it didn’t work for you, since I tested this setup before with the duration and Spark job version you mentioned
r
What do you mean by run configuration? Sent a lot of code snippets earlier in this thread, let me know if there is any info missing from there
Do you know how I can check what version of prefect it is?
k
prefect version in the CLI for the second question. For the run configuration, you must have something like:
Copy code
flow.run_config = KubernetesRun(..)
I think Anna wants to see how you included the env variable that she suggested.
r
flow.run_config = KubernetesRun(
    env={
        "DATABRICKS_TOKEN": token,
        "PREFECT__CLOUD__HEARTBEAT_MODE": "thread",
    }
)
prefect version: 0.15.6
k
Is that the version on the agent? or flow registration?
r
On the agent
k
For the Databricks task specifically, there were changes in 0.15.7 to help it become more stable. Changelog here. It might be worth upgrading to 0.15.7
r
Will try updating to 0.15.7
upvote 1