#prefect-community

Dekel R

03/28/2022, 2:44 PM
Hey, I have a complex data extraction flow with 2 main cases. Each case has its own tasks and some tasks are relevant to both of them. The layout of the flow looks somewhat like this -
with Flow('html_data_extraction__dev',
          storage=Docker(registry_url="us-central1-docker.pkg.dev/***/",
                         dockerfile="./Dockerfile"),
          schedule=daily_schedule, executor=LocalDaskExecutor(scheduler="processes")) as flow:
     mode = Parameter(name='mode', default=None)
     with case(mode, None):
          data_a=task_a
     with case(mode, 'onboard'):
          data_b=task_b
     
     data_c=merge(data_a, data_b)
     task_c
     task_d
      
     with case(mode, None):
          task_x
     with case(mode, 'onboard'):
          task_y
Tasks a and b retrieve some data (each one is relevant for a different data source). Tasks c and d are common (not in a “case”) and do X on the data, which looks the same at this point. Tasks x and y are again different, each one relevant for a different case. When running locally (Mac, flow.run…) it all works as expected. When running on Prefect Cloud, all of the tasks get skipped (exactly the same code and credentials). Any idea what I’m missing here? I’m using “upstream_tasks” to run the tasks in a specific order when necessary. Thanks
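One thing worth ruling out in a layout like the one above, offered as a guess rather than a diagnosis: in Prefect 1.x a skipped task causes its downstream tasks to be skipped too, unless they set skip_on_upstream_skip=False (merge does this). An upstream_tasks edge that points at a task inside the inactive case branch can therefore skip everything after it. The sketch below is a toy model in plain Python, not Prefect's API; resolve_states, add_ordering_edge, LAYOUT and the extra task_c -> task_a edge are all hypothetical and only illustrate the propagation rule.

```python
# Toy model of skip propagation. Plain Python, NOT Prefect's API.
# Every name here (resolve_states, add_ordering_edge, LAYOUT) is hypothetical.
import copy

# Tasks in execution order. "case" is the mode value a branch requires;
# tasks without a "case" key sit outside any case block.
LAYOUT = [
    {"name": "task_a", "case": None},
    {"name": "task_b", "case": "onboard"},
    # merge is modelled as not propagating skips from its upstream tasks.
    {"name": "merge", "upstream": ["task_a", "task_b"], "skip_on_upstream_skip": False},
    {"name": "task_c", "upstream": ["merge"]},
    {"name": "task_d", "upstream": ["task_c"]},
    {"name": "task_x", "case": None, "upstream": ["task_d"]},
    {"name": "task_y", "case": "onboard", "upstream": ["task_d"]},
]


def resolve_states(tasks, mode):
    """Return {task name: 'Success' | 'Skipped'} for the given mode."""
    states = {}
    for t in tasks:
        wrong_branch = "case" in t and t["case"] != mode
        upstream_skipped = any(
            states[u] == "Skipped" for u in t.get("upstream", [])
        )
        propagates = t.get("skip_on_upstream_skip", True)
        if wrong_branch or (upstream_skipped and propagates):
            states[t["name"]] = "Skipped"
        else:
            states[t["name"]] = "Success"
    return states


def add_ordering_edge(tasks, downstream, upstream):
    """Return a copy of tasks with an extra ordering-only upstream edge."""
    tasks = copy.deepcopy(tasks)
    for t in tasks:
        if t["name"] == downstream:
            t.setdefault("upstream", []).append(upstream)
    return tasks


# Layout as posted: only the inactive branch (task_a, task_x) is skipped.
baseline = resolve_states(LAYOUT, "onboard")

# Assumed extra edge: task_c is ordered after task_a, which sits in the
# inactive branch, so the skip bypasses merge and cascades downstream.
ordered = resolve_states(add_ordering_edge(LAYOUT, "task_c", "task_a"), "onboard")

print(baseline)
print(ordered)
```

In the second run task_c, task_d and task_y all end up skipped even though the 'onboard' branch is active. Whether any edge like this exists in the real flow cannot be told from the snippet, and the model does not explain why a local run and a Cloud run would differ.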

Kevin Kho

03/28/2022, 2:51 PM
I looked at the code but I can’t immediately tell. It would seem the value of mode might not be either of those? You are saying task_a and task_b are skipped here, right? I feel at the very least one of those should work.

Dekel R

03/28/2022, 3:05 PM
The odd thing here is that it works locally, so the value of mode is definitely one of those. See this log from running on Prefect Cloud -
Task 'case(onboarding)': Finished task run for task with final state: 'Success'
But after this everything is skipped. Exactly the same ‘onboarding’ mode works when running locally with this config -
flow.run(parameters=dict(mode='onboarding'))

Kevin Kho

03/28/2022, 3:16 PM
Could you show me more complete logs on the task execution, or the flow run schematic?

Dekel R

03/28/2022, 3:23 PM
Sure, sending a PM with it (if that’s ok)

Kevin Kho

03/28/2022, 3:25 PM
Yep