# ask-marvin
@Marvin I'm using the Dask task runner, but I've been facing an issue with pickling my tasks. I've pinned it down to a failure_handler on some of my tasks that uses a boto3 SNS client. Is there a way to get around this error while still using the failure handler?
You’re hitting Dask’s serialization (pickle) rules. When Prefect sends tasks to Dask workers, it cloudpickles the task object and everything it references, including `on_failure` hooks. A `boto3` SNS client is not picklable, so if your failure handler closes over a client (or refers to one created at import time), Dask fails to serialize it.

Good news: you can keep using `on_failure` with Dask by making the handler pickle-safe.

What to do:
- Define the failure handler as a named, top-level function rather than a lambda.
- Don’t capture a `boto3` client in the closure. Instead, create the client inside the handler, so it is built on the worker when the hook actually runs.
- If you need to parameterize the handler, a factory that returns a nested handler is fine, as long as the closure only captures simple values such as strings and numbers.

Example: lazily create the SNS client inside the handler
```python
import os

from prefect import task


def sns_failure_handler(task, task_run, state):
    # Import and create the client inside the handler, not at module import time,
    # so nothing unpicklable is captured when the task is shipped to a Dask worker.
    import boto3

    client = boto3.client("sns", region_name=os.getenv("AWS_REGION", "us-east-1"))
    message = f"Task {task_run.name} failed: {state.message}"
    client.publish(TopicArn=os.environ["SNS_TOPIC_ARN"], Message=message)


@task(on_failure=[sns_failure_handler])
def my_task():
    ...
```
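To see the failure mode without AWS or Dask installed, here is a minimal sketch using only the standard library. It binds arguments to a handler with `functools.partial`, whose bound arguments get serialized along with it, much like a closure carries its captured variables. A `threading.Lock` stands in for the `boto3` client as an unpicklable object; that substitution and the hypothetical `failure_handler` signature are assumptions for illustration, and the exact error a real client raises may differ.

```python
import functools
import pickle
import threading


def failure_handler(task, task_run, state, *, client=None, topic_arn=None):
    """Hypothetical hook; its body does not matter for pickling."""


def picklable(obj) -> bool:
    """Return True if the standard-library pickle can serialize obj."""
    try:
        pickle.dumps(obj)
    except TypeError:
        return False
    return True


# Anti-pattern: a live resource is bound into the hook.
# threading.Lock() is a stand-in for boto3.client("sns") (assumption).
eager = functools.partial(failure_handler, client=threading.Lock())

# Pickle-safe: only a plain string is bound; the client would be built at call time.
lazy = functools.partial(
    failure_handler, topic_arn="arn:aws:sns:us-east-1:123456789012:my-topic"
)

results = {"eager": picklable(eager), "lazy": picklable(lazy)}
print(results)
```

Only the `eager` hook fails to pickle; the same reasoning applies when Dask cloudpickles a handler whose closure holds a client.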
Parametrized handler that still pickles safely

```python
from prefect import task


def make_sns_failure_handler(topic_arn: str, region_name: str | None = None):
    # The returned closure captures only plain strings, which pickle cleanly.
    def handler(task, task_run, state):
        import boto3

        client = boto3.client("sns", region_name=region_name)
        client.publish(
            TopicArn=topic_arn,
            Message=f"Task {task_run.name} failed: {state.message}",
        )

    return handler


sns_handler = make_sns_failure_handler(
    "arn:aws:sns:us-east-1:123456789012:my-topic", "us-east-1"
)


@task(on_failure=[sns_handler])
def my_task():
    ...
```
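If creating a new client on every failure is a concern, one option is a callable class that caches the client but drops it before pickling. This is a sketch, not an established Prefect pattern: `SNSFailureHandler` is a hypothetical name, and it assumes your Prefect version accepts any callable (not only plain functions) in `on_failure`.

```python
import pickle
from typing import Optional


class SNSFailureHandler:
    """Hypothetical pickle-safe on_failure hook that caches its boto3 client.

    Only topic_arn and region_name travel through pickle; the client is
    rebuilt lazily in whichever process (e.g. a Dask worker) runs the hook.
    """

    def __init__(self, topic_arn: str, region_name: Optional[str] = None):
        self.topic_arn = topic_arn
        self.region_name = region_name
        self._client = None  # created on first use, never pickled

    def _get_client(self):
        if self._client is None:
            import boto3  # imported lazily, only where the hook runs

            self._client = boto3.client("sns", region_name=self.region_name)
        return self._client

    def __call__(self, task, task_run, state):
        self._get_client().publish(
            TopicArn=self.topic_arn,
            Message=f"Task {task_run.name} failed: {state.message}",
        )

    def __getstate__(self):
        # Copy the instance state and drop the cached client before pickling.
        state = self.__dict__.copy()
        state["_client"] = None
        return state


if __name__ == "__main__":
    handler = SNSFailureHandler("arn:aws:sns:us-east-1:123456789012:my-topic", "us-east-1")
    print(pickle.loads(pickle.dumps(handler)).topic_arn)
```

You would attach it as `@task(on_failure=[SNSFailureHandler("arn:aws:sns:us-east-1:123456789012:my-topic", "us-east-1")])`. The cached client only lives in the copy of the handler that a given process unpickled; the main point is that the client itself never goes through pickle.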
Alternative patterns

- Do it inline and re-raise:

```python
import os
import traceback

from prefect import task


@task
def my_task():
    try:
        ...
    except Exception:
        # Build the client only on the failure path, at runtime on the worker.
        import boto3

        boto3.client("sns", region_name=os.getenv("AWS_REGION")).publish(
            TopicArn=os.environ["SNS_TOPIC_ARN"],
            Message=traceback.format_exc(),
        )
        raise  # re-raise so the task run is still marked as failed
```
This also avoids pickling the client, because it is only created on the worker at runtime.

- Server-side notifications with Automations: if you’d rather decouple notifications from task code, use Prefect Automations to trigger an action when a task run fails. Prefect has built-in notification blocks (e.g. Slack, PagerDuty, Twilio, SendGrid), but it doesn’t have a built-in SNS block. Two SNS-friendly options:
  - A “Run deployment” action that triggers a tiny “notify” flow, which publishes to SNS via boto3.
  - A “Call webhook” action that hits an endpoint you control (e.g. API Gateway), which then publishes to SNS.

Relevant docs:
- Blocks overview: Blocks
- Notification blocks available: Blocks (notifications section)
- Automations API reference: Automations API

Quick sanity check:
- Ensure your handler function is defined at module scope.
- Avoid lambdas, and avoid capturing the `boto3` client in the handler closure.
- Build the client inside the handler when it executes.

If you can share a short snippet of your current handler and the exact traceback, I can confirm the minimal change needed.
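To act on the quick sanity check before involving Dask, you can try serializing the handler (or the task itself) locally. This is a minimal sketch, assuming `cloudpickle` is importable (it is a Dask dependency); if it isn't, the helper falls back to the standard-library `pickle`, which is stricter and also rejects lambdas and nested functions. `check_picklable` is a hypothetical helper name, and it only approximates what Dask does.

```python
def check_picklable(obj, label: str = "object") -> bool:
    """Try to serialize obj roughly the way Dask would; report and return the result."""
    try:
        import cloudpickle as serializer  # what Dask uses for functions
    except ImportError:
        import pickle as serializer  # stricter fallback: rejects lambdas and closures

    try:
        serializer.dumps(obj)
    except Exception as exc:  # pickling errors vary by object type
        print(f"{label}: NOT picklable ({type(exc).__name__}: {exc})")
        return False
    print(f"{label}: picklable")
    return True


if __name__ == "__main__":
    # In your project, pass your real hook and task instead, for example:
    #   check_picklable(sns_failure_handler, "hook")
    #   check_picklable(my_task, "task")
    check_picklable({"topic_arn": "arn:aws:sns:us-east-1:123456789012:my-topic"}, "config")
    check_picklable((i for i in range(3)), "generator")
```

A failure here usually points at the same object Dask is choking on, and the exception message names its type.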