<@ULVA73B9P> I started getting the following error...
# marvin-ai
m
@Marvin I started getting the following error when running a flow. the flow crashes immediately on a cloud-based deployment, but I cannot reproduce this with a local run:
Copy code
Failed to start process for flow run '0690c7c2-2b6c-7986-8000-d4e4b7013289'.
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 1300, in _submit_run_and_capture_errors
    exit_code = await self._run_process(
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 781, in _run_process
    process = run_flow_in_subprocess(flow, flow_run=flow_run)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/flow_engine.py", line 1652, in run_flow_in_subprocess
    target=cloudpickle_wrapped_call(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/utilities/callables.py", line 224, in cloudpickle_wrapped_call
    payload = cloudpickle.dumps((__fn, args, kwargs))  # type: ignore   # no stubs available
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/cloudpickle/cloudpickle.py", line 1537, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.12/site-packages/cloudpickle/cloudpickle.py", line 1303, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
TypeError: cannot pickle '_thread.RLock' object
m
thought for 282.6 seconds
Thanks for sharing the traceback — that’s a classic symptom of a non-picklable object being captured when Prefect spins up the flow run in a subprocess. What’s happening - In Prefect 3.x, deployments run your flow in a fresh subprocess for isolation. To start that subprocess, Prefect must serialize the callable and its arguments with cloudpickle. - Locally (e.g., running the script directly) your flow runs in-process, so no serialization is needed — which is why you don’t see the error there. - The error
TypeError: cannot pickle '_thread.RLock' object
means something referenced by your flow (or its parameters/defaults) includes a threading lock (RLock), which cloudpickle cannot serialize. Common culprits - Module-level singletons created at import time that contain locks: - Database engines/pools (e.g., SQLAlchemy
create_engine(...)
) - HTTP client sessions (e.g.,
requests.Session()
,
boto3.client(...)
,
s3fs.S3FileSystem(...)
) - Loggers/handlers cached in variables and then closed over by your flow - Any thread pool, scheduler, or client that internally uses locks - Closure variables: defining
@flow
inside another function or capturing outer-scope objects in the flow body - Default parameter values that are complex objects Where Prefect does this - Prefect launches flows via a subprocess that calls `cloudpickle_wrapped_call(...)`: - run_flow_in_subprocess source How to fix 1) Avoid capturing non-picklable objects in your flow’s closure - Ensure your
@flow
is defined at module top-level (not nested inside another function). - Don’t create heavy clients/engines at import-time and then reference them in the flow; instead, create them lazily inside tasks or inside the flow body. 2) Create clients/resources inside tasks or lazily at call time - Good pattern (clients created in the subprocess at runtime, not pickled):
Copy code
from functools import lru_cache
from prefect import flow, task
from sqlalchemy import create_engine

@lru_cache
def get_engine():
    # Created when first used in the subprocess
    return create_engine("postgresql+psycopg2://...")

@task
def do_db_work(...):
    engine = get_engine()
    with engine.connect() as conn:
        ...

@flow
def my_flow(...):
    do_db_work.submit(...)
- Similarly for
requests.Session()
,
boto3.client(...)
, etc.: make a
get_client()
function and call it inside tasks. 3) Do not pass complex objects as flow parameters - Keep flow parameters JSON-like (str, int, float, bool, dict/list of these). If you need a client/engine, reconstruct it inside the task. 4) Be careful with loggers - Avoid capturing a module-level
logging.getLogger(...)
in a closure. Inside flows/tasks prefer Prefect’s
get_run_logger()
and don’t store it globally. Quick ways to pinpoint the offender - Skim your flow module for any objects created at import time that could hold locks (DB engines, sessions, clients). - If you suspect a specific object, try pickling it directly in a Python shell:
Copy code
import cloudpickle
cloudpickle.dumps(suspect_object)  # should raise if it's the culprit
A couple of questions to help me tailor the fix: - Which infrastructure are you using for the deployment (Process worker, Docker, K8s, etc.)? - Do you create any DB engines, HTTP sessions, or SDK clients (boto3, redis, etc.) at import time? - Is your
@flow
defined at the top level of a module, or nested inside another function/class? - Are you passing any complex objects as flow parameters? If you can share a small snippet of your flow definition and any global objects you create/import, I can point out exactly what needs to move and how.
m
@Marvin here is the main flow that is triggered:
Copy code
import importlib
import inspect
import os
import subprocess
import sys
from datetime import datetime
from datetime import timedelta
from typing import Optional

