Matthew Eng
11/06/2025, 11:12 AM
Failed to start process for flow run '0690c7c2-2b6c-7986-8000-d4e4b7013289'.
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 1300, in _submit_run_and_capture_errors
    exit_code = await self._run_process(
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 781, in _run_process
    process = run_flow_in_subprocess(flow, flow_run=flow_run)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/flow_engine.py", line 1652, in run_flow_in_subprocess
    target=cloudpickle_wrapped_call(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/utilities/callables.py", line 224, in cloudpickle_wrapped_call
    payload = cloudpickle.dumps((__fn, args, kwargs))  # type: ignore # no stubs available
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/cloudpickle/cloudpickle.py", line 1537, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.12/site-packages/cloudpickle/cloudpickle.py", line 1303, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
TypeError: cannot pickle '_thread.RLock' object
Marvin
11/06/2025, 11:12 AM
Marvin
11/06/2025, 11:17 AM
`TypeError: cannot pickle '_thread.RLock' object` means something referenced by your flow (or its parameters/defaults) includes a threading lock (RLock), which cloudpickle cannot serialize.
Common culprits
- Module-level singletons created at import time that contain locks:
- Database engines/pools (e.g., SQLAlchemy create_engine(...))
- HTTP client sessions (e.g., requests.Session(), boto3.client(...), s3fs.S3FileSystem(...))
- Loggers/handlers cached in variables and then closed over by your flow
- Any thread pool, scheduler, or client that internally uses locks
- Closure variables: defining @flow inside another function or capturing outer-scope objects in the flow body
- Default parameter values that are complex objects
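You can reproduce the failure from the traceback above in isolation with nothing but cloudpickle and a lock; any object graph that contains one fails the same way:
import threading
import cloudpickle

# Pickling a lock directly raises the exact error from the traceback
cloudpickle.dumps(threading.RLock())
# TypeError: cannot pickle '_thread.RLock' object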
Where Prefect does this
- Prefect launches flows via a subprocess that calls `cloudpickle_wrapped_call(...)`:
- run_flow_in_subprocess source
How to fix
1) Avoid capturing non-picklable objects in your flow’s closure
- Ensure your @flow is defined at module top-level (not nested inside another function).
- Don’t create heavy clients/engines at import time and then reference them in the flow; instead, create them lazily inside tasks or inside the flow body (anti-pattern sketched below).
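- For contrast, a minimal sketch of the anti-pattern (names are illustrative; whether the global actually ends up in the pickle depends on how your module is loaded, but this shape is the usual trigger):
from prefect import flow
from sqlalchemy import create_engine

# Created at import time; the engine's connection pool holds locks
engine = create_engine("postgresql+psycopg2://...")


@flow
def my_flow():
    # The flow body references the unpicklable module-level engine
    with engine.connect() as conn:
        ...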
2) Create clients/resources inside tasks or lazily at call time
- Good pattern (clients created in the subprocess at runtime, not pickled):
from functools import lru_cache

from prefect import flow, task
from sqlalchemy import create_engine


@lru_cache
def get_engine():
    # Created when first used in the subprocess
    return create_engine("postgresql+psycopg2://...")


@task
def do_db_work(...):
    engine = get_engine()
    with engine.connect() as conn:
        ...


@flow
def my_flow(...):
    do_db_work.submit(...)
- Similarly for requests.Session(), boto3.client(...), etc.: make a get_client() function and call it inside tasks.
3) Do not pass complex objects as flow parameters
- Keep flow parameters JSON-like (str, int, float, bool, dicts/lists of these). If a task needs a client or engine, pass the plain config and reconstruct the object inside the task, as sketched below.
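- A minimal sketch of that pattern with boto3 (bucket and key names are placeholders): only plain strings cross the pickle boundary, and the client is built inside the task:
import boto3
from prefect import flow, task


@task
def upload_report(bucket_name: str, key: str, body: str):
    # The client is constructed in the worker subprocess, never pickled
    s3 = boto3.client("s3")
    s3.put_object(Bucket=bucket_name, Key=key, Body=body.encode())


@flow
def report_flow(bucket_name: str):
    upload_report.submit(bucket_name, "daily/report.txt", "hello")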
4) Be careful with loggers
- Avoid capturing a module-level logging.getLogger(...) in a closure. Inside flows and tasks, prefer Prefect’s get_run_logger() and don’t store the result globally; see the sketch below.
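- A minimal sketch of the logger pattern (the logger is resolved at run time inside the subprocess, so nothing lock-bearing is captured):
from prefect import flow, get_run_logger


@flow
def my_flow():
    logger = get_run_logger()  # fetched during the run, not at import time
    logger.info("running in the subprocess")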
Quick ways to pinpoint the offender
- Skim your flow module for any objects created at import time that could hold locks (DB engines, sessions, clients).
- If you suspect a specific object, try pickling it directly in a Python shell:
import cloudpickle
cloudpickle.dumps(suspect_object) # should raise if it's the culprit
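- You can also try pickling the decorated flow object itself, which approximates what the runner serializes before spawning the subprocess (the import path is a placeholder for your module):
import cloudpickle

from my_project.flows import my_flow  # hypothetical import path

cloudpickle.dumps(my_flow)  # fails the same way the runner did if the flow drags in a lock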
A couple of questions to help me tailor the fix:
- Which infrastructure are you using for the deployment (Process worker, Docker, K8s, etc.)?
- Do you create any DB engines, HTTP sessions, or SDK clients (boto3, redis, etc.) at import time?
- Is your @flow defined at the top level of a module, or nested inside another function/class?
- Are you passing any complex objects as flow parameters?
If you can share a small snippet of your flow definition and any global objects you create/import, I can point out exactly what needs to move and how.
Matthew Eng
11/06/2025, 11:26 AM
import importlib
import inspect
import os
import subprocess
import sys
from datetime import datetime
from datetime import timedelta
from typing import Optional
import pandas as pd
import scrapy
import sentry_sdk
from prefect import flow
from prefect import get_run_logger
from prefect import task
from prefect.schedules import Interval
from app.core.config import settings
from app.mailers.scrape_email import send_daily_scrape_status_email
from app.prefect.tasks.expire_policy_task import run_policy_expiration_task
from app.prefect.tasks.generate_policies import run_policy_factory
from app.prefect.tasks.policy_diff_task import run_policy_diff_task
from app.prefect.tasks.policy_structured_data_extraction import extract_structured_data_from_policy
# gets a list of spider modules that can then be used to get each class.
# This allows for dynamically running all spiders
@task
def get_spider_modules():
    spider_modules = []
    spider_files = None
    spiders_path = None
    try:
        spiders_path = "/app/app/scraper/scraper/spiders"
        spider_files = os.listdir(spiders_path)
        print("spider_files", spider_files)
    except FileNotFoundError:
        spiders_path = "/app/app/scraper/scraper/spiders"
        spider_files = os.listdir(spiders_path)
    except Exception as e:
        print(f"Error getting spider modules: {e}")
    if spider_files is None:
        return []
    for filename in spider_files:
        if filename.endswith(".py") and filename != "__init__.py":
            spider_modules.append(filename[:-3])  # Remove .py extension
    # Clear out temp_json_files directory before running spiders
    temp_json_path = "/app/app/temp_json_files/"
    try:
        for file in os.listdir(temp_json_path):
            file_path = os.path.join(temp_json_path, file)
            if os.path.isfile(file_path):
                os.remove(file_path)
    except Exception as e:
        print(f"Error clearing temp_json_files directory: {e}")
    return spider_modules
# gets a list of spider names from the spider modules and ensures no parent classes are included
@task
def get_spider_classes(spider_modules, package_name):
    spider_names = []
    for module_name in spider_modules:
        try:
            module = importlib.import_module(f"{package_name}.{module_name}")
            for name, obj in inspect.getmembers(module):
                if (
                    inspect.isclass(obj)
                    and issubclass(obj, scrapy.Spider)
                    and obj is not scrapy.Spider
                    and not obj.__module__.startswith("app.scraper.scraper.classes")
                ):
                    # Get the spider name from the class attribute
                    spider_name = getattr(obj, "name", None)
                    if spider_name:
                        spider_names.append(spider_name)
        except (ModuleNotFoundError, AttributeError) as e:
            print(f"Error importing spider {module_name}: {e}")
            sentry_sdk.capture_exception(e)
    return spider_names
# this runs the spiders, which also upload to s3 currently
@task(
    name="run_spider",
    tags=[f"{settings.ENVIRONMENT}_external_scrape"],
    task_run_name="run_spider-{spider_name}",
    retries=0,
    retry_delay_seconds=60,
)
def run_spider(spider_name: str):
    logger = get_run_logger()
    command = f"scrapy crawl {spider_name}"
    try:
        proc = subprocess.Popen(
            command,
            cwd="/app/app/scraper",
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # Redirect stderr to stdout
            shell=True,
            text=True,
            bufsize=1,
        )
        # Stream all output (both stdout and stderr) in real-time
        output_lines = []
        if proc.stdout:
            for line in proc.stdout:
                line_stripped = line.rstrip()
                logger.info(line_stripped)
                output_lines.append(line_stripped)
        proc.wait()
        # Handle errors with full diagnostic information
        if proc.returncode != 0:
            combined_output = "\n".join(output_lines)
            error_message = f"Spider {spider_name} failed with return code {proc.returncode}"
            if combined_output:
                error_message += f"\nOutput: {combined_output}"
            raise Exception(error_message)
        logger.info("runner has completed")
    except Exception as e:
        # Log the failure and return None so that the entire flow isn't killed
        # TODO: this is hacky and may mask issues with the spiders
        logger.error(f"Spider {spider_name} failed with error: {e}")
        sentry_sdk.capture_exception(e)
        return f"Spider {spider_name} had an error, but processing will continue: {e}"
@task
def clear_summarization_file():
    file_path = "/app/app/prefect/scrape_results.csv"
    if os.path.exists(file_path):
        os.remove(file_path)
    with open(file_path, "w") as f:
        f.write("date,spider_name,error_count,success_count\n")
@task
def trigger_scrape_status_email():
    df = pd.read_csv("/app/app/prefect/scrape_results.csv")
    df.sort_values(by="error_count", ascending=False, inplace=True)
    send_daily_scrape_status_email(df)
@flow(log_prints=True)
def scrape_v2(spider_names: Optional[str] = None):
    """Updated scrape flow to use prefect 3.0 features and async tasks"""
    try:
        clear_summarization_file_state = clear_summarization_file(return_state=True)
        clear_summarization_file_state.result()
        if spider_names:
            spider_modules = [spider_name.strip() for spider_name in spider_names.split(",")]
        else:
            state_get_spider_modules = get_spider_modules(return_state=True)
            spider_modules = state_get_spider_modules.result()
        state_spider_names = get_spider_classes(
            spider_modules, "app.scraper.scraper.spiders", return_state=True
        )
        spider_name_list = state_spider_names.result()
        spider_futures = [run_spider.submit(spider_name) for spider_name in spider_name_list]
        [future.result(raise_on_failure=False) for future in spider_futures]
        policy_factory_futures = []
        for spider_module in spider_modules:
            # Note - We are NOT passing the spider_futures to the run_policy_factory task
            # This is because we are already waiting for the futures to complete processing
            # and we want to attempt to run the policy factory even if a spider fails
            future = run_policy_factory.submit(spider_module)
            policy_factory_futures.append(future)
        # Wait for all tasks (spider + policy factory) before running the final tasks
        scrape_status_email_future = trigger_scrape_status_email.submit(
            wait_for=policy_factory_futures
        )  # type: ignore
        scrape_status_email_future.result()
        extract_structured_data_from_policy_futures = extract_structured_data_from_policy.submit(
            wait_for=policy_factory_futures
        )  # type: ignore
        extract_structured_data_from_policy_futures.result()
        policy_diff_future = run_policy_diff_task.submit(
            wait_for=extract_structured_data_from_policy_futures
        )  # type: ignore
        policy_diff_future.result()
        policy_expiration_future = run_policy_expiration_task.submit(
            wait_for=[policy_diff_future],
        )  # type: ignore
        policy_expiration_future.result()
    except Exception as e:
        # Capture exception with sentry and re-raise the exception
        sentry_sdk.capture_exception(e)
        raise
if __name__ == "__main__":
    spider_names = sys.argv[1] if len(sys.argv) > 1 else None
    if settings.ENVIRONMENT == "prod":
        schedules = [Interval(timedelta(days=1), datetime(2024, 1, 1, 5, 0), "America/New_York")]
    elif settings.ENVIRONMENT == "demo":
        schedules = [Interval(timedelta(weeks=1), datetime(2024, 1, 1, 9, 0), "America/New_York")]
    else:
        # To locally poll from prefect server:
        # 1. https://app.prefect.cloud/account/7d5dffa0-829b-47ec-aa6a-abdb1c6d673d/workspace/77adba1d-5367-4f15-b66c-7af45d38e602/dashboard
        # 2. Go to Deployments -> local_scraping (scrape-v2) -> Run -> Custom Run
        #    -> Add in parameter for spider_name -> Submit
        schedules = []
    if os.environ.get("RUN_PREFECT_LOCALLY") == "true":
        # This allows for a quick local test without serving on prefect.
        # Ideal if you want to ensure that the job on prefect is using your local codebase
        # and is not connecting to someone else's computer.
        print("Running local prefect job...")
        scrape_v2(spider_names=spider_names)
        print("Local prefect job completed, exiting.")
        sys.exit()
    scrape_v2.serve(
        name=f"{settings.ENVIRONMENT}_scraping",
        schedules=schedules,
        parameters={"spider_names": spider_names},
    )
Matthew Eng
11/06/2025, 11:26 AM
import logging
import sentry_sdk
from prefect import task
from sqlalchemy import func
from sqlalchemy import select
from sqlmodel import Session
from app.aws.s3 import get_document_type
from app.aws.s3 import get_policy_document
from app.core.config import settings
from app.core.db import engine
from app.core.sentry import configure_sentry
from app.data_scripts.generate_policy_diff import compare_policies
from app.models import Policy
from app.models import PolicyDiff
from app.utils import get_policy_text_from_authed_s3_link
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
configure_sentry()
def query_new_policy_versions(session):
    """
    Returns a dict of parent_policy_id to their two most recent policy versions,
    for policies that need diffs created.
    Uses a window function to find policies with previous versions and checks
    if a diff already exists.
    """
    # Create CTE with window function to get previous policy ID
    # Partition by parent_policy_id, order by date_effective ASC (NULLS LAST to handle null dates)
    base_cte = (
        select(
            Policy.id,
            Policy.parent_policy_id,
            Policy.date_effective,
            Policy.date_expiration,
            func.lag(Policy.id)
            .over(
                partition_by=Policy.parent_policy_id,
                order_by=Policy.date_effective.asc().nulls_last(),
            )
            .label("previous_policy_id"),
        )
    ).cte("base")
    # Main query: select from base CTE, left join with PolicyDiff
    # Filter where previous_policy_id is not null and diff doesn't exist
    query = (
        select(base_cte)
        .outerjoin(
            PolicyDiff,
            PolicyDiff.current_policy_id == base_cte.c.id,
        )
        .where(base_cte.c.previous_policy_id.isnot(None))
        .where(PolicyDiff.current_policy_id.is_(None))
    )
    # Execute query and get policy IDs
    result = session.exec(query).all()
    policy_ids = [row.id for row in result]
    logger.info("%d policies need diffs created", len(policy_ids))
    if not policy_ids:
        return {}
    # Fetch the actual Policy objects and their previous versions
    # Get current policies - use .scalars().all() to ensure we get Policy objects, not Row objects
    current_policies = session.exec(select(Policy).where(Policy.id.in_(policy_ids))).scalars().all()
    # Get previous policy IDs from the result
    policy_id_to_previous = {row.id: row.previous_policy_id for row in result}
    # Filter out None values - previous_policy_id can be None and we don't want to query for those
    previous_policy_ids = [
        policy_id for policy_id in set(policy_id_to_previous.values()) if policy_id is not None
    ]
    # Get previous policies
    if previous_policy_ids:
        previous_policies = (
            session.exec(select(Policy).where(Policy.id.in_(previous_policy_ids))).scalars().all()
        )
    else:
        previous_policies = []
    # Create a dict mapping previous_policy_id to Policy object
    previous_policy_map = {p.id: p for p in previous_policies}
    # Group policies by current_policy_id with current and previous versions
    # Use current_policy.id as key to avoid collisions when multiple policies
    # share the same parent_policy_id
    matched_policy_versions = {}
    for current_policy in current_policies:
        previous_policy_id = policy_id_to_previous[current_policy.id]
        previous_policy = previous_policy_map.get(previous_policy_id)
        if previous_policy:
            # Use current_policy.id as key to ensure uniqueness
            matched_policy_versions[current_policy.id] = {
                "current": current_policy.model_dump(),
                "previous": previous_policy.model_dump(),
            }
    logger.info("%d policy pairs ready for diffing", len(matched_policy_versions.keys()))
    return matched_policy_versions
def fetch_authed_s3_urls(matched_policy_versions):
    """
    Fetches authed S3 URLs and document types for policy pairs. Removes policy pairs
    where URLs or document types cannot be retrieved (e.g., file doesn't exist).
    """
    policy_ids_to_remove = []
    for current_policy_id, policies in matched_policy_versions.items():
        for version_key in ["current", "previous"]:
            version = policies[version_key]
            authed_s3_url = get_policy_document(version["s3_file_name"])
            document_type = get_document_type(version["s3_file_name"])
            if authed_s3_url is None or document_type is None:
                logger.warning(
                    "Failed to get S3 URL or document type for policy_id %s, version %s",
                    current_policy_id,
                    version_key,
                )
                with sentry_sdk.push_scope() as scope:
                    scope.set_tag("task", "run_policy_diff_task")
                    scope.set_tag("environment", settings.ENVIRONMENT)
                    scope.set_tag("error_type", "s3_document_fetch_failure")
                    scope.set_extra("current_policy_id", str(current_policy_id))
                    scope.set_extra("s3_file_name", version["s3_file_name"])
                    scope.set_extra("authed_s3_url", authed_s3_url)
                    scope.set_extra("document_type", document_type)
                    sentry_sdk.capture_message(
                        (
                            f"S3 document fetch failed: Could not retrieve URL or "
                            f"document type for policy {current_policy_id} ({version_key})"
                        ),
                        level="warning",
                    )
                policy_ids_to_remove.append(current_policy_id)
                break
            version["authed_s3_url"] = authed_s3_url
            version["document_type"] = document_type
    # Remove policy pairs that couldn't get URLs/document types
    for policy_id in policy_ids_to_remove:
        matched_policy_versions.pop(policy_id, None)
    if policy_ids_to_remove:
        logger.info(
            "Skipped %d policy pairs due to S3 URL/document type retrieval failures",
            len(policy_ids_to_remove),
        )
    return matched_policy_versions
def fetch_s3_documents(matched_policy_versions):
    """
    Fetches S3 documents for policy pairs. Removes policy pairs where documents
    cannot be fetched (e.g., 404 errors).
    """
    policy_ids_to_remove = []
    for current_policy_id, policies in matched_policy_versions.items():
        current_policy = policies["current"]
        previous_policy = policies["previous"]
        try:
            current_policy["document"] = get_policy_text_from_authed_s3_link(
                current_policy["authed_s3_url"],
                current_policy["document_type"],
                split_lines=True,
            )
        except Exception as e:
            logger.warning(
                "Failed to fetch current policy document for policy_id %s: %s",
                current_policy_id,
                str(e),
            )
            with sentry_sdk.push_scope() as scope:
                scope.set_tag("task", "run_policy_diff_task")
                scope.set_tag("environment", settings.ENVIRONMENT)
                scope.set_extra("current_policy_id", current_policy_id)
                sentry_sdk.capture_exception(e)
            policy_ids_to_remove.append(current_policy_id)
            continue
        try:
            previous_policy["document"] = get_policy_text_from_authed_s3_link(
                previous_policy["authed_s3_url"],
                previous_policy["document_type"],
                split_lines=True,
            )
        except Exception as e:
            logger.warning(
                "Failed to fetch previous policy document for policy_id %s: %s",
                current_policy_id,
                str(e),
            )
            with sentry_sdk.push_scope() as scope:
                scope.set_tag("task", "run_policy_diff_task")
                scope.set_tag("environment", settings.ENVIRONMENT)
                scope.set_extra("current_policy_id", current_policy_id)
                sentry_sdk.capture_exception(e)
            policy_ids_to_remove.append(current_policy_id)
            continue
    # Remove policy pairs that couldn't be fetched
    for policy_id in policy_ids_to_remove:
        matched_policy_versions.pop(policy_id, None)
    if policy_ids_to_remove:
        logger.info(
            "Skipped %d policy pairs due to document fetch failures",
            len(policy_ids_to_remove),
        )
    return matched_policy_versions
def insert_policy_diff_into_db(session, policy_diff):
    try:
        # might need to allow for upsert in the future
        session.add(policy_diff)
        session.commit()
        return
    except Exception as e:
        current_policy_id = getattr(policy_diff, "current_policy_id", None)
        logger.exception("Error inserting policy diff into db for %s", current_policy_id)
        with sentry_sdk.push_scope() as scope:
            scope.set_tag("task", "run_policy_diff_task")
            scope.set_tag("environment", settings.ENVIRONMENT)
            scope.set_extra("current_policy_id", str(policy_diff.current_policy_id))
            scope.set_extra("previous_policy_id", str(policy_diff.previous_policy_id))
            sentry_sdk.capture_exception(e)
        return
@task
def run_policy_diff_task():
    """
    Diffs the two most recent policy versions for each parent policy that had at least one new
    version created today.
    This task:
    - Queries policies created today to identify affected parent_policy_ids.
    - Fetches the two most recent versions for each parent.
    - Extracts the policy text from their PDF S3 files.
    - Performs a diff using the extracted content.
    - Inserts the resulting diffs into the database.
    """
    with Session(engine) as session:
        matched_policy_versions = query_new_policy_versions(session)
        matched_policy_versions_with_s3_urls = fetch_authed_s3_urls(matched_policy_versions)
        matched_policy_versions_with_documents = fetch_s3_documents(
            matched_policy_versions_with_s3_urls
        )
        for current_policy_id, policies in matched_policy_versions_with_documents.items():
            logger.info("Generating policy diff for policy_id %s", current_policy_id)
            print(current_policy_id)
            try:
                policy_diff = compare_policies(
                    policies["current"],
                    policies["previous"],
                )
                insert_policy_diff_into_db(session, policy_diff)
            except Exception as e:
                parent_policy_id = policies["current"].get("parent_policy_id")
                logger.exception(
                    "Error generating policy diff for current_policy_id %s", current_policy_id
                )
                with sentry_sdk.push_scope() as scope:
                    scope.set_tag("task", "run_policy_diff_task")
                    scope.set_tag("environment", settings.ENVIRONMENT)
                    scope.set_extra("current_policy_id", str(current_policy_id))
                    scope.set_extra("parent_policy_id", str(parent_policy_id))
                    sentry_sdk.capture_exception(e)
if __name__ == "__main__":
    run_policy_diff_task()
Marvin
11/06/2025, 11:27 AM