Zachary Loertscher
05/05/2023, 7:09 PM
Is there a way to use task.with_options().map() so that each mapped task run gets its own name?
For example I have:
@task(
    name=f'{dev_stage} - Airbyte - Task'
)
def run_airbyte_sync(
    connection: AirbyteConnection,
    connection_name: str
):
    print(f"kicking off task for connection name: '{connection_name}'")
    job_run = connection.trigger()
    job_run.wait_for_completion()
    return job_run.fetch_result()


@flow(
    name=f'{dev_stage} - Airbyte',
    log_prints=True,
    task_runner=DaskTaskRunner(  # allows for parallel execution
        cluster_kwargs={
            "processes": False,       # use threads instead of processes
            "n_workers": 8,
            "threads_per_worker": 1   # number of threads per worker
        }
    )
)
def flow_airbyte():
    # run airbyte sync
    airbyte_results = run_airbyte_sync.map(
        airbyte_connections,
        airbyte_connections_dict.values()
    )
Any help is appreciated!
It would be super nice to do this because I would get observability into the names of the connections I'm running in Airbyte; i.e., if one task fails, I quickly know which one it was in Airbyte.

Chris White
Tasks have a task_run_name argument that allows you to provide a name template for each call of the task, and that template can reference the task's inputs. So, for example, I believe this should do what you expect:

@task(task_run_name="Airbyte {connection_name}")

Notice that I did not use an f-string but instead kept the template variable; Prefect will template it for you at runtime.
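
Putting that together with the mapped task above, a minimal sketch could look like the following (assuming prefect-airbyte's AirbyteConnection block and prefect-dask; the flow signature and the hard-coded flow name are illustrative, not from the original code):

from prefect import flow, task
from prefect_airbyte.connections import AirbyteConnection
from prefect_dask import DaskTaskRunner


@task(task_run_name="Airbyte {connection_name}")  # template string, not an f-string
def run_airbyte_sync(connection: AirbyteConnection, connection_name: str):
    job_run = connection.trigger()
    job_run.wait_for_completion()
    return job_run.fetch_result()


@flow(
    name="dev - Airbyte",
    log_prints=True,
    task_runner=DaskTaskRunner(cluster_kwargs={"processes": False, "n_workers": 8}),
)
def flow_airbyte(connections: list, connection_names: list):
    # Each mapped run is named after its connection, e.g. "Airbyte my_connection",
    # so a failed task run immediately shows which Airbyte connection it belonged to.
    return run_airbyte_sync.map(connections, connection_names)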
Zachary Loertscher
05/09/2023, 6:38 PM
I tried this and got: RecursionError: maximum recursion depth exceeded while calling a Python object

If I'm using prefect-dask, could that have anything to do with it? Once I removed this functionality, the flow has been working fine.

Chris White
Hmm, what do str(connection_name) and repr(connection_name) return for the values you're mapping over? I'll use this example to try and reproduce independently.
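
A small diagnostic sketch along those lines (hypothetical; it reuses the airbyte_connections and airbyte_connections_dict variables from the flow above) is to inspect what the templated parameter renders as, and to pass plain strings explicitly:

# Hypothetical check: the run-name template has to render connection_name as a
# string, so inspect what str()/repr() produce for the values being mapped over.
connection_names = list(airbyte_connections_dict.values())
for name in connection_names:
    print(type(name), str(name), repr(name))

# Passing plain strings explicitly is one way to rule the mapped inputs out as
# the source of the recursion.
airbyte_results = run_airbyte_sync.map(
    airbyte_connections,
    [str(name) for name in connection_names],
)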