Hey there,
I have a package I've developed that contains a module which exposes most of the functionality of the package wrapped as FunctionTasks. Its not clear to me how I can add retries to a task that was not defined with the
@task
decorator when mapping:
import culvert.tasks as ct
with Flow("some flow") as f:
<some other task>
copy_batches_local = ct.copy_batch_local.map(
src_conn_str=unmapped(src_conn_str),
batch_range=create_batches,
src_table=unmapped(src_table),
split_by=unmapped(split_by),
data_path=unmapped(data_path),
task_args={"max_retries": 3},
)
TypeError: got an unexpected keyword argument 'task_args'
Also tried:
copy_batches_local = ct.copy_batch_local(task_args={"max_retries": 3}).map(
src_conn_str=unmapped(src_conn_str),
batch_range=create_batches,
src_table=unmapped(src_table),
split_by=unmapped(split_by),
data_path=unmapped(data_path),
)
TypeError: missing a required argument: 'src_conn_str'
Is there a way to partially bind my ct.copy_batch_local task?