import pandas as pd
import scrapy
import sentry_sdk
from prefect import flow
from prefect import get_run_logger
from prefect import task
from prefect.schedules import Interval

from app.core.config import settings
from app.mailers.scrape_email import send_daily_scrape_status_email
from app.prefect.tasks.expire_policy_task import run_policy_expiration_task
from app.prefect.tasks.generate_policies import run_policy_factory
from app.prefect.tasks.policy_diff_task import run_policy_diff_task
from app.prefect.tasks.policy_structured_data_extraction import extract_structured_data_from_policy


# gets a list of spider modules that can then be used to get each class.
# This allows for dynamically running all spiders
@task
def get_spider_modules():
    spider_modules = []
    spider_files = None
    spiders_path = None
    try:
        spiders_path = "/app/app/scraper/scraper/spiders"
        spider_files = os.listdir(spiders_path)
        print("spider_files", spider_files)
    except FileNotFoundError:
        spiders_path = "/app/app/scraper/scraper/spiders"
        spider_files = os.listdir(spiders_path)
    except Exception as e:
        print(f"Error getting spider modules: {e}")

    if spider_files is None:
        return []

    for filename in spider_files:
        if filename.endswith(".py") and filename != "__init__.py":
            spider_modules.append(filename[:-3])  # Remove .py extension
    # Clear out temp_json_files directory before running spiders
    temp_json_path = "/app/app/temp_json_files/"
    try:
        for file in os.listdir(temp_json_path):
            file_path = os.path.join(temp_json_path, file)
            if os.path.isfile(file_path):
                os.remove(file_path)
    except Exception as e:
        print(f"Error clearing temp_json_files directory: {e}")
    return spider_modules


# gets a list of spider names from the spider modules and ensures no parent classes are included
@task
def get_spider_classes(spider_modules, package_name):
    spider_names = []
    for module_name in spider_modules:
        try:
            module = importlib.import_module(f"{package_name}.{module_name}")
            for name, obj in inspect.getmembers(module):
                if (
                    inspect.isclass(obj)
                    and issubclass(obj, scrapy.Spider)
                    and obj is not scrapy.Spider
                    and not obj.__module__.startswith("app.scraper.scraper.classes")
                ):
                    # Get the spider name from the class attribute
                    spider_name = getattr(obj, "name", None)
                    if spider_name:
                        spider_names.append(spider_name)
        except (ModuleNotFoundError, AttributeError) as e:
            print(f"Error importing spider {module_name}: {e}")
            sentry_sdk.capture_exception(e)
    return spider_names


# this runs the spiders, which also upload to s3 currently
@task(
    name="run_spider",
    tags=[f"{settings.ENVIRONMENT}_external_scrape"],
    task_run_name="run_spider-{spider_name}",
    retries=0,
    retry_delay_seconds=60,
)
def run_spider(spider_name: str):
    logger = get_run_logger()
    command = f"scrapy crawl {spider_name}"

    try:
        proc = subprocess.Popen(
            command,
            cwd="/app/app/scraper",
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # Redirect stderr to stdout
            shell=True,
            text=True,
            bufsize=1,
        )

        # Stream all output (both stdout and stderr) in real-time
        output_lines = []
        if proc.stdout:
            for line in proc.stdout:
                line_stripped = line.rstrip()
                logger.info(line_stripped)
                output_lines.append(line_stripped)

        proc.wait()

        # Handle errors with full diagnostic information
        if proc.returncode != 0:
            combined_output = "\n".join(output_lines)
            error_message = f"Spider {spider_name} failed with return code {proc.returncode}"

            if combined_output:
                error_message += f"\nOutput: {combined_output}"

            raise Exception(error_message)

        logger.info("runner has completed")
    except Exception as e:
        # Log the failure and return None so that the entire flow isn't killed
        # TODO: this is hacky and may mask issues with the spiders
        logger.error(f"Spider {spider_name} failed with error: {e}")
        sentry_sdk.capture_exception(e)
        return f"Spider {spider_name} had an error, but processing will continue: {e}"


@task
def clear_summarization_file():
    file_path = "/app/app/prefect/scrape_results.csv"
    if os.path.exists(file_path):
        os.remove(file_path)
    with open(file_path, "w") as f:
        f.write("date,spider_name,error_count,success_count\n")


