# ask-community
Kathryn Klarich:
I have noticed some unexpected behavior using the prefect AWSClientWait task on long running batch jobs. Sometimes the AWSClientWait task will have failed, and the schematic and run details show the task to have failed, but the flow hangs in the Running state for hours afterwards. This particular task is part of a mapped flow, and other mapped flows failed on the same task and their flow state did move to Failed. Any idea why this might occur? The trigger rule for the task that follows the failed task is also all_successful, so it shouldn't be waiting to submit another task.
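For reference, the trigger setup being described is roughly the following sketch. The task names are hypothetical stand-ins (wait_for_job plays the role of the AWSClientWait task from the AWS task library), not the actual flow:

```python
from prefect import task
from prefect.triggers import all_successful


# Hypothetical stand-in for the mapped AWSClientWait task that waits on a Batch job.
@task
def wait_for_job(job_id):
    ...


# The downstream task described above: with the all_successful trigger it should
# only run once every upstream (mapped) wait task has succeeded.
@task(trigger=all_successful)
def submit_next_task(job_id):
    ...
```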
Kevin Kho:
Hi @Kathryn Klarich, just making sure I understand right. Is the batch job still ongoing when the AWSClientWait task fails? How long is the batch job? I don't quite understand the "mapped flows failing on the same task, yet their flow state is failed" bit. Are you saying the flow state is failed but it keeps running?
Oh sorry, I re-read it and get it now. It hangs for some of the mapped flows and it's marked as failed for the other ones. I understand the situation now.
What Executor and RunConfig are you using?
Kathryn Klarich:
yes that's correct. I am using the dask executor and docker storage / agent (which i am assuming will set up a docker run config) but i haven't actually specified this
the docker agent is just running locally on my computer, but the flow is registered to prefect cloud
Kevin Kho:
Ok, I think there are a couple of things here. You can try running with boto3 logs on DEBUG so you can see if there's any more info. You can do this by adding the boto3 logger and then setting `PREFECT__LOGGING__LEVEL` to `DEBUG` in the RunConfig. I've also seen users experiencing Dask hangs move from threads to processes, and that helps sometimes.
I have seen another user experience issues with boto3, so it would be helpful to get debug logs to see if it's prefect failing or boto3 failing.
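A minimal sketch of what that could look like, assuming Prefect 1.x with Docker storage and a local Docker agent (so a DockerRun run config), and using the extra_loggers setting to attach the boto3 logger:

```python
from prefect.run_configs import DockerRun

# flow is the Flow object defined elsewhere. Setting these env vars on the run
# config makes the flow-run container log at DEBUG and forward the boto3 logger
# through Prefect's log handlers.
flow.run_config = DockerRun(
    env={
        "PREFECT__LOGGING__LEVEL": "DEBUG",
        "PREFECT__LOGGING__EXTRA_LOGGERS": '["boto3"]',
    }
)
```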
Kathryn Klarich:
I could be wrong but it seems to be a prefect error since the task is failed, but the flow thinks it's still running
Kevin Kho:
Yes, that part does look like Prefect, and it might be helped by moving from threads to processes in the DaskExecutor.
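For reference, a minimal sketch of that switch (LocalDaskExecutor defaults to a thread-based scheduler):

```python
from prefect.executors import LocalDaskExecutor

# flow is the Flow object defined elsewhere; run mapped tasks in separate
# processes instead of threads.
flow.executor = LocalDaskExecutor(scheduler="processes")
```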
Kathryn Klarich:
ok cool, i'll try that
as well as the logging
Kevin Kho:
Oh, maybe ignore the logging. I thought there was something weird happening with the wait task failing while the batch job was still running.
Kathryn Klarich:
yeah it seems to be more of an issue with the flow state not being set to failed
but good to know about the logging for future reference
@Kevin Kho as a follow up, i tried switching to processes instead of threads but that seems to mess up the parallel execution of tasks. see the example flow below: if you remove `scheduler="processes"`, all the mapped tasks execute at once, but if you let it run as is, it seems like only two mapped tasks are running at once
```python
import prefect
from prefect import Flow
from prefect.executors import LocalDaskExecutor
from time import sleep
from prefect import task


@task
def get_x():
    return [i for i in range(10)]


@task
def sleep_task(x: int):
    logger = prefect.context.get("logger")
    sleep_time = x + 10
    logger.info(f"Sleeping for {sleep_time} seconds")
    sleep(sleep_time)


with Flow(name='my-flow', executor=LocalDaskExecutor(scheduler="processes")) as flow:
    sleep_task.map(get_x())
```
Kevin Kho:
Do you still see this behavior if you bump the number of workers up?
`LocalDaskExecutor(scheduler="processes", num_workers=8)`
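In the example flow above that would look something like this (8 is just an example worker count):

```python
from prefect import Flow
from prefect.executors import LocalDaskExecutor

# Reuses get_x and sleep_task from the example above; only the executor changes.
with Flow(
    name="my-flow",
    executor=LocalDaskExecutor(scheduler="processes", num_workers=8),
) as flow:
    sleep_task.map(get_x())
```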
Kathryn Klarich:
yes i do
Kevin Kho:
Ok. In that case, could you add the DEBUG level logs to your Flow so that we can get more insight for the hanging running state?
How many mapped flows do you have? and just confirming that one of them is hanging?
Kathryn Klarich:
I'm refactoring that flow right now to use nested mapping rather than subflows, as i thought that might help
i'll add the debugging stuff as well, but yes it was just one that was hanging
i'm not sure how reproducible that behavior is
Kevin Kho:
Sorry I meant how many items in the mapped operation? Just wanna get a sense for scale. What do you mean about nested mapping? Just making sure cuz some people run into problems with that.
Kathryn Klarich:
in the test case where i saw the hanging behavior i think there were six mapped subflows
Kevin Kho:
Ok I see what you mean
Kathryn Klarich:
the reason i was originally doing subflows was that I didn't realize prefect could do depth first execution
i thought maybe switching to mapping would help since there may be less state switching
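Roughly, the idea is to keep everything in one flow and chain mapped tasks so each item can proceed depth first. The task names below are hypothetical placeholders, not the real flow:

```python
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor


# Hypothetical placeholders: in the real flow these would submit an AWS Batch job
# and wait on it (e.g. via the AWSClientWait task) instead of returning strings.
@task
def get_jobs():
    return ["job-a", "job-b", "job-c"]


@task
def submit_job(job):
    return f"{job}-id"


@task
def wait_for_job(job_id):
    return job_id


@task
def handle_result(job_id):
    pass


# Chained mapped tasks in a single flow: with a Dask-based executor, each mapped
# branch can run depth first instead of waiting for a whole layer to finish.
with Flow("batch-flow", executor=LocalDaskExecutor(scheduler="processes")) as flow:
    job_ids = submit_job.map(get_jobs())
    done = wait_for_job.map(job_ids)
    handle_result.map(done)
```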
Kevin Kho:
Yes it probably will
Kathryn Klarich:
cool - i'll let you know if it works, i'm running the flow right now
it completed successfully using the mapped tasks, going to try running it on a bigger mapping now