Marc Lipoff
01/26/2021, 3:27 PM
emre
01/26/2021, 3:41 PM
upstream_tasks: any upstream task that shouldn't provide a value to our task should be passed to this kwarg in a list.
mytask = MyTask(name="tsk")(
    upstream_task_with_data,
    upstream_tasks=[upstream_task_without_data],
)
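The difference between the two kinds of dependency can be illustrated with a toy, pure-Python model. ToyTask and run are hypothetical names invented for this sketch and are not Prefect's API. Data inputs run first and their results become arguments. Anything in upstream_tasks is only guaranteed to finish first, and its result is never passed in.

```python
from typing import Any, Callable, Dict, List, Optional


class ToyTask:
    """Hypothetical stand-in for a workflow task (not Prefect's Task class)."""

    def __init__(self, name: str, fn: Callable[..., Any]):
        self.name = name
        self.fn = fn
        self.data_inputs: List["ToyTask"] = []     # results passed as arguments
        self.upstream_tasks: List["ToyTask"] = []  # ordering-only dependencies

    def __call__(self, *data_inputs: "ToyTask",
                 upstream_tasks: Optional[List["ToyTask"]] = None) -> "ToyTask":
        self.data_inputs = list(data_inputs)
        self.upstream_tasks = list(upstream_tasks or [])
        return self


def run(task: ToyTask, results: Dict[str, Any], order: List[str]) -> Any:
    """Run task after all of its dependencies, memoizing results by name."""
    if task.name in results:
        return results[task.name]
    # Ordering-only dependencies must finish, but their values are discarded.
    for up in task.upstream_tasks:
        run(up, results, order)
    # Data dependencies finish first and their values become the arguments.
    args = [run(dep, results, order) for dep in task.data_inputs]
    results[task.name] = task.fn(*args)
    order.append(task.name)
    return results[task.name]


with_data = ToyTask("with_data", lambda: 21)
without_data = ToyTask("without_data", lambda: "ignored")
# fn takes exactly one argument: only the data input is passed in.
mytask = ToyTask("tsk", lambda x: x * 2)(with_data, upstream_tasks=[without_data])

results: Dict[str, Any] = {}
order: List[str] = []
print(run(mytask, results, order))  # 42
print(order)  # ['without_data', 'with_data', 'tsk']
```

The lambda for "tsk" accepts a single argument, so if the upstream-only value were passed in as well the call would fail. That is the contract upstream_tasks expresses.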
Marc Lipoff
01/26/2021, 3:42 PM
emre
01/26/2021, 3:42 PM
Marc Lipoff
01/26/2021, 3:44 PM
start_date_hour, end_date_hour = start_end_date_hour(
    lookback_hours, start_date_hour_input, end_date_hour_input
)
timestamp_list = generate_list_of_timestamp_ranges(
    start_date_hour, end_date_hour, chunks_per_hour
)
query = get_query.map(timestamp_list, unmapped(table_name))
df = big_query_extract.map(
    query=query,
    project=unmapped(gcp_project),
    credentials=unmapped(gcp_credentials),
    to_dataframe=unmapped(True),
)
success = truncate_existing_data(
    s3_path=s3_write_path,
    timestamp_list=timestamp_list,
    upstream_tasks=[big_query_extract],
)
write_to_s3.map(df, s3_path=unmapped(s3_write_path), truncate_successful=unmapped(success))
And the error:
Traceback (most recent call last):
  File "google_analytics_import.py", line 218, in <module>
    FlowManager(flow, flow_params).do(entry_point())
  File "/Users/computerai/covid19lab/data_flows/utils/deployment.py", line 286, in do
    _ = runner.run(return_tasks=self.flow.tasks)
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 270, in run
    raise exc
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 252, in run
    state = self.get_flow_run_state(
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 70, in inner
    return runner_method(self, *args, **kwargs)
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/runner.py", line 68, in inner
    raise exc
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 593, in get_flow_run_state
    task_states[task] = executor.submit(
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/executors/local.py", line 28, in submit
    return fn(*args, **kwargs)
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 745, in run_task
    return task_runner.run(
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 400, in wrapper
    return func(*args, **kwargs)
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 320, in run
    raise exc
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 291, in run
    state = self.get_task_run_state(state, inputs=task_inputs)
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 70, in inner
    return runner_method(self, *args, **kwargs)
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/runner.py", line 68, in inner
    raise exc
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 856, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
    return task.run(*args, **kwargs) # type: ignore
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 449, in method
    return run_method(self, *args, **kwargs)
  File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/tasks/gcp/bigquery.py", line 136, in run
    raise ValueError("No query provided.")
ValueError: No query provided.
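Here is one plausible reading of the traceback above, assuming Prefect 0.x behavior. big_query_extract.map(...) binds a copy of the task, so putting the original big_query_extract object in upstream_tasks adds that unbound original to the flow as an extra node. When that node runs, nothing supplies query, and the BigQuery task raises ValueError("No query provided."). The sketch below reproduces the mechanism with hypothetical classes (ToyExtract, ToyFlow, build_flow). It is not Prefect code.

```python
import copy
from typing import List, Optional


class ToyExtract:
    """Hypothetical stand-in for a query task (not Prefect's BigQueryTask)."""

    def __init__(self) -> None:
        self.bound_query: Optional[str] = None

    def bind_copy(self, query: str) -> "ToyExtract":
        # Like .map(), return a bound *copy* and leave this template unbound.
        new = copy.copy(self)
        new.bound_query = query
        return new

    def run(self) -> str:
        if not self.bound_query:
            raise ValueError("No query provided.")
        return f"rows for {self.bound_query!r}"


class ToyFlow:
    """Hypothetical flow: every task referenced anywhere becomes a node."""

    def __init__(self) -> None:
        self.tasks: List[ToyExtract] = []

    def add(self, task: ToyExtract) -> None:
        # Identity check, so adding an already-present task is a no-op.
        if all(t is not task for t in self.tasks):
            self.tasks.append(task)

    def run(self) -> List[str]:
        return [t.run() for t in self.tasks]


def build_flow(reference_template: bool) -> ToyFlow:
    template = ToyExtract()                                # plays big_query_extract
    bound = [template.bind_copy(q) for q in ("q1", "q2")]  # plays df
    flow = ToyFlow()
    for t in bound:
        flow.add(t)
    # Declaring an upstream dependency also adds the referenced object as a node.
    for t in ([template] if reference_template else bound):
        flow.add(t)
    return flow


print(build_flow(reference_template=False).run())
# ["rows for 'q1'", "rows for 'q2'"]
try:
    build_flow(reference_template=True).run()
except ValueError as exc:
    print(exc)  # No query provided.
```

Referencing the bound results adds nothing new to the toy flow. Referencing the template adds a third node with no query, and that node is the one that fails.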
emre
01/26/2021, 3:47 PM
upstream_tasks=[df]. You need to set the relationship as if you were establishing a data dependency, except that instead of passing the task as a normal argument, you put it in a list and give the list to upstream_tasks.
So use df instead of big_query_extract.
Marc Lipoff
01/26/2021, 3:48 PM
truncate_existing_data is not being mapped. If I do that, it will reduce all "df" before performing truncate_existing_data.
emre
01/26/2021, 3:55 PM
truncate_existing_data will wait for all df to be ready, but write_to_s3 can still map over each df individually.
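The fan-in/fan-out shape described here can be sketched in plain Python with concurrent.futures. This is only an analogy under stated assumptions, not how Prefect schedules tasks. extract, truncate, write and run_pipeline are hypothetical stand-ins for big_query_extract, truncate_existing_data and write_to_s3, and the chunk labels are made up. The truncate step waits for every df and runs once, while the write step still runs once per df.

```python
from concurrent.futures import ThreadPoolExecutor, wait
from typing import List


def extract(chunk: str) -> List[str]:
    """Hypothetical stand-in for one mapped big_query_extract child."""
    return [f"{chunk}-row{i}" for i in range(2)]


def truncate(timestamp_list: List[str]) -> bool:
    """Hypothetical stand-in for truncate_existing_data; runs exactly once."""
    return len(timestamp_list) > 0


def write(df: List[str], truncate_successful: bool) -> str:
    """Hypothetical stand-in for one mapped write_to_s3 child."""
    if not truncate_successful:
        raise RuntimeError("truncate failed; refusing to write")
    return f"wrote {len(df)} rows"


def run_pipeline(chunks: List[str]) -> List[str]:
    with ThreadPoolExecutor() as pool:
        # Fan out: one extract per chunk (like big_query_extract.map).
        df_futures = [pool.submit(extract, c) for c in chunks]
        # Fan in: wait for every df, as upstream_tasks=[df] makes the
        # unmapped truncate step do, then run truncate a single time.
        wait(df_futures)
        success = truncate(chunks)
        # Fan out again over the same, still-separate dfs; the one success
        # value is shared by every call (like unmapped(success)).
        write_futures = [pool.submit(write, f.result(), success) for f in df_futures]
        return [f.result() for f in write_futures]


print(run_pipeline(["00:00", "00:30", "01:00"]))
# ['wrote 2 rows', 'wrote 2 rows', 'wrote 2 rows']
```

Waiting on all the futures is the "reduce". The individual futures stay available afterwards, which is why the later write step can still map over them one by one.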
You can use the reduced results in one place and still use the unreduced results to map over them. Try it out.
Marc Lipoff
01/26/2021, 3:55 PM