@task
def trigger_scrape_status_email():
    df = pd.read_csv("/app/app/prefect/scrape_results.csv")
    df.sort_values(by="error_count", ascending=False, inplace=True)
    send_daily_scrape_status_email(df)


@flow(log_prints=True)
def scrape_v2(spider_names: Optional[str] = None):
    """Updated scrape flow to use prefect 3.0 features and async tasks"""

    try:
        clear_summarization_file_state = clear_summarization_file(return_state=True)
        clear_summarization_file_state.result()

        if spider_names:
            spider_modules = [spider_name.strip() for spider_name in spider_names.split(",")]
        else:
            state_get_spider_modules = get_spider_modules(return_state=True)
            spider_modules = state_get_spider_modules.result()

        state_spider_names = get_spider_classes(
            spider_modules, "app.scraper.scraper.spiders", return_state=True
        )
        spider_name_list = state_spider_names.result()

        spider_futures = [run_spider.submit(spider_name) for spider_name in spider_name_list]
        [future.result(raise_on_failure=False) for future in spider_futures]

        policy_factory_futures = []
        for spider_module in spider_modules:
            # Note - We are NOT passing the spider_futures to the run_policy_factory task
            # This is because we are already waiting for the futures to complete processing
            # and we want to attempt to run the policy factory even if a spider fails
            future = run_policy_factory.submit(spider_module)
            policy_factory_futures.append(future)

        # Wait for all tasks (spider + policy factory) before running the final tasks
        scrape_status_email_future = trigger_scrape_status_email.submit(
            wait_for=policy_factory_futures
        )  # type: ignore
        scrape_status_email_future.result()

        extract_structured_data_from_policy_futures = extract_structured_data_from_policy.submit(
            wait_for=policy_factory_futures
        )  # type: ignore
        extract_structured_data_from_policy_futures.result()

        policy_diff_future = run_policy_diff_task.submit(
            wait_for=extract_structured_data_from_policy_futures
        )  # type: ignore
        policy_diff_future.result()

        policy_expiration_future = run_policy_expiration_task.submit(
            wait_for=[policy_diff_future],
        )  # type: ignore
        policy_expiration_future.result()

    except Exception as e:
        # Capture exception with sentry and re-raise the exception
        sentry_sdk.capture_exception(e)
        raise


if __name__ == "__main__":
    spider_names = sys.argv[1] if len(sys.argv) > 1 else None

    if settings.ENVIRONMENT == "prod":
        schedules = [Interval(timedelta(days=1), datetime(2024, 1, 1, 5, 0), "America/New_York")]
    elif settings.ENVIRONMENT == "demo":
        schedules = [Interval(timedelta(weeks=1), datetime(2024, 1, 1, 9, 0), "America/New_York")]
    else:
        # To locally poll from prefect server:
        # 1. <https://app.prefect.cloud/account/7d5dffa0-829b-47ec-aa6a-abdb1c6d673d/workspace/77adba1d-5367-4f15-b66c-7af45d38e602/dashboard>
        # 2. Go to Deployments -> local_scraping (scrape-v2) -> Run -> Custom Run
        # -> Add in parameter for spider_name -> Submit
        schedules = []

    if os.environ.get("RUN_PREFECT_LOCALLY") == "true":
        # This allows for a quick local test without serving on prefect.
        # Ideal if you want to ensure that the job on prefect is using your local codebase.
        # And is not connecting to someone elses computer.
        print("Running local prefect job...")
        scrape_v2(spider_names=spider_names)
        print("Local prefect job completed, exiting.")
        sys.exit()

    scrape_v2.serve(
        name=f"{settings.ENVIRONMENT}_scraping",
        schedules=schedules,
        parameters={"spider_names": spider_names},
    )
and here is a task that is called:
Copy code
import logging

import sentry_sdk
from prefect import task
from sqlalchemy import func
from sqlalchemy import select
from sqlmodel import Session

from app.aws.s3 import get_document_type
from app.aws.s3 import get_policy_document
from app.core.config import settings
from app.core.db import engine
from app.core.sentry import configure_sentry
from app.data_scripts.generate_policy_diff import compare_policies
from app.models import Policy
from app.models import PolicyDiff
from app.utils import get_policy_text_from_authed_s3_link

