# ask-community
t
I have a case in which a LocalDaskExecutor task pulling data from a db gets the data and completes, but then never actually returns. It seems to stop without returning. I had a problem like this a release or three back in Prefect that resulted in a bug fix, but I haven't seen such an instance since. Has the issue come back, to your knowledge?
Related to the above, I turned on debug and added some debug statements around the end of the task. Here's the start of the logs as the flow began, and the second log is the point at which the task should have returned (but didn't).
This is the task in question:
Copy code
@task(name='Pull Oracle Data', cache_for=timedelta(hours=10), timeout=(3*60*60), max_retries=3, retry_delay=timedelta(minutes=randint(5, 25)), cache_validator=prefect.engine.cache_validators.all_parameters)
def pull_oracle_data_via(oracle_query_sql, server='INTP', cache_flag=False) -> DataFrame:
    if cache_flag:
        logger.info('Task flagged to invalidate/ignore cache.')
    logger.info(f"Initiating Oracle Pull using {oracle_query_sql}...")
    with Oracle_DB(server=server) as o_db:
        df = o_db.pull_data_from_oracle(oracle_query_sql)
    data_amount = len(df.index)
    logger.info(f"Rows returned in oracle data query: {data_amount:,}...")
    assert data_amount != 0  # Cause task failure if 0 is returned
    logger.debug("Oracle pull completed; returning...")
    return df
And here's the flow:
Copy code
with Flow(FLOW_NAME, result=LocalResult(), schedule=schedule) as flow:
    cache_invalid = Parameter('cache_invalid', default=False)
    logger.info(f"{FLOW_NAME} Flow initiated, running in {file_path}")
    df = pull_oracle_data_via(oracle_query_sql=ORACLE_QUERY_SQL_PATH, server=ORACLE_SERVER, cache_flag=cache_invalid)
    logger.debug('Oracle pull returned.')
    df = set_data_types(df)
    create_data_summary_artifacts(df)
    u = upload_to_table(df, destination_table=DATA_DESTINATION_TABLE_NAME, append=False)
    add_indices_to_table(upstream_tasks=[u], destination_table=DATA_DESTINATION_TABLE_NAME, column_list=INDEX_COLUMNS)

flow.run_config = LocalRun(env={'PREFECT__LOGGING__FORMAT':'[%(asctime)s-%(levelname)s - %(name)s]-[%(filename)s:%(funcName)s]-Line %(lineno)d: %(message)s',
                                'PREFECT__LOGGING__LEVEL':'DEBUG',
                            },
        working_dir=file_path, labels=["normal-process"])

flow.executor = LocalDaskExecutor(scheduler='threads',num_workers=8)

flow.register(project_name=PROJECT_NAME,parameters={'cache_invalid':False},idempotency_key=(FLOW_NAME+datetime.now().strftime("%Y%m%d-%H%M%S")))
k
It looks like Michael’s PR was added to version 1.0rc1. Not sure it’s related, but are you on 1.0rc1?
t
1.0rc?
I'm on prefect 0.15.13
Is that wrong? If I install via pip install -U prefect, the above is what I get as the most current one.
That PR does indeed look like the one that fixed this issue last time (or at least one with similar behavior) but I thought that was folded in here since 0.15.11 or so; I haven't seen the issue recently until I added this new flow today and it suddenly appeared again.
k
Yep, the release candidate for 1.0. It’s available as a pre-release, so you can get it with pip install prefect==1.0rc1. The -U just gets the latest release (not pre-releases).
t
Doesn't the 0.15.13 have the above fix already though?
Should I prefer the rc one over the current one? Given the name I assumed rc would be less stable; not so?
k
You are right, it seems it should already be in 0.15.13.
One sec, looking at tracebacks.
t
There's some log missing in the middle too; the above logs were the start and finish, but the missing piece wasn't relevant. (I don't know a way from the GUI to download the full log; it seems to only download the chunk that's visible.)
k
Are you trying to avoid processes for the LocalDaskExecutor scheduler? Are you open to trying that? Does this issue happen consistently?
t
Not sure I understand the first question; you mean processes vs. threads? In this particular case I'm not averse to changing that; should I try it? And no, this is the first time I've encountered the issue again since the above-referenced fix.
And I have tried to see what's different about this flow vs. others that are very similar and I don't see any difference that might cause this
k
Yes, exactly. I think processes might be more stable. Worth a shot.
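For reference, that switch is a one-line change to the flow above; a minimal sketch, assuming the same flow object and worker count:
Copy code
# Sketch: run the LocalDaskExecutor with a process pool instead of a thread pool,
# so each task executes in its own worker process rather than a thread inside
# the flow-runner process.
from prefect.executors import LocalDaskExecutor

flow.executor = LocalDaskExecutor(scheduler='processes', num_workers=8)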
z
That PR has been released since 0.15.11
t
Restarting with processes; it'll take 20 minutes or so to see if it made a difference. And @Zanie, agreed, and I haven't seen this issue since then until now.
It's a bit concerning to me that it occurred here though; most of my other flows are currently set to use threads; is it possible they'll start having this issue again?
I haven't had issues with it recently, but it makes me nervous when I can't explain why I hit it here and not in the others.
z
It’s hard to say because I don’t understand what happened here yet either.
We’ll need the DEBUG level logs from the timeout function to diagnose where the deadlock occurs.
t
How do I get those again? I attached debug logs at the top; I remember last time you gave me some other command though to get more detailed debug logs; what was the process for that?
z
It looks like the DEBUG logs in the top just don’t include the relevant section
t
Hmm, well as you can see, DEBUG was clearly on, since debug messages came back. What do I need to do to turn on any other sections if that didn't do it?
In this case I turned on debug in the flow (visible in the....third post here I think it was?)
Correction; fourth post.
z
It seems to be on, but I don’t see the full logs for the flow run?
t
What do I need to turn on to get them, then? Or where can I pull them if there's somewhere else? (I'm just downloading from the GUI right now to get the above.)
Sounds like maybe there's a debug error here too. 😏
z
Seems like you’re only downloading a portion of what’s available as you mentioned above?
I don’t know how the download/pagination works in the UI off the top of my head.
t
Yes, in the GUI you can't see the full log at once. You have to click "show older" or "show newer", and when you download (annoyingly) it downloads only the current page, not the whole log.
z
I think there’s an open issue for that
👍 1
t
I downloaded basically the head, so you could see the executor and such, and then paged down to the section that included the task in question
I'll see if I can download more to make sure I get them all; hold on...
z
There should be more logs for the task in question that are prefixed with “Task ‘…’:”
We’re interested in the ones about the timeout handler and pickling results
t
To get this I picked "load older" until the top, and downloaded that. Then I clicked load newer, downloaded again (logs (1)) and then repeated (logs (2)) which should cover the entire period now
The end shows the cancellation which I triggered after I reached out here
I think we had some version of this issue last time too; you had me trigger the flow manually with a command that turned on more debug output to address it, I think; maybe we need to do that again?
And if so, maybe there's another issue here that debug turned on at the flow level doesn't seem to propagate fully
The task in question here is imported from another module, so maybe that breaks the log level setting somehow?
z
The relevant logs are
Copy code
8 February 2022,03:15:18    prefect.CloudTaskRunner DEBUG   Task 'Pull Oracle Data': Attaching process based timeout handler...
8 February 2022,03:15:18    prefect.CloudTaskRunner DEBUG   Task 'Pull Oracle Data': Sending execution to a new process...
8 February 2022,03:15:18    prefect.CloudTaskRunner DEBUG   Task 'Pull Oracle Data': Waiting for process to return with 10800s timeout...
8 February 2022,03:15:18    prefect.CloudTaskRunner DEBUG   Task 'Pull Oracle Data': Executing...
8 February 2022,03:31:10    prefect INFO    Rows returned in oracle data query: 96,562...
8 February 2022,03:31:10    prefect DEBUG   Oracle pull completed; returning...
8 February 2022,03:31:10    prefect.CloudTaskRunner DEBUG   Task 'Pull Oracle Data': Execution successful.
t
Oh, was that in what I sent? Good. Sorry, I thought I'd included that in the second one at the top
t
Hmm, so something to do with the pickling again then?
z
well.. we’re not even getting the pickling log
Is it possible your run is running into memory constraints or something?
t
I doubt it, but let me check
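(For a quick check on the machine, a small sketch using psutil, which is an assumption here and not part of the flow:)
Copy code
# Hypothetical check with psutil (assumed installed) to see how much memory
# is free on the machine while the task runs.
import psutil

mem = psutil.virtual_memory()
print(f"Available: {mem.available / 1024**3:.1f} GiB of {mem.total / 1024**3:.1f} GiB")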
z
Is your timeout being reached or are you cancelling it before then?
t
That's strange; basically the only thing between those lines is the except blocks, but nothing gets thrown?
In one earlier version of this test the timeout did get hit and trigger a retry
It's a long timeout though; that's what flagged me originally to look into the task to discover this error. In the one I sent you above, I cancelled before the timeout was hit
z
Yeah the lack of message is very peculiar from looking at the code, something weird must be happening.
t
Also, incidentally, the version with processes just got to the point of getting the data back and it's still running, so the threads-to-processes switch didn't seem to help.
This is my available memory over the last 24 hours on this machine, so definitely not hitting memory limits
I haven't dropped below 20GB free memory at any point in the last day
z
You can test this locally with
PREFECT__LOGGING__LEVEL=DEBUG prefect run --name "hello-world" --project "example" --execute
Where name/project are replaced with your flow name/project
t
So....if it threw an error of any sort we should have hit one of those except blocks and gotten an error, right? Is it possible that it's failing ON line 317 somehow? And if so, maybe that's also causing issues in the related except block for it?
Maybe somehow getting the return_val value causes an error, causing both line 317 and the except to fail?
z
I guess it’s possible?
t
Running now with the above local test line.
z
The type(return_val) call would have to fail, which seems very unlikely.
t
Is there another way that line 299 would work but then nothing would happen? i.e., no debug line and also no error? I don't see another path for that to be possible.
z
It’s possible that somehow the logs aren’t flushing from the subprocess
t
(not that I'm the authority on this though 😏 )
z
So they are logging but we don’t get them
t
Right. Hopefully the run I just started, using your above command, will give us that though, right?
z
It’s really hard to say, though; a local run may reveal more since we don’t have to rely on the logs getting sent to Cloud.
t
It's running now
z
We’re using different timeout mechanisms in Orion so hopefully we don’t have this problem in the future 😄
❤️ 1
You seem to be an unlucky one, I thought I’d fixed this up
t
What's the timeframe for that?
hahaha, you did! I hadn't seen the issue again until today!
z
It’s coming along quickly! Since we’re releasing 1.0 of current Prefect, we’re able to focus more attention on 2.0/Orion.
t
Is the RC more stable than the current version btw? Should I upgrade from the one I'm on to rc1 or wait still?
I'm pretty excited for Orion. I have to say, it looks like I'll be able to restructure all my flows to have a LOT more code reuse; hoping that comes sooner. Every one of these flows I add now is another one I'll have to restructure then.
z
I’d say it’s less stable, it’s mostly the removal of old features.
👍 1
t
Good to know; I'll wait then
That flow is still running, btw, just waiting for it to return. Query has been taking 15-20 minutes today
z
👍 lmk
t
Oooh, we did get more! Hold on...
logs.txt
The pickling size line did work
And then nothing...
That would suggest it's the dumps line where we're stopping
z
What happens if you try to cloudpickle the result within your task?
t
Oooh, I'll bet this means I'm getting some new data type in this query that my other queries don't have. That's why I'm seeing the behavior now and not before
z
140 MB is kind of big
t
Is it? It's a LONG way from the biggest I'm working with
z
I guess not 🤷 I’m not sure what cloudpickle’s limitations are
It seems like it should be a fine size
t
But, last time we had this same issue, and the changes you added fixed it. So why would this one fail again?
z
I cannot say. There were a lot of possible issues with the old implementation but I thought I’d conquered them all.
t
Added a test version that pickles in the task and restarting
New task:
Copy code
@task(name='Pull Oracle Data', cache_for=timedelta(hours=10), timeout=(3*60*60), max_retries=3, retry_delay=timedelta(minutes=randint(5, 25)), cache_validator=prefect.engine.cache_validators.all_parameters)
def pull_oracle_data_via_with_pickle(oracle_query_sql, server='INTP', cache_flag=False) -> DataFrame:
    if cache_flag:
        logger.info('Task flagged to invalidate/ignore cache.')
    logger.info(f"Initiating Oracle Pull using {oracle_query_sql}...")
    with Oracle_DB(server=server) as o_db:
        df = o_db.pull_data_from_oracle(oracle_query_sql)
    data_amount = len(df.index)
    logger.info(f"Rows returned in oracle data query: {data_amount:,}...")
    assert data_amount != 0  # Cause task failure if 0 is returned
    logger.debug("Oracle pull completed; returning...")
    pickled_val = cloudpickle.dumps(df)
    logger.debug("Pickling successful")
    return df
Thinking through what else might have caused this, I do think this particular query returns fields that are likely longer than any other I've had to date.
i.e. in the pandas dataframe, a particular column will have elements with longer values than any other I've had; some could be very long
The test task just returned; same thing happens. It gets to the dumps line and then just stops
Further testing: I removed the columns that could be very long and the error went away. 😞 That basically means there's some unknown issue, likely a length limit, on the data I can get back from a database, and hitting it doesn't even return an error apart from a timeout. Thoughts on anything I could do to keep that kind of data in here?
Or at the least to hammer down what the limit is?
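One way to narrow that down, as a sketch reusing the names already in the task above (cloudpickle, logger, and the DataFrame df), is to log the pickled payload size and the longest string in each text column before returning:
Copy code
# Hypothetical diagnostic: report the cloudpickle payload size and the longest
# string in each object-dtype column, to see which columns inflate the payload.
payload = cloudpickle.dumps(df)
logger.debug(f"Pickled size: {len(payload) / 1024**2:.1f} MB")
for col in df.select_dtypes(include='object').columns:
    max_len = df[col].astype(str).str.len().max()
    logger.debug(f"Column {col!r}: longest value is {max_len} characters")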
z
Just to clarify: It dumps correctly when you do it in your task rather than in the timeout handler?
The data is too large to place into a multiprocessing queue and it is causing a deadlock
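For context, this is a documented pitfall of Python's multiprocessing queues: a child process that has put data on a queue keeps flushing it to the underlying pipe until the parent drains the queue, so joining the process before reading a large payload can hang. A minimal standalone sketch (not Prefect code; the size is illustrative):
Copy code
# Standalone illustration of the multiprocessing queue pitfall (not Prefect code).
import multiprocessing as mp

def worker(q):
    # The child puts a large payload (~200 MB) on the queue.
    q.put(b"x" * (200 * 1024 * 1024))

if __name__ == "__main__":
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,))
    p.start()
    data = q.get()  # drain the queue BEFORE joining...
    p.join()        # ...joining first, before q.get(), can deadlock on large payloads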
t
No, it didn't work in the task either, encountered the same error. It worked when I removed the columns that might have very long data.
This data regularly works fine in csv or excel format though.
Given the above, are there options here? In the meantime I'm stuck basically excluding some data from my process.
k
If the issue is a multiprocessing deadlock, you may unfortunately have to either go to a LocalExecutor or break up the work. Michael may have other ideas.
z
Oh if it didn’t work in the task either, it’s not a multiprocessing deadlock.
I’d report this as a bug to cloudpickle if you can make a minimal reproducible example; I don’t think pickle should hang like this.
You could attempt to serialize it another way, perhaps by returning df.to_json() from your task instead of the raw data.
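A rough sketch of that workaround, assuming the downstream task rebuilds the frame with pandas (the function names here are illustrative, not from the thread):
Copy code
# Hypothetical sketch of the JSON workaround: return a JSON string instead of
# the raw DataFrame, then rebuild the frame in the downstream task.
import io

import pandas as pd
from prefect import task

# Oracle_DB is the user's own helper from the thread, assumed importable here.

@task(name='Pull Oracle Data (JSON)')
def pull_oracle_data_as_json(oracle_query_sql, server='INTP') -> str:
    with Oracle_DB(server=server) as o_db:
        df = o_db.pull_data_from_oracle(oracle_query_sql)
    # Serialize inside the task so the value passed between tasks is plain text.
    return df.to_json(orient='records')

@task
def rebuild_dataframe(df_json: str) -> pd.DataFrame:
    # Rebuild the DataFrame from the JSON payload in the downstream task.
    return pd.read_json(io.StringIO(df_json), orient='records')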