Is there a way to dynamically name mapped tasks? I...
# ask-community
z
Is there a way to dynamically name mapped tasks? I found this thread: https://linen.prefect.io/t/48228/hello-in-prefect-2-0-is-there-a-way-to-provide-the-task-name , but it caters to a use-case where the task is run in a for loop (not mapped). I can't run
task.with_options().map()
For example I have:
Copy code
@task(
    name=f'{dev_stage} - Airbyte - Task'
)
def run_airbyte_sync(
    connection: AirbyteConnection, 
    connection_name: str
):
    print(f"kicking off task for connection name: '{connection_name}'")
    job_run = connection.trigger()
    job_run.wait_for_completion()
    return job_run.fetch_result()

@flow(
	name=f'{dev_stage} - Airbyte',
	log_prints=True,
	task_runner=DaskTaskRunner( #allows for parallel execution
		cluster_kwargs={
            "processes": False, # use threads instead of processes
            "n_workers": 8,
            "threads_per_worker": 1 # number of threads per worker
        }
    )
)

def flow_airbyte():
    
	#run airbyte sync
    airbyte_results = run_airbyte_sync.map(
        airbyte_connections,
        airbyte_connections_dict.values()
    )
Any help is appreciated! it would be super nice to do this because I would get observability into the names of the connections I'm running in Airbyte - i.e. if one task fails, I quickly know which one it was in airbyte
c
Hey Zachary! Tasks now have a
task_run_name
argument that allows you to provide a name template for each call of the task, and that allows for templating based on task inputs. So for example, I believe this should do what you expect:
Copy code
@task(task_run_name="Airbyte {connection_name}")
notice that I did not use an f-string but instead kept the template variable - Prefect will template it for you at runtime.
thank you 1
z
This is great! for some reason, my flow crashed with this message though:
RecursionError: maximum recursion depth exceeded while calling a Python object
If I'm using prefect-dask, could that have anything to do with it? Once I removed this functionality, the flow has been working fine
c
could you share your task config and definition? There is no recursion in the formatting of the run name, but possibly Airbyte's primitives have recursion issues when string formatting them? You could try
Copy code
str(connection_name) 
repr(connection_name)
to try and reproduce independently