Jason Damiani

08/22/2019, 3:42 PM
Hey there, I have a package I've developed that contains a module which exposes most of the functionality of the package wrapped as FunctionTasks. It's not clear to me how I can add retries to a task that was not defined with the @task decorator when mapping:
import culvert.tasks as ct

with Flow("some flow") as f:
    <some other task>
    copy_batches_local = ct.copy_batch_local.map(
        src_conn_str=unmapped(src_conn_str),
        batch_range=create_batches,
        src_table=unmapped(src_table),
        split_by=unmapped(split_by),
        data_path=unmapped(data_path),
        task_args={"max_retries": 3},
    )
TypeError: got an unexpected keyword argument 'task_args'
Also tried:
copy_batches_local = ct.copy_batch_local(task_args={"max_retries": 3}).map(
    src_conn_str=unmapped(src_conn_str),
    batch_range=create_batches,
    src_table=unmapped(src_table),
    split_by=unmapped(split_by),
    data_path=unmapped(data_path),
)
TypeError: missing a required argument: 'src_conn_str'
Is there a way to partially bind my ct.copy_batch_local task?

Jeremiah

08/22/2019, 3:50 PM
Hey @Jason Damiani, great question. You’re right, task_args isn’t exposed for mapping the way it is for normal calls, but it should be! I’m going to open an issue to address that asap. In the meantime, that task_args keyword is actually just getting passed to the task’s copy method, so you should be able to recreate it like this:
ct.copy_batch_local.copy(max_retries=3).map(...)
Each task’s copy method accepts the task_args that you want to overwrite.
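To illustrate the copy-then-map pattern described above, here is a minimal plain-Python sketch. SketchTask and flaky_double are hypothetical stand-ins, not Prefect's Task class or anything from culvert. The sketch only assumes that copy returns a new task with the named attributes overwritten and leaves the original untouched, and its retry loop is a simplification rather than how Prefect schedules retries.

```python
import copy


class SketchTask:
    """Hypothetical stand-in for a task object; not Prefect's Task class."""

    def __init__(self, fn, max_retries=0):
        self.fn = fn
        self.max_retries = max_retries

    def copy(self, **task_args):
        # Shallow-copy the task, then overwrite only the named attributes.
        new = copy.copy(self)
        for attr, value in task_args.items():
            if not hasattr(new, attr):
                raise AttributeError(f"SketchTask has no attribute {attr!r}")
            setattr(new, attr, value)
        return new

    def map(self, items):
        # Call fn once per item, retrying a failed call up to max_retries times.
        results = []
        for item in items:
            for attempt in range(self.max_retries + 1):
                try:
                    results.append(self.fn(item))
                    break
                except Exception:
                    if attempt == self.max_retries:
                        raise
        return results


calls = {"n": 0}


def flaky_double(x):
    # Fails on the very first call only, to simulate a transient error.
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("transient failure")
    return 2 * x


base = SketchTask(flaky_double)        # shared task, no retries configured
retrying = base.copy(max_retries=3)    # independent copy with retries enabled
```

Because the override lands on the copy, the shared task that other flows import keeps its original settings, and only the mapped copy carries max_retries.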

Jason Damiani

08/22/2019, 3:51 PM
Awesome, thanks @Jeremiah

Jeremiah

08/22/2019, 3:51 PM
Will that solve the problem?

Jason Damiani

08/22/2019, 3:51 PM
I believe so

Jeremiah

08/22/2019, 3:51 PM
If it doesn’t, let us know.