logging.basicConfig(level=<http://logging.INFO|logging.INFO>)
logger = logging.getLogger(__name__)
configure_sentry()


def query_new_policy_versions(session):
    """
    Returns a dict of parent_policy_id to their two most recent policy versions,
    for policies that need diffs created.

    Uses a window function to find policies with previous versions and checks
    if a diff already exists.
    """

    # Create CTE with window function to get previous policy ID
    # Partition by parent_policy_id, order by date_effective ASC (NULLS LAST to handle null dates)
    base_cte = (
        select(
            Policy.id,
            Policy.parent_policy_id,
            Policy.date_effective,
            Policy.date_expiration,
            func.lag(Policy.id)
            .over(
                partition_by=Policy.parent_policy_id,
                order_by=Policy.date_effective.asc().nulls_last(),
            )
            .label("previous_policy_id"),
        )
    ).cte("base")

    # Main query: select from base CTE, left join with PolicyDiff
    # Filter where previous_policy_id is not null and diff doesn't exist
    query = (
        select(base_cte)
        .outerjoin(
            PolicyDiff,
            PolicyDiff.current_policy_id == base_cte.c.id,
        )
        .where(base_cte.c.previous_policy_id.isnot(None))
        .where(PolicyDiff.current_policy_id.is_(None))
    )

    # Execute query and get policy IDs
    result = session.exec(query).all()
    policy_ids = [row.id for row in result]

    <http://logger.info|logger.info>("%d policies need diffs created", len(policy_ids))

    if not policy_ids:
        return {}

    # Fetch the actual Policy objects and their previous versions
    # Get current policies - use .scalars().all() to ensure we get Policy objects, not Row objects
    current_policies = session.exec(select(Policy).where(Policy.id.in_(policy_ids))).scalars().all()

    # Get previous policy IDs from the result
    policy_id_to_previous = {row.id: row.previous_policy_id for row in result}
    # Filter out None values - previous_policy_id can be None and we don't want to query for those
    previous_policy_ids = [
        policy_id for policy_id in set(policy_id_to_previous.values()) if policy_id is not None
    ]

    # Get previous policies
    if previous_policy_ids:
        previous_policies = (
            session.exec(select(Policy).where(Policy.id.in_(previous_policy_ids))).scalars().all()
        )
    else:
        previous_policies = []

    # Create a dict mapping previous_policy_id to Policy object
    previous_policy_map = {p.id: p for p in previous_policies}

    # Group policies by current_policy_id with current and previous versions
    # Use current_policy.id as key to avoid collisions when multiple policies
    # share the same parent_policy_id
    matched_policy_versions = {}

    for current_policy in current_policies:
        previous_policy_id = policy_id_to_previous[current_policy.id]
        previous_policy = previous_policy_map.get(previous_policy_id)

        if previous_policy:
            # Use current_policy.id as key to ensure uniqueness
            matched_policy_versions[current_policy.id] = {
                "current": current_policy.model_dump(),
                "previous": previous_policy.model_dump(),
            }

    <http://logger.info|logger.info>("%d policy pairs ready for diffing", len(matched_policy_versions.keys()))
    return matched_policy_versions


def fetch_authed_s3_urls(matched_policy_versions):
    """
    Fetches authed S3 URLs and document types for policy pairs. Removes policy pairs
    where URLs or document types cannot be retrieved (e.g., file doesn't exist).
    """
    policy_ids_to_remove = []
    for current_policy_id, policies in matched_policy_versions.items():
        for version_key in ["current", "previous"]:
            version = policies[version_key]
            authed_s3_url = get_policy_document(version["s3_file_name"])
            document_type = get_document_type(version["s3_file_name"])

            if authed_s3_url is None or document_type is None:
                logger.warning(
                    "Failed to get S3 URL or document type for policy_id %s, version %s",
                    current_policy_id,
                    version_key,
                )
                with sentry_sdk.push_scope() as scope:
                    scope.set_tag("task", "run_policy_diff_task")
                    scope.set_tag("environment", settings.ENVIRONMENT)
                    scope.set_tag("error_type", "s3_document_fetch_failure")
                    scope.set_extra("current_policy_id", str(current_policy_id))
                    scope.set_extra("s3_file_name", version["s3_file_name"])
                    scope.set_extra("authed_s3_url", authed_s3_url)
                    scope.set_extra("document_type", document_type)
                    sentry_sdk.capture_message(
                        (
                            f"S3 document fetch failed: Could not retrieve URL or "
                            f"document type for policy {current_policy_id} ({version_key})"
                        ),
                        level="warning",
                    )
                policy_ids_to_remove.append(current_policy_id)
                break

            version["authed_s3_url"] = authed_s3_url
            version["document_type"] = document_type

    # Remove policy pairs that couldn't get URLs/document types
    for policy_id in policy_ids_to_remove:
        matched_policy_versions.pop(policy_id, None)

    if policy_ids_to_remove:
        <http://logger.info|logger.info>(
            "Skipped %d policy pairs due to S3 URL/document type retrieval failures",
            len(policy_ids_to_remove),
        )

    return matched_policy_versions


