
Tom Shaffner

02/08/2022, 8:42 PM
I have a case in which a LocalDaskExecutor task pulling data from a db gets the data and completes, but then never actually returns. It seems to stop without returning. I had a problem like this a release or three back in Prefect that resulted in a bug fix, but I haven't seen such an instance since. Is the issue back to our knowledge?
Related to the above, I turned on debug and added some debug statements around the end of the task. Here's the start of the logs as the flow began, and the second log is the point at which the task should have returned (but didn't).
This is the task in question:
Copy code
@task(name='Pull Oracle Data',cache_for=timedelta(hours=10),timeout=(3*60*60),max_retries=3, retry_delay = timedelta(minutes=randint(5,25)),cache_validator=prefect.engine.cache_validators.all_parameters)
def pull_oracle_data_via(oracle_query_sql, server='INTP',cache_flag = False) -> DataFrame:
    if cache_flag:
        logger.info('Task flagged to invalidate/ignore cache.')
    logger.info(f"Initiating Oracle Pull using {oracle_query_sql}...")
    with Oracle_DB(server=server) as o_db:
        df = o_db.pull_data_from_oracle(oracle_query_sql)
    data_amount=len(df.index)
    logger.info(f"Rows returned in oracle data query: {data_amount:,}...")
    assert data_amount!=0 # Cause task failure if 0 is returned    
    logger.debug(f"Oracle pull completed; returning...")
    return df
And here's the flow:
Copy code
with Flow(FLOW_NAME,result=LocalResult(),schedule=schedule) as flow:
    cache_invalid = Parameter('cache_invalid',default=False)
    logger.info(f"{FLOW_NAME} Flow initiated, running in {file_path}")
    df = pull_oracle_data_via(oracle_query_sql=ORACLE_QUERY_SQL_PATH,server=ORACLE_SERVER,cache_flag = cache_invalid)
    logger.debug('Oracle pull returned.')
    df = set_data_types(df)
    create_data_summary_artifacts(df)
    u=upload_to_table(df, destination_table = DATA_DESTINATION_TABLE_NAME,append=False)    
    add_indices_to_table(upstream_tasks=[u],destination_table=DATA_DESTINATION_TABLE_NAME,column_list=INDEX_COLUMNS)

flow.run_config = LocalRun(env={'PREFECT__LOGGING__FORMAT':'[%(asctime)s-%(levelname)s - %(name)s]-[%(filename)s:%(funcName)s]-Line %(lineno)d: %(message)s',
                                'PREFECT__LOGGING__LEVEL':'DEBUG',
                            },
        working_dir=file_path, labels=["normal-process"])

flow.executor = LocalDaskExecutor(scheduler='threads',num_workers=8)

flow.register(project_name=PROJECT_NAME,parameters={'cache_invalid':False},idempotency_key=(FLOW_NAME+datetime.now().strftime("%Y%m%d-%H%M%S")))

Kevin Kho

02/08/2022, 8:45 PM
It looks like Michael’s PR was added to version 1.0rc1. Not sure it’s related but are you on the 1.0rc1?

Tom Shaffner

02/08/2022, 8:45 PM
1.0rc?
I'm on prefect 0.15.13
Is that wrong? If I install via
pip install -U prefect
the above is what I get as the most current one
That PR does indeed look like the one that fixed this issue last time (or at least one with similar behavior), but I thought that was folded in since 0.15.11 or so; I hadn't seen the issue again until I added this new flow today and it suddenly reappeared.

Kevin Kho

02/08/2022, 8:48 PM
Yep, the release candidate for 1.0. It's available as a pre-release, so you can get it with pip install prefect==1.0rc1. The -U just gets the latest release (not pre-release).

Tom Shaffner

02/08/2022, 8:48 PM
Doesn't 0.15.13 have the above fix already, though?
Should I prefer the RC over the current one? Given the name I assumed the RC would be less stable; is that not so?

Kevin Kho

02/08/2022, 8:51 PM
You're right, it seems it should already be in 0.15.13.
One sec, looking at tracebacks.

Tom Shaffner

02/08/2022, 8:52 PM
There's some log missing in the middle too; the above logs were the start and finish, but the missing piece wasn't relevant. (I don't know of a way to download the full log from the GUI; it seems to only download the chunk that's visible.)

Kevin Kho

02/08/2022, 8:58 PM
Are you trying to avoid processes for the LocalDaskExecutor scheduler? Are you open to trying that? Does this issue happen consistently?

Tom Shaffner

02/08/2022, 8:59 PM
Not sure I understand the first question; you mean processes vs. threads? In this particular case I'm not averse to changing that; should I try it? And no, this is the first time I've encountered the issue again since the above-referenced fix.
And I have tried to see what's different about this flow vs. others that are very similar and I don't see any difference that might cause this

Kevin Kho

02/08/2022, 8:59 PM
Yes, exactly. I think processes might be more stable. Worth a shot.
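For reference, the switch being discussed here is a one-line change to the executor defined in the flow code above:
Copy code
flow.executor = LocalDaskExecutor(scheduler='processes', num_workers=8)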

Zanie

02/08/2022, 9:00 PM
That PR has been released since 0.15.11

Tom Shaffner

02/08/2022, 9:01 PM
Restarting with processes; it'll take 20 minutes or so to see if it made a difference. And @Zanie, agreed, and I haven't seen this issue since then until now.
It's a bit concerning to me that it occurred here though; most of my other flows are currently set to use threads; is it possible they'll start having this issue again?
I haven't had issues with it recently, but it makes me nervous that I can't explain why it happened here and not in the others.

Zanie

02/08/2022, 9:02 PM
It’s hard to say because I don’t understand what happened here yet either.
We’ll need the DEBUG level logs from the timeout function to diagnose where the deadlock occurs.

Tom Shaffner

02/08/2022, 9:04 PM
How do I get those again? I attached debug logs at the top; I remember last time you gave me some other command though to get more detailed debug logs; what was the process for that?

Zanie

02/08/2022, 9:05 PM
It looks like the DEBUG logs in the top just don’t include the relevant section

Tom Shaffner

02/08/2022, 9:06 PM
Hmm, well as you can see, debug was clearly on, since debug results came back. What do I need to do to turn on any other sections if that didn't do it?
In this case I turned on debug in the flow (visible in the... third post here, I think it was?)
Correction: fourth post.

Zanie

02/08/2022, 9:09 PM
It seems on but I do not see the full logs for the flow run?

Tom Shaffner

02/08/2022, 9:09 PM
What do I need to turn on to get them then? Or where can I pull them from if there's somewhere else? (I'm just downloading from the GUI right now to get the above.)
Sounds like maybe there's a debug error here too. 😏

Zanie

02/08/2022, 9:11 PM
Seems like you’re only downloading a portion of what’s available as you mentioned above?
I don’t know how the download/pagination works in the UI off the top of my head.

Tom Shaffner

02/08/2022, 9:12 PM
Yes, in the GUI you can't see the full log at once. You have to click "show older" or "show newer", and when you download, it (annoyingly) downloads only the current page, not the whole log.

Zanie

02/08/2022, 9:12 PM
I think there’s an open issue for that
👍 1

Tom Shaffner

02/08/2022, 9:12 PM
I downloaded basically the head, so you could see the executor and such, and then paged down to the section that included the task in question
I'll see if I can download more to make sure I get them all; hold on...

Zanie

02/08/2022, 9:12 PM
There should be more logs for the task in question that are prefixed with “Task ‘…’:”
We’re interested in the ones about the timeout handler and pickling results

Tom Shaffner

02/08/2022, 9:14 PM
To get this I picked "load older" until the top, and downloaded that. Then I clicked load newer, downloaded again (logs (1)) and then repeated (logs (2)) which should cover the entire period now
The end shows the cancellation which I triggered after I reached out here
I think we had some version of this issue last time too; you had me trigger the flow manually with a command that turned on more debug output; maybe we need to do that again?
And if so, maybe there's another issue here: debug turned on at the flow level doesn't seem to propagate fully.
The task in question here is imported from another module, so maybe that breaks the log level setting somehow?
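One thing worth noting for imported modules: in Prefect 1.x, if a module logs with its own standard-library logger rather than Prefect's, those records only reach Prefect's handlers when the logger is registered via the extra-loggers setting. A hedged sketch; the module name here is purely illustrative:
Copy code
flow.run_config = LocalRun(env={'PREFECT__LOGGING__LEVEL': 'DEBUG',
                                # 'my_oracle_module' is a stand-in for wherever the task actually lives
                                'PREFECT__LOGGING__EXTRA_LOGGERS': '["my_oracle_module"]'},
                           working_dir=file_path, labels=["normal-process"])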

Zanie

02/08/2022, 9:17 PM
The relevant logs are
Copy code
8 February 2022,03:15:18    prefect.CloudTaskRunner DEBUG   Task 'Pull Oracle Data': Attaching process based timeout handler...
8 February 2022,03:15:18    prefect.CloudTaskRunner DEBUG   Task 'Pull Oracle Data': Sending execution to a new process...
8 February 2022,03:15:18    prefect.CloudTaskRunner DEBUG   Task 'Pull Oracle Data': Waiting for process to return with 10800s timeout...
8 February 2022,03:15:18    prefect.CloudTaskRunner DEBUG   Task 'Pull Oracle Data': Executing...
8 February 2022,03:31:10    prefect INFO    Rows returned in oracle data query: 96,562...
8 February 2022,03:31:10    prefect DEBUG   Oracle pull completed; returning...
8 February 2022,03:31:10    prefect.CloudTaskRunner DEBUG   Task 'Pull Oracle Data': Execution successful.

Tom Shaffner

02/08/2022, 9:18 PM
Oh, was that in what I sent? Good. Sorry, I thought I'd included that in the second one at the top

Tom Shaffner

02/08/2022, 9:18 PM
Hmm, so something to do with the pickling again then?

Zanie

02/08/2022, 9:18 PM
Well... we're not even getting the pickling log.
Is it possible your run is running into memory constraints or something?

Tom Shaffner

02/08/2022, 9:19 PM
I doubt it, but let me check

Zanie

02/08/2022, 9:19 PM
Is your timeout being reached or are you cancelling it before then?

Tom Shaffner

02/08/2022, 9:20 PM
That's strange; basically the only stuff between those lines is the except blocks, but nothing gets thrown?
In one earlier version of this test the timeout did get hit and trigger a retry.
It's a long timeout though; that's what originally flagged me to look into the task and discover this error. In the run I sent you above, I cancelled before the timeout was hit.

Zanie

02/08/2022, 9:20 PM
Yeah, from looking at the code the lack of a message is very peculiar; something weird must be happening.

Tom Shaffner

02/08/2022, 9:21 PM
Also, incidentally, the version with processes just got to the point of getting the data back and it's still running, so the threads-to-processes switch didn't seem to help.
This is my available memory over the last 24 hours on this machine, so definitely not hitting memory limits
I haven't dropped below 20GB free memory at any point in the last day

Zanie

02/08/2022, 9:24 PM
You can test this locally with
PREFECT__LOGGING__LEVEL=DEBUG prefect run --name "hello-world" --project "example" --execute
Where name/project are replaced with your flow name/project

Tom Shaffner

02/08/2022, 9:25 PM
So... if it threw an error of any sort we should have hit one of those except blocks and gotten an error, right? Is it possible that it's failing on line 317 somehow? And if so, maybe that's also causing issues in the related except block for it?
Maybe somehow getting the return_val value causes an error, causing both line 317 and the except to fail?

Zanie

02/08/2022, 9:27 PM
I guess it’s possible?

Tom Shaffner

02/08/2022, 9:27 PM
Running now with the above local test line.

Zanie

02/08/2022, 9:28 PM
The type(return_val) call would have to fail, which seems very unlikely.

Tom Shaffner

02/08/2022, 9:28 PM
Is there another way that line 299 would work but then nothing would happen, i.e. no debug line and also no error? I don't see another path for that to be possible.

Zanie

02/08/2022, 9:28 PM
It’s possible that somehow the logs aren’t flushing from the subprocess

Tom Shaffner

02/08/2022, 9:28 PM
(not that I'm the authority on this though 😏 )

Zanie

02/08/2022, 9:29 PM
So they are logging but we don’t get them

Tom Shaffner

02/08/2022, 9:29 PM
Right. Hopefully the run I just started, using your above command, will give us that though, right?

Zanie

02/08/2022, 9:29 PM
It’s really hard to say though, a local run may reveal more since we don’t have to rely on the logs getting sent to cloud.

Tom Shaffner

02/08/2022, 9:29 PM
It's running now

Zanie

02/08/2022, 9:30 PM
We’re using different timeout mechanisms in Orion so hopefully we don’t have this problem in the future 😄
❤️ 1
You seem to be an unlucky one, I thought I’d fixed this up

Tom Shaffner

02/08/2022, 9:30 PM
What's the timeframe for that?
hahaha, you did! I hadn't seen the issue again until today!

Zanie

02/08/2022, 9:31 PM
It’s coming along quickly! Since we’re releasing 1.0 of current Prefect, we’re able to focus more attention on 2.0/Orion.

Tom Shaffner

02/08/2022, 9:32 PM
Is the RC more stable than the current version btw? Should I upgrade from the one I'm on to rc1 or wait still?
I'm pretty excited for Orion. I have to say, it looks like I'll be able to restructure all my flows to have a LOT more code reuse; hoping that comes sooner. Every one of these flows I add now is another I'll have to restructure then.

Zanie

02/08/2022, 9:33 PM
I’d say it’s less stable, it’s mostly the removal of old features.
👍 1

Tom Shaffner

02/08/2022, 9:33 PM
Good to know; I'll wait then
That flow is still running, btw, just waiting for it to return. Query has been taking 15-20 minutes today

Zanie

02/08/2022, 9:36 PM
👍 lmk

Tom Shaffner

02/08/2022, 9:42 PM
Oooh, we did get more! Hold on...
logs.txt
The pickling size line did work
And then nothing...
That would suggest it's the dumps line where we're stopping

Zanie

02/08/2022, 9:44 PM
What happens if you try to cloudpickle the result within your task?

Tom Shaffner

02/08/2022, 9:44 PM
Oooh, I'll bet this means I'm getting some new data type in this query that my other queries don't have. That's why I'm seeing the behavior now and not before

Zanie

02/08/2022, 9:45 PM
140 MB is kind of big

Tom Shaffner

02/08/2022, 9:45 PM
Is it? It's a LONG way from the biggest I'm working with

Zanie

02/08/2022, 9:45 PM
I guess not 🤷 I’m not sure what cloudpickle’s limitations are
It seems like it should be a fine size

Tom Shaffner

02/08/2022, 9:46 PM
But, last time we had this same issue, and the changes you added fixed it. So why would this one fail again?

Zanie

02/08/2022, 9:48 PM
I cannot say. There were a lot of possible issues with the old implementation but I thought I’d conquered them all.

Tom Shaffner

02/08/2022, 9:51 PM
Added a test version that pickles in the task and restarting
New task:
Copy code
@task(name='Pull Oracle Data',cache_for=timedelta(hours=10),timeout=(3*60*60),max_retries=3, retry_delay = timedelta(minutes=randint(5,25)),cache_validator=prefect.engine.cache_validators.all_parameters)
def pull_oracle_data_via_with_pickle(oracle_query_sql, server='INTP',cache_flag = False) -> DataFrame:
    if cache_flag:
        logger.info('Task flagged to invalidate/ignore cache.')
    logger.info(f"Initiating Oracle Pull using {oracle_query_sql}...")
    with Oracle_DB(server=server) as o_db:
        df = o_db.pull_data_from_oracle(oracle_query_sql)
    data_amount=len(df.index)
    logger.info(f"Rows returned in oracle data query: {data_amount:,}...")
    assert data_amount!=0 # Cause task failure if 0 is returned    
    logger.debug(f"Oracle pull completed; returning...")
    pickled_val = cloudpickle.dumps(df)
    logger.debug(f"Pickling successful")
    return df
Thinking through what else might have caused this, I do think this particular query returns fields that are likely longer than any other I've had to date.
i.e. in the pandas dataframe, a particular column will have elements with longer values than any other I've had; some could be very long
The test task just returned; same thing happens. It gets to the dumps line and then just stops
Further testing: I removed the columns that could be very long, and the error went away. 😞 That basically means there's some unknown issue, likely a length limit, on the data I can get back from a database, and hitting it doesn't even produce an error apart from a timeout. Thoughts on anything I could do to be able to keep that kind of data in here?
Or at least to hammer down what the limit is?
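One way to narrow down where the limit lies (a sketch, not from the thread; it assumes cloudpickle and a pandas DataFrame, and the helper name is made up) is to serialize each column separately and log the sizes:
Copy code
import cloudpickle

def report_pickled_sizes(df, logger):
    # Serialize the whole frame and each column the same way the result handoff would;
    # the columns with outsized payloads are the likely culprits.
    logger.info(f"Whole frame pickles to {len(cloudpickle.dumps(df)) / 1e6:,.1f} MB")
    for col in df.columns:
        logger.info(f"Column {col!r} pickles to {len(cloudpickle.dumps(df[col])) / 1e6:,.1f} MB")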

Zanie

02/08/2022, 10:37 PM
Just to clarify: It dumps correctly when you do it in your task rather than in the timeout handler?
The data is too large to place into a multiprocessing queue and it is causing a deadlock
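The failure mode described here can be reproduced outside Prefect; a minimal sketch of the classic pattern (standard library only, not Prefect's actual timeout handler):
Copy code
import multiprocessing

def worker(queue):
    # A large payload: the child's feeder thread can only flush it to the pipe
    # as the parent reads it.
    queue.put(b"x" * (200 * 1024 * 1024))

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    p.join()        # deadlocks: the child can't exit until its queued data is drained,
                    # so q.get() has to happen before join() for large items
    print(len(q.get()))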

Tom Shaffner

02/09/2022, 2:43 PM
No, it didn't work in the task either, encountered the same error. It worked when I removed the columns that might have very long data.
This data regularly works fine in csv or excel format though.
Given the above, are there options here? In the meantime I'm stuck basically excluding some data from my process.

Kevin Kho

02/09/2022, 2:47 PM
If the issue is a multiprocessing deadlock, you may have to either go to a LocalExecutor or break up the work, unfortunately. Michael may have other ideas.
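For reference, the LocalExecutor fallback is also a one-line change:
Copy code
from prefect.executors import LocalExecutor

flow.executor = LocalExecutor()  # runs tasks sequentially in the flow-run process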

Zanie

02/09/2022, 3:15 PM
Oh if it didn’t work in the task either, it’s not a multiprocessing deadlock.
I’d report this as a bug to cloudpickle if you can make a minimal reproducible example, I don’t think pickle should hang like this.
You could attempt to serialize it another way, perhaps by returning df.to_json() from your task instead of the raw data.
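A sketch of that workaround (helper names are illustrative, not from the thread): pass a JSON string between tasks and rebuild the DataFrame downstream.
Copy code
import pandas as pd
from io import StringIO

def frame_to_json(df: pd.DataFrame) -> str:
    # A plain string sidesteps whatever the cloudpickle/queue handoff chokes on
    return df.to_json(orient='records')

def frame_from_json(df_json: str) -> pd.DataFrame:
    return pd.read_json(StringIO(df_json), orient='records')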