https://prefect.io logo
s

Slackbot

07/28/2022, 12:49 AM
This message was deleted.
c

Chu

07/28/2022, 12:51 AM
code:
Copy code
@task(name = 'add')
def func_1(param_1, param_2):
    return param_1 + param_2

@task(name = 'minus')
def func_2(param_1, param_2):
    return param_2 - param_1

with Flow('flow', executor=LocalDaskExecutor(scheduler="processes", num_workers=2)) as flow:
    param_1 = Parameter("param_1")
    param_2 = Parameter("param_2")
    task_1 = func_1.map(
        param_1 = param_1, 
        param_2 = unmapped(param_2)
        )
    task_2 = func_2.map(
        param_1 = param_1, 
        param_2 = unmapped(param_2),
        task_args=unmapped(dict(trigger=all_finished))
        )

    task_2.set_upstream(task_1, mapped=True)
   

if __name__ == "__main__":
    flow.run(parameters = dict(param_1 = [1,2,3,4], param_2 = 5 ))
And I have this error:
Copy code
File "/Users/acsjhba/opt/anaconda3/envs/py39/lib/python3.9/site-packages/prefect/core/task.py", line 810, in map
    task_args = task_args.copy() if task_args else {}
AttributeError: 'unmapped' object has no attribute 'copy'
this behaviour stops me from setting trigger between upstream and downstream mapped tasks…. How can it be solved, thanks!
I know for single task, I can set
@task(name = 'minus', trigger = all_finished)
in the task decorator, but how can I do that in flow of flows, specifically for create_flow_run
a

Anna Geller

07/28/2022, 12:15 PM
is it solved now? you deleted the original message
✖️ 1
c

Chu

07/28/2022, 12:19 PM
nope, it is not solved.. but I have an idea, instead of specifying the task_args at create_flow_run level, we specify it in its child tasks level (in that task decorator), thus in create_flow_run, do you think that will work? And also, another problem is upstream_tasks = [xxx] cannot be wrapped in unmapped function.. So I may need to set something like
downtstream_flow_id.set_upstream(upstream_flow, mapped=True)
, correct?
a

Anna Geller

07/28/2022, 12:23 PM
can you perhaps try solving it in 2.0? given 2.0 is GA now, not sure it makes sense to dive as deep now
it may be easier to implement in 2.0, we added mapping in the release yesterday, too
c

Chu

07/28/2022, 12:33 PM
I’m so sorry, currently our company still using 1.0… shortly we don’t see any plan to switch
does it mean Prefect 1.0 will no longer be supported as well as questions 😭?? We purchase the 1.0 cloud
a

Anna Geller

07/28/2022, 2:07 PM
we always try to help, but just not sure if it's worth digging deep into building new solutions with 1.0. Both Prefect 1.0 and Cloud 1.0 are now in a maintenance mode and there will be no new features, so I'd encourage you to reconsider it with the migration your use case is way easier to tackle in 2.0 - perhaps you can sign up for a free version and build only this project in 2.0 at first? this way you can avoid risky big bang migration and can slowly start migrating over starting with this project
c

Chu

07/28/2022, 2:41 PM
@Mansour Zayer
6 Views