Thread
#prefect-community
    Marc Lipoff
    1 year ago
    Using the functional API, how can I set a dependency when the output of one task doesn’t feed into another? For example, I have 3 tasks: read, delete, write. I want to read some data, then delete the old stuff, and then write the new stuff. "deleting" does not take any inputs, but we want to wait until "reading" is done before we do it.
    emre
    1 year ago
    After you initialize your task, pass these ‘non-data’ dependencies via the kwarg upstream_tasks. Any upstream task that shouldn’t provide a value for your task goes into that kwarg as a list:
    mytask = MyTask(name="tsk")(
        upstream_task_with_data,
        upstream_tasks=[upstream_task_without_data],
    )
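    For your read / delete / write example, the whole flow might look something like this (just a sketch with made-up task names, assuming Prefect 1.x’s functional API):
    from prefect import Flow, task

    @task
    def read():
        return "some data"

    @task
    def delete_old():
        # takes no inputs; only needs to run after read
        print("deleting old rows")

    @task
    def write(data):
        print(f"writing {data}")

    with Flow("read-delete-write") as flow:
        data = read()
        deleted = delete_old(upstream_tasks=[data])  # state-only dependency on read
        write(data, upstream_tasks=[deleted])        # write also waits for the delete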
    Marc Lipoff
    1 year ago
    Will that show up in flow.visualize()?
    emre
    1 year ago
    yeah, it won’t have any labels on the edge.
    since, you know, there isn’t a param being passed
    Marc Lipoff
    1 year ago
    I get an error
    Here is the code:
    start_date_hour, end_date_hour = start_end_date_hour(
        lookback_hours, start_date_hour_input, end_date_hour_input
    )

    timestamp_list = generate_list_of_timestamp_ranges(
        start_date_hour, end_date_hour, chunks_per_hour
    )

    query = get_query.map(timestamp_list, unmapped(table_name))

    df = big_query_extract.map(
        query=query,
        project=unmapped(gcp_project),
        credentials=unmapped(gcp_credentials),
        to_dataframe=unmapped(True),
    )

    success = truncate_existing_data(
        s3_path=s3_write_path,
        timestamp_list=timestamp_list,
        upstream_tasks=[big_query_extract],
    )
    write_to_s3.map(df, s3_path=unmapped(s3_write_path), truncate_successful=unmapped(success))
    and error:
    Traceback (most recent call last):
      File "google_analytics_import.py", line 218, in <module>
        FlowManager(flow, flow_params).do(entry_point())
      File "/Users/computerai/covid19lab/data_flows/utils/deployment.py", line 286, in do
        _ = runner.run(return_tasks=self.flow.tasks)
      File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 270, in run
        raise exc
      File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 252, in run
        state = self.get_flow_run_state(
      File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 70, in inner
        return runner_method(self, *args, **kwargs)
      File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/runner.py", line 68, in inner
        raise exc
      File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 593, in get_flow_run_state
        task_states[task] = executor.submit(
      File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/executors/local.py", line 28, in submit
        return fn(*args, **kwargs)
      File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 745, in run_task
        return task_runner.run(
      File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 400, in wrapper
        return func(*args, **kwargs)
      File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 320, in run
        raise exc
      File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 291, in run
        state = self.get_task_run_state(state, inputs=task_inputs)
      File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 70, in inner
        return runner_method(self, *args, **kwargs)
      File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/runner.py", line 68, in inner
        raise exc
      File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 856, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 449, in method
        return run_method(self, *args, **kwargs)
      File "/Users/computerai/miniconda3/lib/python3.8/site-packages/prefect/tasks/gcp/bigquery.py", line 136, in run
        raise ValueError("No query provided.")
    ValueError: No query provided.
    emre
    1 year ago
    try upstream_tasks=[df]. You need to set the relationship as if you were establishing a data dependency; just, instead of passing it as a normal argument, put it in a list and give the list to upstream_tasks. So use df instead of big_query_extract.
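    i.e. something like this, reusing your snippet:
    success = truncate_existing_data(
        s3_path=s3_write_path,
        timestamp_list=timestamp_list,
        upstream_tasks=[df],  # df is the mapped result; big_query_extract is just the task object
    )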
    Marc Lipoff
    1 year ago
    truncate_existing_data is not being mapped. If I do that, it will reduce all "df" before performing truncate_existing_data.
    right. I want to wait until they are all done. but write_to_s3 is mapped, and I don’t want to reduce all "df", as I want write_to_s3 to stay mapped
    emre
    1 year ago
    Oh ok, that’s not an issue. truncate_existing_data will wait for all the df to be ready, and write_to_s3 can still map over each df individually. You can use the reduced result in one place and still map over the unreduced results in another. try it out
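    here’s a tiny standalone sketch of that pattern (hypothetical task names, assuming Prefect 1.x):
    from prefect import Flow, task, unmapped

    @task
    def extract(x):
        return x * 2

    @task
    def truncate():
        # runs once, only after every mapped extract has finished
        print("truncating")

    @task
    def write(item, ready):
        print(f"writing {item}")

    with Flow("mixed") as flow:
        items = extract.map([1, 2, 3])
        done = truncate(upstream_tasks=[items])  # reduce: waits on all mapped results
        write.map(items, ready=unmapped(done))   # still maps over each item individually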
    Marc Lipoff
    1 year ago
    ok cool!