Ben T
08/06/2025, 6:08 PM
Marvin
08/06/2025, 6:08 PM
Marvin
08/06/2025, 6:09 PM
```python
from prefect import task, flow
@task(
    retries=3,  # Number of retries
    retry_delay_seconds=10  # Delay between retries
)
async def your_task():
    # Your task code here
    pass
```
```python
from prefect import task, flow
from prefect.tasks import exponential_backoff
@task(
    retries=3,
    retry_delay_seconds=exponential_backoff(backoff_factor=2),  # Will wait 2, 4, 8 seconds
)
async def your_task():
    # Your task code here
    pass
@flow
async def your_flow():
    results = await your_task()
    return results
```
```python
from prefect import task, flow
def should_retry(exception: Exception) -> bool:
    # Retry specifically for the TaskRun state error
    return isinstance(exception, AttributeError) and "TaskRun" in str(exception)
@task(
    retries=3,
    retry_delay_seconds=10,
    retry_condition_fn=should_retry
)
async def your_task():
    # Your task code here
    pass
```
```python
import asyncio

@flow
async def your_flow():
    tasks = []
    for i in range(100):
        task = your_task.submit()
        tasks.append(task)
        await asyncio.sleep(0.1)  # Add small delay between submissions
    
    results = [await t.result() for t in tasks]
```
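One caveat worth verifying against the installed Prefect version: in recent releases the task-level `retry_condition_fn` is called with the task, the task run, and the proposed state rather than with a bare exception. A minimal sketch under that assumption (the handler name is illustrative):

```python
from prefect import task


def retry_on_taskrun_error(task, task_run, state) -> bool:
    # Re-raise the exception wrapped in the Failed state and only retry
    # for the AttributeError that mentions "TaskRun"
    try:
        state.result()
    except AttributeError as exc:
        return "TaskRun" in str(exc)
    except Exception:
        return False
    return False


@task(retries=3, retry_delay_seconds=10, retry_condition_fn=retry_on_taskrun_error)
def your_task():
    # The same pattern applies to async tasks
    pass
```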
Ben T
08/06/2025, 6:10 PM
Marvin
08/06/2025, 6:10 PM
Marvin
08/06/2025, 6:11 PM
```python
from prefect import flow, task
import asyncio
from functools import partial
from asyncio import Semaphore
@task(retries=3, retry_delay_seconds=10)
async def your_task():
    # your task code here
    pass
@flow
async def your_flow():
    # Limit concurrent task submissions
    sem = Semaphore(10)  # Adjust this number based on your needs
    
    async def submit_with_semaphore():
        async with sem:
            return await your_task.submit()
    
    # Create all task futures
    tasks = [submit_with_semaphore() for _ in range(100)]
    
    # Wait for all tasks to complete
    results = await asyncio.gather(*tasks)
    return results
```
```python
@flow
async def your_flow():
    batch_size = 10
    all_results = []
    
    # Process in batches
    for i in range(0, 100, batch_size):
        batch_tasks = [your_task.submit() for _ in range(batch_size)]
        batch_results = await asyncio.gather(*batch_tasks)
        all_results.extend(batch_results)
        
        # Optional: Add small delay between batches
        await asyncio.sleep(1)
    
    return all_results
```
```python
from prefect import task, flow
from prefect.tasks import task_input_hash
from datetime import timedelta
import asyncio
import anyio
@task(
    retries=3,
    retry_delay_seconds=10,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=10)  # Add caching to help with retries
)
async def your_task(i: int):
    # your task code here
    pass
@flow
async def your_flow():
    async with anyio.create_task_group() as tg:
        tasks = []
        for i in range(100):
            task = tg.start_soon(your_task, i)
            tasks.append(task)
            await asyncio.sleep(0.1)  # Small delay between submissions
    
    return tasks
```
```python
from prefect import flow, task
from prefect.utilities.annotations import quote
@task(retries=3, retry_delay_seconds=10)
async def your_task():
    # your task code here
    pass
@flow(name="error_boundary_flow")
async def error_boundary_flow(task_fn):
    return await task_fn()
@flow
async def main_flow():
    tasks = []
    for _ in range(100):
        # Wrap each task submission in an error boundary
        task = await error_boundary_flow(quote(your_task))
        tasks.append(task)
    return tasks
```
```python
import logging
@task(retries=3, retry_delay_seconds=10)
async def your_task():
    logging.info("Starting task execution")
    try:
        # your task code here
        pass
    except Exception as e:
        logging.error(f"Task failed with error: {str(e)}")
        raise
```
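If one of the hundred coroutines raises, a bare `asyncio.gather` propagates the first exception and drops the remaining results. A small refinement that keeps the approaches above intact is to gather with `return_exceptions=True` and triage afterwards; a sketch using a stub task:

```python
import asyncio

from prefect import flow, task


@task(retries=3, retry_delay_seconds=10)
async def your_task(i: int) -> int:
    return i


@flow
async def resilient_flow():
    # return_exceptions=True prevents a single failure from
    # cancelling the rest of the batch
    outcomes = await asyncio.gather(
        *(your_task(i) for i in range(100)),
        return_exceptions=True,
    )
    failures = [o for o in outcomes if isinstance(o, Exception)]
    successes = [o for o in outcomes if not isinstance(o, Exception)]
    print(f"{len(successes)} succeeded, {len(failures)} failed")
    return successes
```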
Ben T
08/06/2025, 6:12 PM
Marvin
08/06/2025, 6:12 PM
Marvin
08/06/2025, 6:12 PM
```python
from prefect import flow
import asyncio
@flow(
    retries=3,  # Number of retries
    retry_delay_seconds=10  # Delay between retries
)
async def your_flow():
    # Your flow code here
    tasks = [your_task.submit() for _ in range(100)]
    results = await asyncio.gather(*tasks)
    return results
```
```python
from prefect import flow
from datetime import timedelta
from prefect.tasks import exponential_backoff
def should_retry_flow(exception: Exception) -> bool:
    # Retry specifically for the TaskRun state error
    return isinstance(exception, AttributeError) and "TaskRun" in str(exception)
@flow(
    retries=3,
    retry_delay_seconds=exponential_backoff(backoff_factor=2),  # Will wait 2, 4, 8 seconds
    retry_condition_fn=should_retry_flow
)
async def your_flow():
    # Your flow code here
    pass
```
```python
from prefect import flow
from contextlib import asynccontextmanager
@asynccontextmanager
async def flow_error_handler():
    try:
        yield
    except Exception as e:
        print(f"Flow failed with error: {str(e)}")
        raise
@flow(retries=3, retry_delay_seconds=10)
async def your_flow():
    async with flow_error_handler():
        tasks = [your_task.submit() for _ in range(100)]
        results = await asyncio.gather(*tasks)
        return results
```
Ben T
08/06/2025, 6:14 PM
Marvin
08/06/2025, 6:14 PM
Marvin
08/06/2025, 6:15 PM
Examples of when an explicit task `name` is worth setting:
```python
@task(name="extract_customer_data")  # Clear purpose in logs/UI
async def get_data_from_api():
    # vs. default name which would be "get_data_from_api"
    pass
```
```python
@task(name="process_user_123")
def process_user(user_id: int):
    # Error messages will show "process_user_123 failed" instead of "process_user failed"
    pass
```
```python
@task(name="extract_data_v2")
def get_data():
    # If you change the function name later, logs/history still show consistent task name
    pass
```
```python
@task(name="validate_email_address")
def validate(input_str: str):
    # More clear than just "validate" when used in different contexts
    pass
```
```python
from prefect import task, flow
# Good: When task has multiple instances
@task(name="process_customer_segment_A")
def process_customers(segment: str):
    pass
# Good: When function name is generic
@task(name="extract_salesforce_leads")
def extract_data():
    pass
# Unnecessary: When function name is already clear
@task
def calculate_monthly_revenue():
    pass
# Good: When versioning matters
@task(name="transform_data_v2")
def transform_data():
    pass
@flow
def main_flow():
    # Now logs and UI will show meaningful task names
    segment_a = process_customers("A")
    leads = extract_data()
    revenue = calculate_monthly_revenue()
```
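If the goal is per-run context (like a specific user id) rather than a new static name, `task_run_name` templates are usually a better fit than hard-coding values like `123` into `name`; a sketch, assuming a reasonably current Prefect version:

```python
from prefect import flow, task


@task(task_run_name="process-user-{user_id}")
def process_user(user_id: int) -> None:
    # Each run is labelled with the real id, e.g. "process-user-123"
    print(f"processing user {user_id}")


@flow
def main_flow():
    for user_id in (101, 102, 103):
        process_user(user_id)
```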
Ben T
08/06/2025, 6:17 PM
Marvin
08/06/2025, 6:17 PM
Marvin
08/06/2025, 6:20 PM
```python
from prefect import flow
from prefect.runtime import flow_run
from datetime import datetime
def generate_flow_run_name() -> str:
    # Access the flow's name from runtime context
    flow_name = flow_run.flow_name
    timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S")
    return f"{flow_name}-{timestamp}"
@flow(name="my-special-flow", flow_run_name=generate_flow_run_name)
def my_flow():
    print(f"Current flow run name: {flow_run.flow_run_name}")python
from prefect import flow
from typing import Optional
from datetime import datetime
from prefect.runtime import flow_run
def generate_flow_run_name(environment: Optional[str] = None) -> str:
    base_name = flow_run.flow_name
    env_suffix = f"-{environment}" if environment else ""
    return f"{base_name}{env_suffix}-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}"
@flow(
    name="data-pipeline",
    flow_run_name=generate_flow_run_name
)
def process_data(environment: str = "dev"):
    print(f"Running in {environment}")python
from prefect import flow
from datetime import datetime
from typing import Callable, Optional, Dict, Any
from prefect.runtime import flow_run
def create_flow_run_namer(
    prefix: Optional[str] = None,
    date_format: str = "%Y%m%d-%H%M%S",
    separator: str = "-"
) -> Callable:
    def generate_name(parameters: Dict[str, Any] = None) -> str:
        components = []
        
        # Add prefix if specified, otherwise use flow name
        name_prefix = prefix or flow_run.flow_name
        components.append(name_prefix)
        
        # Add any relevant parameters
        if parameters:
            for key in sorted(parameters.keys()):
                components.append(f"{key}_{parameters[key]}")
        
        # Add timestamp
        components.append(datetime.utcnow().strftime(date_format))
        
        return separator.join(components)
    
    return generate_name
@flow(
    name="etl-pipeline",
    flow_run_name=create_flow_run_namer(prefix="ETL", separator="_")
)
def etl_flow(source: str, destination: str):
    print(f"Current flow run name: {flow_run.flow_run_name}")
# Usage:
# etl_flow("postgres", "snowflake")
# Generates names like: ETL_source_postgres_destination_snowflake_20230815-123456
```
```python
from prefect import flow
import platform
import sys
from datetime import datetime
from prefect.runtime import flow_run
def create_versioned_flow_namer():
    def generate_name() -> str:
        components = [
            flow_run.flow_name,
            f"py{sys.version_info.major}.{sys.version_info.minor}",
            platform.system().lower(),
            datetime.utcnow().strftime("%Y%m%d-%H%M%S")
        ]
        return "-".join(components)
    return generate_name
@flow(
    name="ml-training",
    flow_run_name=create_versioned_flow_namer()
)
def train_model():
    # This will create names like: ml-training-py3.9-darwin-20230815-123456
    print(f"Running as: {flow_run.flow_run_name}")python
from prefect import flow
from prefect.logging import get_run_logger
from datetime import datetime
from prefect.runtime import flow_run
def safe_flow_run_namer() -> str:
    logger = get_run_logger()
    try:
        base_name = flow_run.flow_name
        timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S")
        return f"{base_name}-{timestamp}"
    except Exception as e:
        logger.warning(f"Error generating flow run name: {e}")
        return f"fallback-flow-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}"
@flow(name="robust-flow", flow_run_name=safe_flow_run_namer)
def my_robust_flow():
    pass
```
The attribute used above is `flow_run.flow_name`.
Marvin
08/06/2025, 6:20 PM
Ben T
08/06/2025, 6:48 PM
```python
@flow(
    name="build_analytics_tables",
    retries=3,
    retry_delay_seconds=10
)
```
Marvin
08/06/2025, 6:48 PM
Marvin
08/06/2025, 6:48 PM
With that decorator, `flow_run.flow_name` resolves to the `name` you set:
```python
from prefect import flow
from prefect.runtime import flow_run
from datetime import datetime
def generate_analytics_run_name() -> str:
    # This will be "build_analytics_tables"
    flow_name = flow_run.flow_name
    timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S")
    return f"{flow_name}-{timestamp}"
@flow(
    name="build_analytics_tables",
    retries=3,
    retry_delay_seconds=10,
    flow_run_name=generate_analytics_run_name
)
def analytics_flow():
    print(f"Current flow name: {flow_run.flow_name}")  # Will print: "build_analytics_tables"
    print(f"Current flow run name: {flow_run.flow_run_name}")  # Will print something like: "build_analytics_tables-20230815-123456"build_analytics_tables-20230815-123456build_analytics_tables-20230815-123457flow_run.flow_namename@flowanalytics_flowBen T
Ben T
08/06/2025, 6:50 PM
Marvin
08/06/2025, 6:50 PM
Marvin
08/06/2025, 6:50 PM
The outer `create_flow_run_namer` function is a configuration layer for `prefix`, `date_format`, and `separator`; the inner function it returns is what you hand to `flow_run_name` on the `@flow` decorator:
```python
# Outer function - This is our configuration layer
def create_flow_run_namer(
    prefix: Optional[str] = None,
    date_format: str = "%Y%m%d-%H%M%S",
    separator: str = "-"
) -> Callable:
    # Inner function - This is what Prefect actually calls
    def generate_name(parameters: Dict[str, Any] = None) -> str:
        # This inner function has access to prefix, date_format, and separator
        # from the outer function's scope - this is called a "closure"
        components = []
        name_prefix = prefix or flow_run.flow_name
        components.append(name_prefix)
        # ... rest of the logic ...
        return separator.join(components)
    
    return generate_name
# Usage example:
@flow(
    name="my-flow",
    # Here we're calling the outer function to get a configured name generator
    flow_run_name=create_flow_run_namer(prefix="PROD", separator="_")
)
def my_flow():
    pass
```
```python
# Without nesting - more rigid and less configurable
def generate_name(parameters: Dict[str, Any] = None) -> str:
    # Now these values are hardcoded or would need to be global variables
    prefix = "PROD"
    date_format = "%Y%m%d-%H%M%S"
    separator = "-"
    
    components = []
    name_prefix = prefix or flow_run.flow_name
    components.append(name_prefix)
    # ... rest of the logic ...
    return separator.join(components)
@flow(name="my-flow", flow_run_name=generate_name)
def my_flow():
    pass
```
```python
# Example of why closures are useful
def create_greeting(greeting_word: str):
    def greet(name: str):
        return f"{greeting_word}, {name}!"
    return greet
# Create two different greeting functions
say_hello = create_greeting("Hello")
say_hi = create_greeting("Hi")
print(say_hello("Alice"))  # "Hello, Alice!"
print(say_hi("Alice"))     # "Hi, Alice!"python
from prefect import flow
from datetime import datetime
from typing import Optional, Dict, Any
def create_analytics_namer(
    environment: str = "prod",
    include_date: bool = True
):
    def generate_name(parameters: Dict[str, Any] = None) -> str:
        components = [flow_run.flow_name, environment]
        
        # Add any table names if they're in the parameters
        if parameters and "tables" in parameters:
            components.append(f"tables_{parameters['tables']}")
            
        # Optionally add date
        if include_date:
            components.append(datetime.utcnow().strftime("%Y%m%d"))
            
        return "-".join(components)
    
    return generate_name
@flow(
    name="build_analytics_tables",
    flow_run_name=create_analytics_namer(environment="staging", include_date=True)
)
def build_tables(tables: list):
    # This will create names like: build_analytics_tables-staging-tables_['users','orders']-20230815
    pass
```
The closure-configured generator is what gets passed to `flow_run_name`.
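Also worth knowing (verify against your Prefect version): `flow_run_name` accepts a plain template string that references flow parameters, which covers many cases without writing a factory at all:

```python
from prefect import flow


@flow(
    name="build_analytics_tables",
    flow_run_name="build_analytics_tables-{environment}-{tables}",
)
def build_tables(tables: str, environment: str = "prod"):
    # Produces run names like "build_analytics_tables-staging-users"
    print(f"building {tables} in {environment}")
```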
Ben T
08/06/2025, 7:13 PM
Marvin
08/06/2025, 7:13 PM
Marvin
08/06/2025, 7:15 PM
`prefect.runtime.flow_run.flow_name` returns the static name set on the `@flow` decorator, while `get_run_context().flow_run.name` returns the dynamic name of the current run:
```python
from prefect import flow
from prefect.runtime import flow_run
from datetime import datetime
def generate_run_name() -> str:
    # This gets the static flow name ("analytics-pipeline")
    base_name = flow_run.flow_name
    
    # This would get the dynamic run name (might be "analytics-pipeline-20230815-123456")
    # current_run_name = get_run_context().flow_run.name
    
    timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S")
    return f"{base_name}-{timestamp}"
@flow(
    name="analytics-pipeline",
    flow_run_name=generate_run_name
)
def my_analytics_flow():
    print(f"Flow name: {flow_run.flow_name}")  # Will print: "analytics-pipeline"
    print(f"Flow run name: {flow_run.name}")   # Will print: "analytics-pipeline-20230815-123456"python
from prefect import flow
from prefect.runtime import flow_run
from datetime import datetime
from typing import Optional, Dict, Any
def create_analytics_namer(environment: str = "prod"):
    def generate_name(parameters: Dict[str, Any] = None) -> str:
        # Use flow_run.flow_name to get the static name from the decorator
        base_name = flow_run.flow_name
        
        components = [
            base_name,
            environment,
            datetime.utcnow().strftime("%Y%m%d-%H%M%S")
        ]
        
        if parameters and "table_name" in parameters:
            components.insert(1, parameters["table_name"])
            
        return "-".join(components)
    
    return generate_name
@flow(
    name="build_analytics_tables",
    flow_run_name=create_analytics_namer(environment="staging")
)
def build_table(table_name: str):
    print(f"Static flow name: {flow_run.flow_name}")
    # Will print: "build_analytics_tables"
    
    print(f"Dynamic flow run name: {flow_run.name}")
    # Will print something like: "build_analytics_tables-users-staging-20230815-123456"flow_run.flow_nameflow_run.nameget_run_context()prefect.runtimeBen T
Ben T
08/06/2025, 7:17 PM
```python
def generate_flow_run_name(
        date_format: str = "%Y%m%d-%H%M%S",
        separator: str = "-",
        prefix: Optional[str] = None,
        parameters: Optional[dict[str, Any]] = None
) -> Callable[[], str]:
    """
    Creates a flow run name generator for Prefect flows.
    The generated function builds a name by combining:
      - A prefix (either provided or from `flow_run.flow_name`)
      - Any provided parameters as `key_value` pairs sorted by key
      - A UTC timestamp in the given format
    :param date_format: A `strftime`-compatible format string for the UTC timestamp. Defaults to "%Y%m%d-%H%M%S".
    :param separator: String used to join the name components. Defaults to "-".
    :param prefix: Optional static prefix for the flow run name. If None, the flow name (`flow_run.flow_name`) will be used.
    :param parameters: Optional dictionary of parameters to include in the run name.
    :returns: A zero-argument callable that returns the generated flow run name as a string.
    """
    def generate_name() -> str:
        components: list[str] = []
        # Use explicit prefix if provided, otherwise the flow's name
        name_prefix = prefix or flow_run.flow_name
        components.append(name_prefix)
        # Add parameters in a stable order
        if parameters:
            for key in sorted(parameters.keys()):
                components.append(f"{key}_{parameters[key]}")
        # Always add timestamp
        components.append(datetime.datetime.now(datetime.UTC).strftime(date_format))
        return separator.join(components)
    return generate_name
```
Marvin
08/06/2025, 7:17 PM
Marvin
08/06/2025, 7:18 PM
```python
def generate_flow_run_name(
    date_format: str = "%Y%m%d-%H%M%S",
    separator: str = "-",
    prefix: Optional[str] = None,
) -> Callable[[Optional[Dict[str, Any]]], str]:  # Note the signature change
    """
    Creates a flow run name generator for Prefect flows.
    :param date_format: A `strftime`-compatible format string for the UTC timestamp. Defaults to "%Y%m%d-%H%M%S".
    :param separator: String used to join the name components. Defaults to "-".
    :param prefix: Optional static prefix for the flow run name. If None, the flow name (`flow_run.flow_name`) will be used.
    :returns: A callable that accepts an optional parameters dict and returns the generated flow run name as a string.
    """
    def generate_name(parameters: Optional[Dict[str, Any]] = None) -> str:  # Prefect passes parameters here
        components: list[str] = []
        # Use explicit prefix if provided, otherwise the flow's name
        name_prefix = prefix or flow_run.flow_name
        components.append(name_prefix)
        # Add parameters in a stable order
        if parameters:
            for key in sorted(parameters.keys()):
                components.append(f"{key}_{parameters[key]}")
        # Always add timestamp
        components.append(datetime.datetime.now(datetime.UTC).strftime(date_format))
        return separator.join(components)
    return generate_name
```
```python
def generate_flow_run_name(
    date_format: str = "%Y%m%d-%H%M%S",
    separator: str = "-",
    prefix: Optional[str] = None,
) -> Callable[[Optional[Dict[str, Any]]], str]:
    def generate_name(parameters: Optional[Dict[str, Any]] = None) -> str:
        components: list[str] = []
        # Use explicit prefix if provided, otherwise the flow's name
        name_prefix = prefix or flow_run.flow_name
        components.append(name_prefix)
        # Add parameters in a stable order
        if parameters:
            for key in sorted(parameters.keys()):
                # Safe string conversion and cleanup
                value = str(parameters[key]).replace(" ", "").replace(separator, "_")
                components.append(f"{key}_{value}")
        # Always add timestamp
        components.append(datetime.datetime.now(datetime.UTC).strftime(date_format))
        return separator.join(components)
    return generate_name
```
```python
@flow(
    name="analytics-pipeline",
    flow_run_name=generate_flow_run_name(
        prefix="ANALYTICS",
        date_format="%Y%m%d",
        separator="_"
    )
)
def process_data(table_name: str, date: str):
    # This will generate names like:
    # ANALYTICS_table_name_users_date_20230815_20230815
    pass
```
Marvin
08/06/2025, 7:18 PM
4. *Error Handling*: You might want to add some error handling:
```python
from prefect.logging import get_run_logger
def generate_flow_run_name(
    date_format: str = "%Y%m%d-%H%M%S",
    separator: str = "-",
    prefix: Optional[str] = None,
) -> Callable[[Optional[Dict[str, Any]]], str]:
    def generate_name(parameters: Optional[Dict[str, Any]] = None) -> str:
        logger = get_run_logger()
        try:
            components: list[str] = []
            name_prefix = prefix or flow_run.flow_name
            components.append(name_prefix)
            if parameters:
                for key in sorted(parameters.keys()):
                    try:
                        value = str(parameters[key]).replace(" ", "").replace(separator, "_")
                        components.append(f"{key}_{value}")
                    except Exception as e:
                        logger.warning(f"Failed to format parameter {key}: {e}")
            components.append(datetime.datetime.now(datetime.UTC).strftime(date_format))
            return separator.join(components)
        except Exception as e:
            logger.error(f"Error generating flow run name: {e}")
            # Fallback to a simple name
            return f"{flow_run.flow_name}-{datetime.datetime.now(datetime.UTC).strftime(date_format)}"
    return generate_name
```
Ben T
08/06/2025, 7:21 PM
```python
def generate_flow_run_name(
        date_format: str = "%Y%m%d-%H%M%S",
        separator: str = "-",
        prefix: Optional[str] = None
) -> Callable[[Optional[dict[str, Any]]], str]:
    """
    Creates a flow run name generator for Prefect flows.
    The generated function builds a name by combining:
      - A prefix (either provided or from `flow_run.flow_name`)
      - Any provided parameters as `key_value` pairs sorted by key
      - A UTC timestamp in the given format
    :param date_format: A `strftime`-compatible format string for the UTC timestamp. Defaults to "%Y%m%d-%H%M%S".
    :param separator: String used to join the name components. Defaults to "-".
    :param prefix: Optional static prefix for the flow run name. If None, the flow name (`flow_run.flow_name`) will be used.
    :returns: A callable that accepts an optional parameters dict and returns the generated flow run name as a string.
    """
    def generate_name(
            parameters: Optional[dict[str, Any]] = None
    ) -> str:
        """
        Generate a Prefect flow run name.
        
        Builds a name by combining:
          - A prefix (either provided to the factory or from `flow_run.flow_name`)
          - Any provided parameters as `key_value` pairs sorted by key
          - A UTC timestamp in the configured format
        
        :param parameters: Optional dictionary of flow parameters to include in the name.
                           Keys are sorted alphabetically; values are converted to strings
                           with spaces removed and separators replaced with underscores.
        :returns: The generated flow run name as a string.
        """
        components: list[str] = []
        # Use explicit prefix if provided, otherwise the flow's name
        name_prefix = prefix or flow_run.flow_name
        components.append(name_prefix)
        # Add parameters in a stable order
        if parameters:
            for key in sorted(parameters.keys()):
                # Safe string conversion
                value = str(parameters[key]).replace(" ", "").replace(separator, "_")
                components.append(f"{key}_{value}")
        # Always add timestamp (UTC-aware)
        components.append(datetime.datetime.now(datetime.UTC).strftime(date_format))
        return separator.join(components)
    return generate_name
```
Marvin
08/06/2025, 7:21 PM
Marvin
08/06/2025, 7:21 PM
```python
from prefect.logging import get_run_logger
def generate_name(parameters: Optional[dict[str, Any]] = None) -> str:
    logger = get_run_logger()
    try:
        # ... your existing code ...
    except Exception as e:
        logger.warning(f"Error generating flow run name: {e}")
        # Fallback to basic name
        return f"{flow_run.flow_name}-{datetime.datetime.now(datetime.UTC).strftime(date_format)}"python
def generate_flow_run_name(
    date_format: str = "%Y%m%d-%H%M%S",
    separator: str = "-",
    prefix: Optional[str] = None,
    include_params: Optional[list[str]] = None,  # New optional parameter
) -> Callable[[Optional[dict[str, Any]]], str]:
```
```python
@flow(
    name="analytics-pipeline",
    flow_run_name=generate_flow_run_name(
        prefix="ANALYTICS",
        date_format="%Y%m%d",
        separator="_"
    )
)
def process_data(table_name: str, date: str):
    # Will generate names like:
    # ANALYTICS_table_name_users_date_20230815_20230815
    pass
```
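A possible body for that `include_params` idea. One hedge: in current Prefect releases the `flow_run_name` callable is typically invoked with no arguments, so this sketch reads parameters from `prefect.runtime.flow_run` rather than from a function argument:

```python
import datetime
from typing import Callable, Optional

from prefect.runtime import flow_run


def generate_flow_run_name(
    date_format: str = "%Y%m%d-%H%M%S",
    separator: str = "-",
    prefix: Optional[str] = None,
    include_params: Optional[list[str]] = None,
) -> Callable[[], str]:
    def generate_name() -> str:
        components = [prefix or flow_run.flow_name]
        parameters = flow_run.parameters or {}
        for key in sorted(parameters):
            # Only include whitelisted keys, if a whitelist was given
            if include_params is not None and key not in include_params:
                continue
            value = str(parameters[key]).replace(" ", "").replace(separator, "_")
            components.append(f"{key}_{value}")
        components.append(datetime.datetime.now(datetime.UTC).strftime(date_format))
        return separator.join(components)

    return generate_name
```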
Ben T
08/06/2025, 7:24 PM
Marvin
08/06/2025, 7:24 PM
Marvin
08/06/2025, 7:27 PM
Marvin
08/06/2025, 7:28 PM
Prefect ships a helper for random names: `prefect.utilities.names.generate_slug()`.
```python
from prefect.utilities.names import generate_slug
# Generate a random name (like "jumping-rabbit")
random_name = generate_slug(2)  # 2 words
# Use it in a flow run name generator
from prefect import flow
import datetime
from typing import Any, Callable, Dict, Optional
from prefect.runtime import flow_run
def generate_flow_run_name(
    include_random_name: bool = True,
    separator: str = "-"
) -> Callable[[Optional[Dict[str, Any]]], str]:
    def generate_name(parameters: Optional[Dict[str, Any]] = None) -> str:
        components = []
        
        # Add base flow name
        components.append(flow_run.flow_name)
        
        # Add random animal-style name if requested
        if include_random_name:
            components.append(generate_slug(2))
            
        # Add timestamp
        components.append(datetime.datetime.now(datetime.UTC).strftime("%Y%m%d-%H%M%S"))
        
        return separator.join(components)
    
    return generate_name
@flow(
    name="analytics-pipeline",
    flow_run_name=generate_flow_run_name(include_random_name=True)
)
def process_data():
    # This will generate names like:
    # analytics-pipeline-jumping-rabbit-20230815-123456
    pass
```
```python
from prefect.utilities.names import generate_slug
# Generate a few example names
for _ in range(5):
    print(generate_slug(2))
# Might output something like:
# jumping-rabbit
# dancing-penguin
# flying-elephant
# happy-tiger
# running-giraffe
```
Each call to `generate_slug()` returns a fresh random combination.
Ben T
08/06/2025, 7:32 PM
Marvin
08/06/2025, 7:32 PM
Marvin
08/06/2025, 7:33 PM
Whether you get the animal-style `generate_slug` names depends on how `flow_run_name` is set; to guarantee them, call `generate_slug` explicitly:
```python
from prefect import flow
# Default behavior - will use flow name + timestamp
@flow(name="my-analytics")
def process_data():
    pass
# Results in names like: "my-analytics-20230815-123456"
# If you want the animal-style names, you'd need to explicitly use generate_slug
from prefect.utilities.names import generate_slug
import datetime
@flow(
    name="my-analytics",
    flow_run_name=lambda: f"{generate_slug(2)}-{datetime.datetime.now(datetime.UTC).strftime('%Y%m%d-%H%M%S')}"
)
def process_data():
    pass
# Results in names like: "happy-penguin-20230815-123456"namegenerate_flow_run_namegenerate_sluggenerate_slugBen T
08/06/2025, 7:36 PMMarvin
08/06/2025, 7:36 PMMarvin
08/06/2025, 7:37 PMget_run_logger()get_run_logger()MissingContextErrorpython
from prefect import flow, task
from prefect.logging import get_run_logger
# ❌ Don't do this:
logger = get_run_logger()  # This will fail or not work as expected
@flow
def my_flow(logger):  # Don't pass logger as parameter
    <http://logger.info|logger.info>("Hello")  # Logs won't be properly associated with the flow run
# ✅ Do this instead:
@flow
def my_flow():
    logger = get_run_logger()  # Initialize inside the flow
    <http://logger.info|logger.info>("Hello")  # Properly associated with flow run
# ✅ For tasks:
@task
def my_task():
    logger = get_run_logger()  # Initialize inside the task
    <http://logger.info|logger.info>("Task running")
# ✅ If you need logging outside of flows/tasks:
from prefect.logging import get_logger
setup_logger = get_logger("setup")  # Use get_logger instead of get_run_logger
setup_logger.info("Setting up...")
```
```python
from prefect import flow, task
from prefect.logging import get_run_logger
from typing import Any
def log_step(step_name: str, data: Any = None) -> None:
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Step: {step_name}", extra={"data": data})
@flow
def my_flow():
    log_step("Starting flow")
    result = process_data()
    log_step("Data processed", result)
@task
def process_data():
    log_step("Processing data")
    return {"status": "complete"}python
from prefect import flow, task
from prefect.logging import get_run_logger
from dataclasses import dataclass
from typing import Optional
@dataclass
class LogContext:
    step: str
    details: Optional[dict] = None
    
def log_event(context: LogContext, level: str = "INFO") -> None:
    logger = get_run_logger()
    log_func = getattr(logger, level.lower())
    log_func(f"Step: {context.step}", extra={"details": context.details})
@flow
def my_flow():
    log_event(LogContext("flow_start"))
    result = process_data()
    log_event(LogContext("flow_end", {"result": result}))
@task
def process_data():
    log_event(LogContext("processing_data"))
    return {"status": "complete"}get_run_logger()get_logger()Ben T
Ben T
08/07/2025, 4:00 AM
Marvin
08/07/2025, 4:00 AM
Marvin
08/07/2025, 4:01 AM
```python
from prefect.blocks.notifications import SlackWebhook
slack_block = SlackWebhook(
    url="<https://hooks.slack.com/services/xxx/yyy/zzz>"
)
slack_block.save(name="basic-slack-webhook")prefect-slackpython
# First install: pip install prefect-slack
from prefect_slack import SlackIncomingWebhook
slack_block = SlackIncomingWebhook(
    url="<https://hooks.slack.com/services/xxx/yyy/zzz>"
)
slack_block.save(name="advanced-slack-webhook")prefect-slackpip install prefect-slackSlackIncomingWebhookbash
prefect block register -m prefect_slack
```
```python
# Basic Webhook
from prefect.blocks.notifications import SlackWebhook
slack = SlackWebhook.load("basic-slack-webhook")
slack.notify("Simple message")
# Incoming Webhook (more features)
from prefect_slack import SlackIncomingWebhook
slack = SlackIncomingWebhook.load("advanced-slack-webhook")
# Supports more Slack-specific features
await slack.notify_on_failure(flow_run)
await slack.notify_on_completion(flow_run)
```
Note that `SlackIncomingWebhook` (from `prefect-slack`) and the built-in `SlackWebhook` are distinct block types, even though they serve a similar purpose:
```bash
# When using SlackIncomingWebhook, always register the blocks
prefect block register -m prefect_slack
# Verify registration
prefect block type ls
```
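To tie this back to flows: once either block is saved, a flow can load it and send a message directly. A minimal sketch using the built-in block and the `basic-slack-webhook` name saved above:

```python
from prefect import flow
from prefect.blocks.notifications import SlackWebhook


@flow
async def notify_example():
    # Load the previously saved block and post a message
    slack = await SlackWebhook.load("basic-slack-webhook")
    await slack.notify("Flow finished successfully")
```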
Ben T
08/07/2025, 4:12 AM
Marvin
08/07/2025, 4:12 AM
Marvin
08/07/2025, 4:13 AM
Runtime information lives in `prefect.runtime`:
```python
from prefect import flow
from prefect.runtime import flow_run
@flow
def my_flow():
    # Flow run details
    print(f"Flow Run ID: {flow_run.id}")
    print(f"Flow Name: {flow_run.flow_name}")
    print(f"Flow Run Name: {flow_run.name}")
    print(f"Parameters: {flow_run.parameters}")
    print(f"Tags: {flow_run.tags}")
    print(f"UI URL: {flow_run.ui_url}")
    
    # Parent/Root flow information
    print(f"Parent Flow Run ID: {flow_run.parent_flow_run_id}")
    print(f"Root Flow Run ID: {flow_run.root_flow_run_id}")
    
    # Deployment information
    print(f"Parent Deployment ID: {flow_run.parent_deployment_id}")
    
    # Timing information
    print(f"Scheduled Start: {flow_run.scheduled_start_time}")
    
    # Run metadata
    print(f"Run Count: {flow_run.run_count}")
    print(f"Flow Version: {flow_run.flow_version}")python
from prefect import task
from prefect.runtime import task_run
@task
def my_task():
    print(f"Task Run ID: {task_run.id}")
    print(f"Task Name: {task_run.task_name}")
    print(f"Task Run Name: {task_run.name}")
    print(f"Parameters: {task_run.parameters}")
    print(f"Tags: {task_run.tags}")
    print(f"UI URL: {task_run.ui_url}")
    print(f"Run Count: {task_run.run_count}")python
from prefect import flow
from prefect.runtime import deployment
@flow
def my_flow():
    if deployment.id:  # Only available when run from deployment
        print(f"Deployment ID: {deployment.id}")
        print(f"Deployment Name: {deployment.name}")
        print(f"Deployment Version: {deployment.version}")
        print(f"Deployment Parameters: {deployment.parameters}")python
from prefect import flow, task
from prefect.runtime import flow_run, task_run, deployment
from datetime import datetime
def get_context_info():
    """Helper to get relevant context info"""
    context = {
        "timestamp": datetime.utcnow().isoformat(),
        "flow_name": flow_run.flow_name,
        "flow_run_id": flow_run.id,
        "flow_run_name": flow_run.name,
    }
    
    # Add task info if in task context
    try:
        context.update({
            "task_name": task_run.task_name,
            "task_run_id": task_run.id
        })
    except RuntimeError:
        pass  # Not in task context
        
    # Add deployment info if available
    if deployment.id:
        context.update({
            "deployment_name": deployment.name,
            "deployment_id": deployment.id
        })
        
    return context
@task
def process_data():
    context = get_context_info()
    print(f"Processing with context: {context}")
    # Your task logic here
@flow
def my_flow():
    context = get_context_info()
    print(f"Flow started with context: {context}")
    process_data()
```
```python
from prefect import flow
from prefect.runtime import flow_run
from prefect.logging import get_run_logger
@flow
def my_flow():
    logger = get_run_logger()
    try:
        # Your flow logic here
        raise ValueError("Something went wrong")
    except Exception as e:
        logger.error(
            f"Error in flow run {flow_run.name} ({flow_run.id})",
            extra={
                "flow_run_url": flow_run.ui_url,
                "parameters": flow_run.parameters,
                "error": str(e)
            }
        )
        raise
```
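One practical use of these runtime attributes is embedding a link back to the run in an alert; a sketch that combines `flow_run.ui_url` with the Slack block discussed later in this thread (the block name here is just illustrative):

```python
from prefect import flow
from prefect.blocks.notifications import SlackWebhook
from prefect.runtime import flow_run


@flow
async def monitored_flow():
    try:
        ...  # your flow logic here
    except Exception as exc:
        # "slack-notifications" is an assumed, previously saved block name
        slack = await SlackWebhook.load("slack-notifications")
        await slack.notify(
            f"Run {flow_run.name} failed: {exc}\nSee {flow_run.ui_url}"
        )
        raise
```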
Ben T
08/07/2025, 4:14 AM
```python
    try:
        # Test the block directly
        slack_webhook = await SlackWebhook.load("slack-notifications")
        await slack_webhook.notify("Test message")
        logger.debug("✅ Test Slack notification sent successfully")
    except Exception as e:
        logger.exception(f"❌ Failed to send test Slack notification: {e}")
```
Marvin
08/07/2025, 4:14 AM
Marvin
08/07/2025, 4:14 AM
The error `Unable to find block document named slack-notifications for block type slack-webhook` means no block with that name exists for that block type:
```python
from prefect.blocks.notifications import SlackWebhook
# When you saved the block, it should have been like this:
slack_webhook = SlackWebhook(
    url="your-webhook-url"
)
slack_webhook.save(name="slack-notifications")  # This is the name you're trying to loadpython
# You can run this in a Python shell or script
from prefect.blocks.notifications import SlackWebhook
# List all saved blocks of this type
blocks = SlackWebhook.load_all()
for block in blocks:
    print(f"Found block: {block.name}")python
from prefect.blocks.notifications import SlackWebhook
from prefect.logging import get_run_logger
logger = get_run_logger()
try:
    # Make sure the block name matches EXACTLY what you saved it as
    slack_webhook = await SlackWebhook.load("slack-notifications")
    await slack_webhook.notify("Test message")
    logger.debug("✅ Test Slack notification sent successfully")
except Exception as e:
    logger.exception(f"❌ Failed to send test Slack notification: {e}")
```
```python
from prefect.blocks.core import Block
# List all blocks
for block in Block.list_blocks():
    print(block)
```
```bash
prefect block ls
```
```python
from prefect.blocks.notifications import SlackWebhook
# Create and save the block
slack_webhook = SlackWebhook(
    url="your-webhook-url"
)
slack_webhook.save(name="slack-notifications", overwrite=True)  # overwrite=True if you want to update existing
# Then try loading and using it
loaded_webhook = await SlackWebhook.load("slack-notifications")
await loaded_webhook.notify("Test message")
```
You can confirm what actually exists with `prefect block ls`.
Ben T
08/07/2025, 4:15 AM
`from prefect.blocks.notifications import SlackWebhook`
Marvin
08/07/2025, 4:15 AM
Marvin
08/07/2025, 4:16 AM
```python
from prefect.blocks.notifications import SlackWebhook
from prefect.blocks.core import Block
from prefect.logging import get_run_logger
logger = get_run_logger()
# First, let's check what blocks are available
all_blocks = Block.list_blocks()
<http://logger.info|logger.info>(f"Available blocks: {all_blocks}")
# Then check specifically for slack webhooks
slack_blocks = SlackWebhook.load_all()
<http://logger.info|logger.info>(f"Available Slack webhook blocks: {[block.name for block in slack_blocks]}")
try:
    # Load using the full block type path
    slack_webhook = await SlackWebhook.load("slack-webhook/slack-notifications")
    await slack_webhook.notify("Test message")
    logger.debug("✅ Test Slack notification sent successfully")
except Exception as e:
    logger.exception(f"❌ Failed to send test Slack notification: {e}")
```
```python
from prefect.blocks.notifications import SlackWebhook
from prefect.logging import get_run_logger
logger = get_run_logger()
# Create and save the block
slack_webhook = SlackWebhook(
    url="your-webhook-url"  # Replace with your actual webhook URL
)
# Save with explicit block type
try:
    slack_webhook.save(name="slack-notifications", overwrite=True)
    <http://logger.info|logger.info>("Block saved successfully")
    
    # Now try to load and use it
    loaded_webhook = await SlackWebhook.load("slack-notifications")
    await loaded_webhook.notify("Test message")
    logger.debug("✅ Test Slack notification sent successfully")
except Exception as e:
    logger.exception(f"❌ Error: {e}")
```
```python
from prefect import flow
from prefect.blocks.notifications import SlackWebhook
from prefect.blocks.core import Block
from prefect.logging import get_run_logger
async def diagnose_slack_block():
    logger = get_run_logger()
    
    # 1. Check all available blocks
    <http://logger.info|logger.info>("Checking all available blocks...")
    all_blocks = Block.list_blocks()
    <http://logger.info|logger.info>(f"Available blocks: {all_blocks}")
    
    # 2. Check specifically for slack blocks
    <http://logger.info|logger.info>("Checking Slack webhook blocks...")
    try:
        slack_blocks = SlackWebhook.load_all()
        <http://logger.info|logger.info>(f"Found Slack blocks: {[block.name for block in slack_blocks]}")
    except Exception as e:
        logger.error(f"Error loading Slack blocks: {e}")
    
    # 3. Try different loading approaches
    try:
        # Try with just the name
        <http://logger.info|logger.info>("Trying to load with just name...")
        block1 = await SlackWebhook.load("slack-notifications")
        <http://logger.info|logger.info>("Success!")
    except Exception as e:
        logger.error(f"Failed with name only: {e}")
    
    try:
        # Try with full path
        <http://logger.info|logger.info>("Trying to load with full path...")
        block2 = await SlackWebhook.load("slack-webhook/slack-notifications")
        <http://logger.info|logger.info>("Success!")
    except Exception as e:
        logger.error(f"Failed with full path: {e}")
# Use this in your flow
@flow
async def test_flow():
    await diagnose_slack_block()
```
Ben T
08/07/2025, 4:22 AM
```python
from prefect_slack import SlackWebhook
slack_webhook_url = os.environ["SLACK_WEBHOOK_URL"]
block_name = "slack-notifications"
def create_slack_webhook_block():
    block = SlackWebhook(
        url=SecretStr(slack_webhook_url)
    )
    block.save(block_name, overwrite=True)
    <http://logger.info|logger.info>(f"✅ Saved SlackWebhook notification block as '{block_name}'")Marvin
08/07/2025, 4:22 AMMarvin
08/07/2025, 4:22 AMprefect_slackprefect.blocks.notificationspython
from prefect_slack import SlackWebhook  # Note: same import as creation
from prefect.logging import get_run_logger
logger = get_run_logger()
try:
    # Load using the same SlackWebhook class from prefect_slack
    slack_webhook = await SlackWebhook.load("slack-notifications")
    await slack_webhook.notify("Test message")
    logger.debug("✅ Test Slack notification sent successfully")
except Exception as e:
    logger.exception(f"❌ Failed to send test Slack notification: {e}")
```
The mismatch: the block was created with `from prefect_slack import SlackWebhook` but loaded with `from prefect.blocks.notifications import SlackWebhook`; `prefect_slack.SlackWebhook` and `prefect.blocks.notifications.SlackWebhook` are different block types. Also make sure `prefect-slack` is installed:
```bash
pip install prefect-slack
```
and registered:
```bash
prefect block register -m prefect_slack
```
```python
# Both creation and usage should use the same import:
from prefect_slack import SlackWebhook
```
Both creation and loading must go through the `prefect_slack` import.
Ben T
08/07/2025, 4:23 AM
Marvin
08/07/2025, 4:23 AM
Marvin
08/07/2025, 4:24 AM
`prefect_slack.SlackWebhook` and `prefect.blocks.notifications.SlackWebhook` are two different block implementations:
```python
# How you CREATED the block:
from prefect_slack import SlackWebhook  # ✅ Created with this implementation
block = SlackWebhook(url=SecretStr(slack_webhook_url))
block.save("slack-notifications")
# How you tried to LOAD the block:
from prefect.blocks.notifications import SlackWebhook  # ❌ Different implementation
await SlackWebhook.load("slack-notifications")  # Failed because wrong implementation
# How it SHOULD be:
from prefect_slack import SlackWebhook  # ✅ Same implementation as creation
await SlackWebhook.load("slack-notifications")  # Works!Ben T
08/07/2025, 4:25 AMMarvin
08/07/2025, 4:25 AMMarvin
08/07/2025, 4:26 AMbash
# Make sure the Slack blocks are registered
prefect block register -m prefect_slack
```
```python
from prefect.blocks.core import Block
# List all registered block types to verify
blocks = Block.list_blocks()
print([b for b in blocks if "slack" in b.lower()])
```
If `prefect-slack` is registered but the automation still fails, try recreating the block:
```python
from prefect_slack import SlackWebhook
# 1. Try recreating the block with a different name
slack_webhook = SlackWebhook(
    url=SecretStr(slack_webhook_url)
)
slack_webhook.save("slack-notifications-new", overwrite=True)
# 2. Update your automation to use the new block
```
Ben T
08/07/2025, 4:27 AM
Marvin
08/07/2025, 4:27 AM
Marvin
08/07/2025, 4:27 AM
```
Your Flow Code                  Prefect Server
-------------                   --------------
@flow                          Automation
def my_flow():                 - Trigger (e.g., "flow run completed")
    # No Slack imports         - Action (uses saved Slack block)
    # needed here!            
    pass                       ↓
                              Sends Slack notification
```
Ben T
08/07/2025, 4:44 AM
Marvin
08/07/2025, 4:44 AM
Marvin
08/07/2025, 4:45 AM
The environment evaluating the automation still needs the `prefect-slack` block type available if the saved block comes from that collection.
Ben T
08/07/2025, 4:45 AM
```
{
  "id": "068942ee-8b45-79e0-8000-091a20f4cd3d",
  "event": "prefect.automation.action.failed",
  "occurred": "2025-08-07T04:43:20.704Z",
  "payload": {
    "reason": "The notification block was invalid: UnknownBlockType(\"No block class found for slug 'slack-incoming-webhook'. Please ensure that the block class is available in the current environment.\")",
    "invocation": "068942ee-8990-760f-8000-05a59512b651",
    "action_type": "send-notification",
    "action_index": 0
  },
  "received": "2025-08-07T04:43:20.704Z",
  "related": [],
  "resource": {
    "prefect.posture": "Reactive",
    "prefect.resource.id": "prefect.automation.96eaa544-0481-42f1-b493-9b9ac95745d5",
    "prefect.trigger-type": "event",
    "prefect.resource.name": "Flow run slack alerts"
  }
}
```
Marvin
08/07/2025, 4:45 AM
Marvin
08/07/2025, 4:45 AM
"No block class found for slug 'slack-incoming-webhook'. Please ensure that the block class is available in the current environment." means the automation environment cannot resolve the `prefect-slack` block class:
```bash
prefect block register -m prefect_slack
```
```bash
prefect block type ls
```
Check that `slack-incoming-webhook` appears in that list; if it does not, `prefect-slack` is not registered in the environment where the automation runs.
Ben T
08/07/2025, 4:48 AM
Marvin
08/07/2025, 4:48 AM
Marvin
08/07/2025, 4:48 AM
The `UnknownBlockType` error refers to the block created from `prefect_slack.SlackWebhook`; you can confirm its slug directly:
```python
from prefect_slack import SlackWebhook
block = SlackWebhook.load("slack-notifications")
print(f"Block type: {block._block_type_slug}")  # This will show the actual block type slugBen T
08/07/2025, 4:49 AMblock_type_nameSlack Incoming Webhookslack-notifications,"""***""",c7f4a527-f22d-477f-9bd9-b17198ff8eaf,7ecde64e-35ef-41e3-b369-d1e1992eb131,false,Slack Incoming WebhookMarvin
08/07/2025, 4:49 AMMarvin
08/07/2025, 4:49 AMprefect_slack.SlackWebhookprefect-slackbash
pip show prefect-slack
```
```python
from prefect_slack.webhooks import SlackWebhook  # Note the .webhooks import
# or
from prefect_slack import SlackIncomingWebhook  # Depending on your version
slack_webhook = SlackIncomingWebhook(  # Note: IncomingWebhook, not just Webhook
    url=SecretStr(slack_webhook_url)
)
slack_webhook.save("slack-notifications", overwrite=True)pip show prefect-slackSlackWebhookSlackIncomingWebhookBen T
08/07/2025, 4:52 AMMarvin
08/07/2025, 4:52 AMMarvin
08/07/2025, 4:53 AMpython
from prefect.blocks.notifications import SlackWebhook  # Note: different import
slack_webhook = SlackWebhook(
    url=SecretStr(slack_webhook_url)
)
slack_webhook.save("slack-notifications-core", overwrite=True)UnknownBlockTypeSlackWebhookBen T
08/07/2025, 4:53 AMMarvin
08/07/2025, 4:54 AMMarvin
08/07/2025, 4:54 AMNotificationBlockSend a notificationBen T
08/07/2025, 4:56 AM
Marvin
08/07/2025, 4:56 AM
Marvin
08/07/2025, 4:57 AM
```python
# These can coexist because they're different types
from prefect.blocks.notifications import SlackWebhook
from prefect_slack import SlackWebhook  # Different implementation
# Both can use the same name
slack1 = SlackWebhook(url="...")
slack1.save("my-notifications")  # Type: slack-webhook
slack2 = SlackWebhook(url="...")
slack2.save("my-notifications")  # Type: slack-incoming-webhook
# When loading, you specify both type and name
from prefect.blocks.core import Block
# These load different blocks despite same name
block1 = Block.load("slack-webhook/my-notifications")
block2 = Block.load("slack-incoming-webhook/my-notifications")slack-notificationsSlack Incoming WebhookSlack Webhook