def fetch_s3_documents(matched_policy_versions):
    """
    Fetches S3 documents for policy pairs. Removes policy pairs where documents
    cannot be fetched (e.g., 404 errors).
    """
    policy_ids_to_remove = []
    for current_policy_id, policies in matched_policy_versions.items():
        current_policy = policies["current"]
        previous_policy = policies["previous"]

        try:
            current_policy["document"] = get_policy_text_from_authed_s3_link(
                current_policy["authed_s3_url"],
                current_policy["document_type"],
                split_lines=True,
            )
        except Exception as e:
            logger.warning(
                "Failed to fetch current policy document for policy_id %s: %s",
                current_policy_id,
                str(e),
            )
            with sentry_sdk.push_scope() as scope:
                scope.set_tag("task", "run_policy_diff_task")
                scope.set_tag("environment", settings.ENVIRONMENT)
                scope.set_extra("current_policy_id", current_policy_id)
                sentry_sdk.capture_exception(e)
            policy_ids_to_remove.append(current_policy_id)
            continue

        try:
            previous_policy["document"] = get_policy_text_from_authed_s3_link(
                previous_policy["authed_s3_url"],
                previous_policy["document_type"],
                split_lines=True,
            )
        except Exception as e:
            logger.warning(
                "Failed to fetch previous policy document for policy_id %s: %s",
                current_policy_id,
                str(e),
            )
            with sentry_sdk.push_scope() as scope:
                scope.set_tag("task", "run_policy_diff_task")
                scope.set_tag("environment", settings.ENVIRONMENT)
                scope.set_extra("current_policy_id", current_policy_id)
                sentry_sdk.capture_exception(e)
            policy_ids_to_remove.append(current_policy_id)
            continue

    # Remove policy pairs that couldn't be fetched
    for policy_id in policy_ids_to_remove:
        matched_policy_versions.pop(policy_id, None)

    if policy_ids_to_remove:
        <http://logger.info|logger.info>(
            "Skipped %d policy pairs due to document fetch failures",
            len(policy_ids_to_remove),
        )

    return matched_policy_versions


def insert_policy_diff_into_db(session, policy_diff):
    try:
        # might need to allow for upsert in the future
        session.add(policy_diff)
        session.commit()
        return
    except Exception as e:
        current_policy_id = getattr(policy_diff, "current_policy_id", None)
        logger.exception("Error inserting policy diff into db for %s", current_policy_id)
        with sentry_sdk.push_scope() as scope:
            scope.set_tag("task", "run_policy_diff_task")
            scope.set_tag("environment", settings.ENVIRONMENT)
            scope.set_extra("current_policy_id", str(policy_diff.current_policy_id))
            scope.set_extra("previous_policy_id", str(policy_diff.previous_policy_id))
            sentry_sdk.capture_exception(e)
        return


