# ask-community
j
Hi all, does anyone know of any condition that should cause a flow to become totally stuck? This diagram seems to indicate that all the tasks upstream from this one are done, so I'd expect that this one (or another one with no pending upstream tasks) should start running. Currently there is only one flow run, and all tasks are either completed or pending. The flow run has been idle for 8 hours. Running Prefect 0.14.2.
k
Hi @Jeremy Phelps! Do the logs show anything?
What RunConfig and Executor are you using?
j
@Kevin Kho the logs show that everything ran normally until it was time for the currently pending tasks to run. Then the logs are silent.
I'm using a DaskExecutor. I see no evidence in the Dask server logs that the agent (a Kubernetes agent) ever submitted the task.
I don't know what a RunConfig is, so I must be using the default one.
k
RunConfig is like ECSRun or KubernetesRun.
j
Never heard of them.
k
There is a UniversalRun, which is the default.
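Roughly, you attach one to the flow object like this (a minimal sketch; the flow name and image here are just placeholders):
Copy code
from prefect import Flow
from prefect.run_configs import KubernetesRun, UniversalRun

with Flow('runconfig-example') as flow:
    pass  # tasks would go here

# The RunConfig tells the agent how to launch the flow run; if you never
# set one, you effectively get UniversalRun().
flow.run_config = KubernetesRun(image='gcr.io/pict-app/our-prefect')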
j
I don't even know what RunConfigs do.
k
Can I see how you set up the DaskExecutor?
Did this work successfully before today, by the way? I think this hanging is potentially related to the DaskExecutor not having workers.
j
I have the following class:
Copy code
class OurFlow(PrefectFlow):
    def __init__(self, name, **kwargs):
        super().__init__(name, storage=Docker(registry_url='gcr.io/pict-app',
                                              image_name=name,
                                              base_image='gcr.io/pict-app/our-prefect',
                                              path=path_in_docker(inspect.stack()[1].filename),
                                              stored_as_script=True))
        self.executor = DaskExecutor(address='tcp://dask-scheduler:8786', debug=True)
All my flows are derived from it.
I'm pretty sure Dask workers are automatically handled by the Dask scheduler.
It wouldn't be very useful for flows to need to statically know which worker they'll run on.
PrefectFlow is defined as:
Copy code
from prefect import Flow as PrefectFlow
k
It is, but we saw an issue yesterday with KubernetesRun and a DaskExecutor where the flow hung until the maximum number of workers was defined.
Looking into this
j
I can't see where "max workers" would be specified. The __init__ method of DaskExecutor takes no such argument.
k
Might not be applicable to you, but:
Copy code
executor = DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",
    cluster_kwargs={...},
    adapt_kwargs={"maximum": 10},
)
j
That seems to be intended for provisioning the cluster dynamically. When setting up Dask I did the simplest thing and just set up some permanent servers. One of them runs dask-scheduler and the others run dask-worker with the scheduler server's address specified.
Logs on the Dask workers show that the tasks that completed ran in Dask, but then Dask stopped receiving new work from Prefect.
k
Ok will bring this up to the team.
j
I'm going to try deleting the Kubernetes pod that the KubernetesAgent created. That should trigger Prefect's "Lazarus" mechanism. Maybe when it triggers, the other tasks will run.
Thanks, @Kevin Kho.
k
Nothing seems off in anything you initially posted. Deleting the pod and seeing what happens is a good way to go for now. No initial ideas for why this would occur. It could also be data movement within Dask. Would you be able to check the Dask dashboard to see if the scheduler is freezing?
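Something like this would show whether the scheduler still sees its workers and where the dashboard lives (a minimal sketch, assuming dask.distributed is installed wherever you run it, using the scheduler address from your executor):
Copy code
from dask.distributed import Client

client = Client('tcp://dask-scheduler:8786')     # same address the executor uses
print(list(client.scheduler_info()['workers']))  # workers the scheduler knows about
print(client.dashboard_link)                     # dashboard URL, if it is enabled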
j
It does not appear that the scheduler is running its Web dashboard. There are a couple of warnings in the scheduler logs, but no errors.
k
Are you returning a large object? Could the cluster be resource starved?
j
The largest object that should have been returned so far would be an array of 200-300 integers.
The Lazarus mechanism has kicked in, but it didn't result in any tasks being launched.
k
Is there any activity on the scheduler log?
Ah, I guess maybe not, since the tasks didn't even kick off.
j
I just noticed some young Kubernetes pods that have failed with errors (and managed not to be immediately deleted). Looks like the Kubernetes agent is running into this bug: https://github.com/PrefectHQ/prefect/issues/4471
The stack trace is different.
Looks like it suddenly stopped being possible to connect to the Prefect server from Kubernetes.
Or maybe not. I was able to run prefect get flow-runs inside Kubernetes.
The full stack trace:
Copy code
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 756, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/usr/local/lib/python3.7/site-packages/urllib3/util/retry.py", line 573, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='prefect-server', port=4200): Max retries exceeded with url: /graphql (Caused by ConnectTimeoutError(<urllib3.connection.HTTPConnection object at 0x7f474a174750>, 'Connection to prefect-server timed out. (connect timeout=30)'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/bin/prefect", line 8, in <module>
    sys.exit(cli())
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/cli/execute.py", line 49, in flow_run
    result = client.graphql(query)
  File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 304, in graphql
    retry_on_api_error=retry_on_api_error,
  File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 220, in post
    retry_on_api_error=retry_on_api_error,
  File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 446, in _request
    session=session, method=method, url=url, params=params, headers=headers
  File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 346, in _send_request
    response = session.post(url, headers=headers, json=params, timeout=30)
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 590, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 542, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 655, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 504, in send
    raise ConnectTimeout(e, request=request)
requests.exceptions.ConnectTimeout: HTTPConnectionPool(host='prefect-server', port=4200): Max retries exceeded with url: /graphql (Caused by ConnectTimeoutError(<urllib3.connection.HTTPConnection object at 0x7f474a174750>, 'Connection to prefect-server timed out. (connect timeout=30)'))
I sent SIGINT to all the processes in the Kubernetes agent's pod, hoping to get a stack trace. Instead, I got more limited information:
Copy code
[2021-05-06 14:33:38-0500] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'demand-forecasting-delivery-scheduler'
[2021-05-06 15:13:47-0500] INFO - prefect.DaskExecutor | Stopping executor, waiting for 1 active tasks to complete
[2021-05-06 15:14:12-0500] ERROR - prefect.CloudFlowRunner | Heartbeat process died with exit code 1
Is there any way to tell which "active task" might be referenced here?
The CLI tool provides no functionality when it comes to task runs.
k
My best suggestion would be to use the Prefect UI to try and see where the Flow failed?
j
I already looked there. According to the UI, all tasks have either succeeded or not yet started.
k
I see. Would it be possible to set the agent log level to DEBUG for a future run?
j
I suppose. How would I do that for the Kubernetes agent?
Killing the processes caused the flow run to fail, so I'm about to restart it anyway.
Found the option to set the log level.
k
You need an environment variable on the agent:
export PREFECT__LOGGING__LEVEL=DEBUG
j
Is prefect agent kubernetes start --log-level DEBUG not good enough?
k
Oh you’re right that’s way better.
j
I'm assuming that this will also set the log level to DEBUG inside Kubernetes, not just for the CLI tool that starts the pod.
k
Honestly not 100% sure, but this should be enough to see that active task on the Prefect logger side.
j
I'm adding -e PREFECT__LOGGING__LEVEL=DEBUG to the command line.
Copy code
[2021-05-06 15:33:11-0500] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'demand-forecasting-delivery-scheduler'
[2021-05-06 15:33:11-0500] DEBUG - prefect.CloudFlowRunner | Using executor type DaskExecutor
[2021-05-06 15:33:11-0500] DEBUG - prefect.CloudFlowRunner | Flow 'demand-forecasting-delivery-scheduler': Handling state change from Scheduled to Running
[2021-05-06 15:33:26-0500] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-05-06 15:33:41-0500] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-05-06 15:33:56-0500] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-05-06 15:34:11-0500] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-05-06 15:34:26-0500] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-05-06 15:34:42-0500] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
There's nothing more specific than that. It took less than a second for the flow to reach its previous state.
k
Ok will bring this up to the team and get back to you.
j
The task that should be starting is a map:
Copy code
demand_forecast_status = demand_forecast_for_stockwell.map(unmapped(forecasting_job), stockwell_ids)
Both parameters are return values of tasks that have completed.
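For clarity, unmapped() keeps one argument fixed while map() fans out over the other; a minimal sketch with placeholder tasks:
Copy code
from prefect import Flow, task, unmapped

@task
def child(job, item):
    # Every child run receives the same job object plus one element of the iterable.
    return (job, item)

with Flow('map-sketch') as sketch_flow:
    child.map(unmapped('job-1'), [1, 2, 3])  # creates three child task runs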
Here's another bit of info that might be relevant: I previously simulated all the infrastructure in Vagrant. In that environment, I was able to run this flow successfully. But there was one crucial difference: the stockwell_ids array had only one value in it (of type int), meaning only one task. Due to local memory constraints I had to keep it small. Now I'm running it in Google Cloud and trying to map over hundreds of integers. Perhaps this has something to do with why things are failing?
k
Potentially, since the scheduler going down makes it seem like a Dask data transfer issue or the cluster getting starved for resources. Limiting the number of ids definitely seems like a good thing to try. I'm waiting for a response from the team on the other items above.
j
The scheduler does not go down.
k
Oh ok
j
The Dask cluster is AFAICT completely idle.
Have you heard anything back?
k
This really seems like a Dask issue more than a Prefect issue. Did reducing the ids help you?
The best advice we have is to try a similar Flow topology but with different tasks to isolate if the problem is related to that or the tasks themselves.
j
I have no idea why Prefect isn't running my tasks.
What alternate topology would be better, and why?
k
Not alternate. Replace the tasks with something else but still have the same map out behavior. We’re not expecting it to be better, we’re just trying to isolate the issue.
j
I see. I'll map a "hello world" task over the integers and see if it runs or not.
I'll give it 10 minutes for the Docker images to rebuild.
It ended up in the same state.
k
Ok. Will forward to team.
j
Here's a rough outline of the flow. It might be enough to reproduce the issue:
Copy code
import prefect
from collections import namedtuple
from prefect import unmapped

@task
def stockwell_ids_for_metro(metro_id):
    return hundreds_of_integers()

@task
def create_job(tag):
    return namedtuple('JobId', ['tag'])(tag=tag)

@task
def hello_world(job, stockwell_id):
    prefect.context.get('logger').info('Called with {} and {}'.format(job, stockwell_id))
    return stockwell_id

with OurFlow('freezing-flow') as flow:
    forecasting_job = create_job('1')
    stockwell_ids = stockwell_ids_for_metro(1)
    # hello_world doesn't run.
    hello_world.map(unmapped(forecasting_job), stockwell_ids)
We also override the @task decorator:
Copy code
import prefect.tasks.core.function

class OurTask(prefect.tasks.core.function.FunctionTask):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.is_mapped = False

    def map(self, *args, **kwargs):
        # Task.map() returns a new Task copy; mark that copy as mapped.
        mapped_task = super().map(*args, **kwargs)
        mapped_task.is_mapped = True
        return mapped_task

def task(fn=None, log_stdout=True, **task_init_kwargs):
    if fn is None:
        return lambda fn: OurTask(fn=fn, log_stdout=log_stdout, **task_init_kwargs)
    return OurTask(fn=fn, log_stdout=log_stdout, **task_init_kwargs)
The purpose of this is to be able to dynamically tell if a given Task object represents a mapped task or not.
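For example (a small sketch using the decorator above; the double task is just an illustration):
Copy code
from prefect import Flow

@task
def double(x):
    return 2 * x

with Flow('is-mapped-check') as f:
    plain = double(3)               # copy of the task; is_mapped stays False
    fanned = double.map([1, 2, 3])  # copy returned by map(); is_mapped is True

print(plain.is_mapped, fanned.is_mapped)  # False True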
@Kevin Kho Was the team able to figure out this issue?
k
No unfortunately not 😞
Sorry about that
j
I found the cause: a syntax error in one of my modules went completely undetected. In Airflow, that would have caused an error that showed up in multiple places, but I have no idea where the error was reported (if it wasn't completely swallowed) in the Prefect/Kubernetes/Dask constellation.
k
Glad you figured it out! That’s weird. Would definitely expect an error to be thrown instead of hanging. Will bring it up to the team.
z
Hey @Jeremy Phelps -- we'd appreciate a minimal reproducible example so we can expose the error better. I imagine this has something to do with running your own dask cluster.
j
Surely, everyone runs their own Dask cluster, since that isn't part of the Prefect service.
z
Nope! Prefect will spin up the cluster for you if you don't connect to an existing one; we see people using dask executors local to their flow far more often than externally managed ones.
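For example, something like this runs on a temporary local cluster with no external scheduler (a minimal sketch; the flow name and worker count are just placeholders):
Copy code
from prefect import Flow
from prefect.executors import DaskExecutor, LocalDaskExecutor

with Flow('local-dask-example') as flow:
    pass  # tasks would go here

# With no address, DaskExecutor starts a temporary distributed.LocalCluster
# for the duration of the flow run and tears it down afterwards.
flow.executor = DaskExecutor(cluster_kwargs={'n_workers': 4})
# Or skip distributed entirely and use dask's local threaded scheduler:
# flow.executor = LocalDaskExecutor(scheduler='threads')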
j
I found that not to be useful because doing anything locally means everything runs on one node. Dask has some ability to launch clusters in Kubernetes, but it seemed complicated to set up, so I didn't try.
z
Everyone has their own way of managing their execution environment 🤷
If you can get me something reproducible I'm happy to dig into how the error was hidden