Using the functional api, how can i set a dependen...
# prefect-community
m
Using the functional API, how can I set a dependency when the outputs of one task don't depend on another? For example, I have 3 tasks: read, delete, write. I want to read some data, then delete the old stuff, and then write the new stuff. "Deleting" does not take any inputs, but we want to wait until "reading" is done before we do it.
e
After you initialize your task, you should pass these ‘non-data’ dependencies as the kwarg `upstream_tasks`. Any upstream task that shouldn’t provide a value for our task should be passed to this kwarg in a list.
mytask = MyTask(name="tsk")(
    upstream_task_with_data,
    upstream_tasks=[upstream_task_without_data],
)
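To make the difference between a data dependency and a non-data dependency concrete, here is a minimal plain-Python sketch of the read / delete / write case from the question. It is a toy model, not Prefect code: `run_flow`, `data_edges`, `state_edges` and the in-memory `store` are all hypothetical names made up for illustration, and the ordering is done with the standard library's `graphlib` (Python 3.9+).

```python
from graphlib import TopologicalSorter


def run_flow(tasks, data_edges, state_edges):
    """Run callables in dependency order (toy model, not Prefect's engine).

    tasks:       name -> callable
    data_edges:  name -> {kwarg name: upstream task name}, result is passed in
    state_edges: name -> [upstream task names], ordering only, no value passed
    """
    # A task must wait for both kinds of upstream before it can run.
    graph = {
        name: set(data_edges.get(name, {}).values()) | set(state_edges.get(name, []))
        for name in tasks
    }
    results, order = {}, []
    for name in TopologicalSorter(graph).static_order():
        # Only data edges contribute keyword arguments.
        kwargs = {kw: results[up] for kw, up in data_edges.get(name, {}).items()}
        results[name] = tasks[name](**kwargs)
        order.append(name)
    return results, order


store = ["old"]  # hypothetical stand-in for the existing data


def read():
    return ["new-1", "new-2"]


def delete():
    store.clear()  # takes no input from read(), but must run after it
    return True


def write(rows):
    store.extend(rows)
    return len(rows)


results, order = run_flow(
    tasks={"read": read, "delete": delete, "write": write},
    data_edges={"write": {"rows": "read"}},
    state_edges={"delete": ["read"], "write": ["delete"]},
)
print(order)  # ['read', 'delete', 'write']
print(store)  # ['new-1', 'new-2']
```

In this model the `state_edges` entries play the role that the `upstream_tasks` list plays in the message above: they constrain when a task runs without handing it a value.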
m
Will that show up in `flow.visualize()`?
e
Yeah, it won't have any labels on the edge, since, you know, there isn’t a param being passed.
m
I get an error
Here is the code:
    start_date_hour, end_date_hour = start_end_date_hour(lookback_hours, start_date_hour_input, end_date_hour_input)

    timestamp_list = generate_list_of_timestamp_ranges(
        start_date_hour, end_date_hour, chunks_per_hour
    )

    query = get_query.map(timestamp_list, unmapped(table_name))

    df = big_query_extract.map(
        query=query,
        project=unmapped(gcp_project),
        credentials=unmapped(gcp_credentials),
        to_dataframe=unmapped(True),
    )

    success = truncate_existing_data(s3_path=s3_write_path, timestamp_list=timestamp_list, upstream_tasks=[big_query_extract])
    write_to_s3.map(df, s3_path=unmapped(s3_write_path), truncate_successful=unmapped(success))
and error:
Traceback (most recent call last):
  File "google_analytics_import.py", line 218, in <module>
    FlowManager(flow, flow_params).do(entry_point())
  File "/Users/computerai/covid19lab/data_flows/utils/deployment.py", line 286, in do
    _ = runner.run(return_tasks=self.flow.tasks)
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 270, in run
    raise exc
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 252, in run
    state = self.get_flow_run_state(
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 70, in inner
    return runner_method(self, *args, **kwargs)
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/runner.py", line 68, in inner
    raise exc
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 593, in get_flow_run_state
    task_states[task] = executor.submit(
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/executors/local.py", line 28, in submit
    return fn(*args, **kwargs)
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 745, in run_task
    return task_runner.run(
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 400, in wrapper
    return func(*args, **kwargs)
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 320, in run
    raise exc
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 291, in run
    state = self.get_task_run_state(state, inputs=task_inputs)
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 70, in inner
    return runner_method(self, *args, **kwargs)
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/runner.py", line 68, in inner
    raise exc
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 856, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 449, in method
    return run_method(self, *args, **kwargs)
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/tasks/gcp/bigquery.py", line 136, in run
    raise ValueError("No query provided.")
ValueError: No query provided.
e
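Try `upstream_tasks=[df]`. You need to set the relationship as if you are establishing a data dependency, just instead of a normal argument put it in a list and give the list to `upstream_tasks`. So use `df` instead of `big_query_extract`. Passing the bare `big_query_extract` task most likely adds a separate, un-called copy of it to the flow with no `query`, which would explain the "No query provided." error.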
m
`truncate_existing_data` is not being mapped. If I do that, it will reduce all "df" before performing `truncate_existing_data`, right? I want to wait until they are all done. But `write_to_s3` is mapped, and I don't want to reduce all "df", as I want `write_to_s3` to be mapped.
e
Oh ok, that's not an issue. `truncate_existing_data` will wait for all df to be ready, and `write_to_s3` can still map over each df individually. You can use the reduced results in one place and still use unreduced results to map over them. Try it out.
m
ok cool!