Thread
#prefect-community

    Jason Damiani

    3 years ago
    Hey there, I have a package I've developed that contains a module which exposes most of the package's functionality wrapped as FunctionTasks. It's not clear to me how I can add retries to a task that was not defined with the @task decorator when mapping:
    from prefect import Flow, unmapped
    import culvert.tasks as ct

    with Flow("some flow") as f:
        # <some other task>
        copy_batches_local = ct.copy_batch_local.map(
            src_conn_str=unmapped(src_conn_str),
            batch_range=create_batches,
            src_table=unmapped(src_table),
            split_by=unmapped(split_by),
            data_path=unmapped(data_path),
            task_args={"max_retries": 3},
        )
    TypeError: got an unexpected keyword argument 'task_args'
    Also tried:
    copy_batches_local = ct.copy_batch_local(task_args={"max_retries": 3}).map(
        src_conn_str=unmapped(src_conn_str),
        batch_range=create_batches,
        src_table=unmapped(src_table),
        split_by=unmapped(split_by),
        data_path=unmapped(data_path),
    )
    TypeError: missing a required argument: 'src_conn_str'
    Is there a way to partially bind my ct.copy_batch_local task?

    Jeremiah

    3 years ago
    Hey @Jason Damiani, great question. You’re right, task_args isn’t exposed for mapping the way it is for normal calls, but it should be! I’m going to open an issue to address that ASAP. In the meantime, that task_args keyword is actually just passed to the task’s copy method, so you should be able to recreate it like this:
    ct.copy_batch_local.copy(max_retries=3).map(...)
    Each task’s copy method accepts, as keyword arguments, the task_args that you want to overwrite.
    Will that solve the problem?
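A rough sketch of why the `.copy(max_retries=3).map(...)` workaround above behaves as described. It uses a toy stand-in rather than Prefect itself: `ToyTask` and its simplified `map` are hypothetical names invented for illustration, and the real `Task.copy` may differ in detail. It only shows the pattern of copying a task, overwriting the named attributes on the copy, and mapping the copy while the original stays unchanged.

```python
from copy import copy as shallow_copy


class ToyTask:
    """Hypothetical stand-in for a task object; this is not Prefect's Task class."""

    def __init__(self, fn, max_retries=0):
        self.fn = fn
        self.max_retries = max_retries

    def copy(self, **task_args):
        # Shallow-copy the task, then overwrite only the attributes that were named.
        new = shallow_copy(self)
        for name, value in task_args.items():
            if not hasattr(new, name):
                raise AttributeError(f"{type(self).__name__} has no attribute {name!r}")
            setattr(new, name, value)
        return new

    def map(self, items):
        # Toy mapping: call the wrapped function once per item.
        return [self.fn(item) for item in items]


base = ToyTask(lambda batch: batch * 2)
retrying = base.copy(max_retries=3)  # analogous to ct.copy_batch_local.copy(max_retries=3)

print(base.max_retries, retrying.max_retries)  # prints: 0 3
print(retrying.map([1, 2, 3]))                 # prints: [2, 4, 6]
```

Because the override lands on a copy, the shared task exported by the package keeps its defaults, and other flows that import it are unaffected.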

    Jason Damiani

    3 years ago
    Awesome, thanks @Jeremiah
    I believe so

    Jeremiah

    3 years ago
    If it doesn’t, let us know.