@task
def run_policy_diff_task():
    """
    Diffs the two most recent policy versions for each parent policy that had at least one new
    version created today.

    This task:
    - Queries policies created today to identify affected parent_policy_ids.
    - Fetches the two most recent versions for each parent.
    - Extracts the policy text from their PDF S3 files.
    - Performs a diff using the extracted content.
    - Inserts the resulting diffs into the database.
    """

    with Session(engine) as session:
        matched_policy_versions = query_new_policy_versions(session)
        matched_policy_versions_with_s3_urls = fetch_authed_s3_urls(matched_policy_versions)
        matched_policy_versions_with_documents = fetch_s3_documents(
            matched_policy_versions_with_s3_urls
        )
        for current_policy_id, policies in matched_policy_versions_with_documents.items():
            <http://logger.info|logger.info>("Generating policy diff for policy_id %s", current_policy_id)
            print(current_policy_id)
            try:
                policy_diff = compare_policies(
                    policies["current"],
                    policies["previous"],
                )
                insert_policy_diff_into_db(session, policy_diff)
            except Exception as e:
                parent_policy_id = policies["current"].get("parent_policy_id")
                logger.exception(
                    "Error generating policy diff for current_policy_id %s", current_policy_id
                )
                with sentry_sdk.push_scope() as scope:
                    scope.set_tag("task", "run_policy_diff_task")
                    scope.set_tag("environment", settings.ENVIRONMENT)
                    scope.set_extra("current_policy_id", str(current_policy_id))
                    scope.set_extra("parent_policy_id", str(parent_policy_id))
                    sentry_sdk.capture_exception(e)


if __name__ == "__main__":
    run_policy_diff_task()
m
Your message was too long, here's your message at the allowed limit: here is the main flow that is triggered ```import importlib import inspect import os import subprocess import sys from datetime import datetime from datetime import timedelta from typing import Optional import pandas as pd import scrapy import sentry_sdk from prefect import flow from prefect import get_run_logger from prefect import task from prefect schedules import Interval from app core config import settings from app mailers scrape_email import send_daily_scrape_status_email from app prefect tasks expire_policy_task import run_policy_expiration_task from app prefect tasks generate_policies import run_policy_factory from app prefect tasks policy_diff_task import run_policy_diff_task from app prefect tasks policy_structured_data_extraction import extract_structured_data_from_policy # gets a list of spider modules that can then be used to get each class # This allows for dynamically running all spiders @task def get_spider_modules() spider_modules = [] spider_files = None spiders_path = None try spiders_path = /app/app/scraper/scraper/spiders spider_files = os listdir(spiders_path) print( spider_files spider_files) except FileNotFoundError spiders_path = /app/app/scraper/scraper/spiders spider_files = os listdir(spiders_path) except Exception as e print(f Error getting spider modules {e} ) if spider_files is None return [] for filename in spider_files if filename endswith( py ) and filename != init py spider_modules append(filename[ -3]) # Remove py extension # Clear out temp_json_files directory before running spiders temp_json_path = /app/app/temp_json_files/ try for file in os listdir(temp_json_path) file_path = os path join(temp_json_path file) if os path isfile(file_path) os remove(file_path) except Exception as e print(f Error clearing temp_json_files directory {e} ) return spider_modules # gets a list of spider names from the spider modules and ensures no parent classes are included @task def get_spider_classes(spider_modules package_name) spider_names = [] for module_name in spider_modules try module = importlib import_module(f {package_name} {module_name} ) for name obj in inspect getmembers(module) if ( inspect isclass(obj) and issubclass(obj scrapy Spider) and obj is not scrapy Spider and not obj module startswith( app scraper scraper classes ) ) # Get the spider name from the class attribute spider_name = getattr(obj name None) if spider_name spider_names append(spider_name) except (ModuleNotFoundError AttributeError) as e print(f Error importing spider {module_name} {e} ) sentry_sdk capture_exception(e) return spider_names # this runs the spiders which also upload to s3 currently @task( name= run_spider tags=[f {settings ENVIRONMENT}_external_scrape ] task_run_name= run_spider-{spider_name} retries=0 retry_delay_seconds=60 ) def run_spider(spider_name str) logger = get_run_logger() command = f scrapy crawl {spider_name} try proc = subprocess Popen( command cwd= /app/app/scraper stdout=subprocess PIPE stderr=subprocess STDOUT # Redirect stderr to stdout shell=True text=True bufsize=1 ) # Stream all output (both stdout and stderr) in real-time output_lines = [] if proc stdout for line in proc stdout line_stripped = line rstrip() logger info(line_stripped) output_lines append(line_stripped) proc wait() # Handle errors with full diagnostic information if proc returncode != 0 combined_output = \n join(output_lines) error_message = f Spider {spider_name} failed with return code {proc returncode} if combined_output error_message += f \nOutput {combined_output} raise Exception(error_message) logger info( runner has completed ) except Exception as e # Log the failure and return None so that the entire flow isn't killed # TODO this is hacky and may mask issues with the spiders logger error(f Spider {spider_name} failed with error {e}