Hey everyone, is there a task setting to prevent a...
# prefect-community
j
Hey everyone, is there a task setting to prevent a mapping task from running until all upstream tasks are complete?
d
It sounds like you may be having a problem. If you want to share your flow.visualize() output and a brief description of the problem, I’d be happy to help!
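For reference, a minimal sketch of producing that visualization (the flow and task below are placeholders, and rendering requires graphviz):

from prefect import Flow, task

@task
def say_hello():
    return "hello"

with Flow("example-flow") as flow:
    say_hello()

flow.visualize()  # renders the flow's DAG; pass filename="flow" to save it to disk instead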
j
In the task called Apply Stock Filters, half of the mapped tasks are complete and half are failures
I updated the code in the task to limit the failures, but I want to be sure that GetItem and Get Stock Filters are both complete before the mapping task starts
d
Hey John, that should be what’s happening here given your visualization. Have you confirmed that Apply Stock Filters begins before GetItem finishes?
Is it possible that s3_path isn’t being returned correctly from GetItem in every case?
j
s3_path is a fixed value that is combined with tick_params to get the full path
d
To be more explicit: Prefect is correctly recognizing both Get Stock Filters and GetItem as upstream dependencies of Apply Stock Filters, so Apply Stock Filters will not start until both have finished.
So you have one dependency you wish to map over and another that you do not?
Are you using the unmapped helper function?
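For example, something along these lines (just a sketch with placeholder tasks, not your actual flow):

from prefect import Flow, task, unmapped

@task
def get_item():
    return "s3://bucket/prefix/"

@task
def get_stock_filters():
    return [1, 2, 3]

@task
def apply_stock_filters(s3_path, tick_params):
    return (s3_path, tick_params)

with Flow("stock-filters") as flow:
    s3_path = get_item()
    filters = get_stock_filters()
    # unmapped() passes s3_path whole to every mapped run, while the task
    # is mapped over each element of filters
    qualified = apply_stock_filters.map(unmapped(s3_path), filters)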
j
yes
qualified_stocks = apply_ticker_filters.map(
    unmapped(return_data[0]),
    ticker_filters,
)
d
What’s the traceback you’re seeing?
j
February 27th 2020 at 10:25:45am | prefect.CloudTaskRunner
ERROR 
Unexpected error: ArrowIOError('Arrow error: IOError: The specified key does not exist.. Detail: Python exception: FileNotFoundError')
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 877, in get_task_run_state
    self.task.run, timeout=self.task.timeout, **raw_inputs
  File "/opt/conda/lib/python3.7/site-packages/prefect/utilities/executors.py", line 172, in timeout_handler
    return fn(*args, **kwargs)
  File "main.py", line 390, in apply_ticker_filters
  File "/opt/conda/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 233, in read_parquet
    **kwargs
  File "/opt/conda/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py", line 139, in read_metadata
    fs, paths, gather_statistics, filters, kwargs.get("dataset", {})
  File "/opt/conda/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py", line 76, in _determine_dataset_parts
    paths, filesystem=fs, filters=filters, **dataset_kwargs
  File "/opt/conda/lib/python3.7/site-packages/pyarrow/parquet.py", line 1060, in __init__
    self.validate_schemas()
  File "/opt/conda/lib/python3.7/site-packages/pyarrow/parquet.py", line 1092, in validate_schemas
    self.schema = self.pieces[0].get_metadata().schema
  File "/opt/conda/lib/python3.7/site-packages/pyarrow/parquet.py", line 560, in get_metadata
    f = self.open()
  File "/opt/conda/lib/python3.7/site-packages/pyarrow/parquet.py", line 567, in open
    reader = self.open_file_func(self.path)
  File "/opt/conda/lib/python3.7/site-packages/pyarrow/parquet.py", line 942, in _open_dataset_file
    buffer_size=dataset.buffer_size
  File "/opt/conda/lib/python3.7/site-packages/pyarrow/parquet.py", line 137, in __init__
    read_dictionary=read_dictionary, metadata=metadata)
  File "pyarrow/_parquet.pyx", line 1036, in pyarrow._parquet.ParquetReader.open
  File "pyarrow/error.pxi", line 80, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: Arrow error: IOError: The specified key does not exist.. Detail: Python exception: FileNotFoundError
d
Would you mind sharing the task code?
j
one min
@task(name='Get Qualified Stocks', max_retries=10, retry_delay=timedelta(seconds=15), result_handler=RESULT_HANDLER, checkpoint=True)
def apply_ticker_filters(s3_path, tick_params):
    h_scoop = tick_params[0]
    h_hold = tick_params[1]
    ddv_min = tick_params[2]
    ddv_max = tick_params[3]
    price_min = tick_params[4]
    price_max = tick_params[5]
    process_order = tick_params[6]

    # load the partitions for this scoop/hold_period combination from S3
    data = dd.read_parquet(
        f'{s3_path}scoop={h_scoop}/hold_period={h_hold}/*.parquet',
        storage_options={
            'key': '<redacted>',     # credentials redacted
            'secret': '<redacted>',
        }
    )
    data['process_order'] = process_order
    # flag rows whose dollar volume and price fall within the configured bounds
    data['qualified_ddv'] = data['ddv_average'].apply(
        func=lambda x: True if ddv_min <= x <= ddv_max else False,
        meta=('qualified_ddv', 'bool')
    )
    data['qualified_price'] = data['adjusted_price'].apply(
        func=lambda x: True if price_min <= x <= price_max else False,
        meta=('qualified_price', 'bool')
    )
    data['is_qualified'] = (data['qualified_ddv'] & data['qualified_price']) == True
    data = data[data['is_qualified'] == True]
    data.to_parquet(
        f'{destination}',  # destination comes from outside this snippet
        engine='pyarrow',
        compression='snappy',
        partition_on=['process_order'],
        write_index=False,
        write_metadata_file=False,
        storage_options={
            'key': '<redacted>',     # credentials redacted
            'secret': '<redacted>',
        },
    )
    return f'{destination}'
d
My suggestion at this point is to add a debugger before this line:
data = dd.read_parquet(
        f'{s3_path}scoop={h_scoop}/hold_period={h_hold}/*.parquet',
and confirm that the file is in S3 at the path you’re expecting at that moment.
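For example, something like this right before the read_parquet call (a sketch that reuses the variables already in your task; the key/secret values are placeholders for your real credentials):

import s3fs

# list what actually exists under the prefix at this exact moment
fs = s3fs.S3FileSystem(key='<key>', secret='<secret>')  # placeholder credentials
matches = fs.glob(f'{s3_path}scoop={h_scoop}/hold_period={h_hold}/*.parquet')
print(matches)  # an empty list means nothing is at that path when this mapped task runs

import pdb; pdb.set_trace()  # optional: drop into a debugger here to inspect further

If the list comes back empty, the upstream write hasn’t landed at that exact prefix yet, which would explain the FileNotFoundError in your traceback.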