John Ramirez  02/27/2020, 3:04 PM
…

Dylan
…flow.visualize() and a brief description of your problem, I’d be happy to help!
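A minimal sketch of what Dylan is asking for, assuming the Prefect 0.x API current at the time; the example flow itself is hypothetical. flow.visualize() renders the flow's DAG (it requires graphviz), so dependency gaps show up as disconnected nodes:

from prefect import Flow, task

@task
def get_item():
    return "s3://bucket/prefix/"

@task
def apply_filters(path):
    return path

with Flow("debug-example") as flow:
    apply_filters(get_item())

# renders the DAG so you can see which edges actually exist
flow.visualize()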
John Ramirez  02/27/2020, 3:59 PM
…

Dylan  02/27/2020, 4:01 PM
…Apply Stock Filters begins before Get Item finishes?

Dylan
…s3_path isn’t being returned correctly from GetItem in every case?

John Ramirez  02/27/2020, 4:10 PM
…tick_params to get the full path

Dylan
…Get Stock Filters and GetItem as upstream dependencies of Apply Stock Filters, so Apply Stock Filters will not start until both have finished
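A sketch of Dylan's suggestion, assuming the Prefect 0.x API: passing upstream_tasks when mapping makes the state dependency explicit, so the mapped task waits on both upstream results. The task names mirror the thread ("Get Item", "Get Stock Filters"); their bodies are hypothetical:

from prefect import Flow, task, unmapped

@task(name="Get Item")
def get_item():
    return "s3://bucket/prefix/"

@task(name="Get Stock Filters")
def get_stock_filters():
    return [("a",), ("b",)]

@task(name="Apply Stock Filters")
def apply_stock_filters(s3_path, tick_params):
    return (s3_path, tick_params)

with Flow("stock-filters") as flow:
    s3_path = get_item()
    ticker_filters = get_stock_filters()
    qualified = apply_stock_filters.map(
        unmapped(s3_path),
        ticker_filters,
        # explicit dependencies: don't start until both have finished
        upstream_tasks=[s3_path, ticker_filters],
    )

Passing s3_path and ticker_filters as data arguments already creates these edges; the upstream_tasks line simply spells the dependency out.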
Dylan
…unmapped helper function?

John Ramirez  02/27/2020, 4:12 PM
qualified_stocks = apply_ticker_filters.map(
    unmapped(return_data[0]),
    ticker_filters,
)
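What unmapped is doing in John's call above, reduced to a toy example (hypothetical tasks, Prefect 0.x): the unmapped argument is passed whole to every mapped run, while the other argument is iterated element-wise.

from prefect import Flow, task, unmapped

@task
def add(x, offset):
    return x + offset

with Flow("unmapped-demo") as flow:
    sums = add.map(x=[1, 2, 3], offset=unmapped(10))

state = flow.run()
print(state.result[sums].result)  # [11, 12, 13]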
John Ramirez  02/27/2020, 4:13 PM
February 27th 2020 at 10:25:45am | prefect.CloudTaskRunner
ERROR
Unexpected error: ArrowIOError('Arrow error: IOError: The specified key does not exist.. Detail: Python exception: FileNotFoundError')
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 877, in get_task_run_state
    self.task.run, timeout=self.task.timeout, **raw_inputs
  File "/opt/conda/lib/python3.7/site-packages/prefect/utilities/executors.py", line 172, in timeout_handler
    return fn(*args, **kwargs)
  File "main.py", line 390, in apply_ticker_filters
  File "/opt/conda/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 233, in read_parquet
    **kwargs
  File "/opt/conda/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py", line 139, in read_metadata
    fs, paths, gather_statistics, filters, kwargs.get("dataset", {})
  File "/opt/conda/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py", line 76, in _determine_dataset_parts
    paths, filesystem=fs, filters=filters, **dataset_kwargs
  File "/opt/conda/lib/python3.7/site-packages/pyarrow/parquet.py", line 1060, in __init__
    self.validate_schemas()
  File "/opt/conda/lib/python3.7/site-packages/pyarrow/parquet.py", line 1092, in validate_schemas
    self.schema = self.pieces[0].get_metadata().schema
  File "/opt/conda/lib/python3.7/site-packages/pyarrow/parquet.py", line 560, in get_metadata
    f = self.open()
  File "/opt/conda/lib/python3.7/site-packages/pyarrow/parquet.py", line 567, in open
    reader = self.open_file_func(self.path)
  File "/opt/conda/lib/python3.7/site-packages/pyarrow/parquet.py", line 942, in _open_dataset_file
    buffer_size=dataset.buffer_size
  File "/opt/conda/lib/python3.7/site-packages/pyarrow/parquet.py", line 137, in __init__
    read_dictionary=read_dictionary, metadata=metadata)
  File "pyarrow/_parquet.pyx", line 1036, in pyarrow._parquet.ParquetReader.open
  File "pyarrow/error.pxi", line 80, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: Arrow error: IOError: The specified key does not exist.. Detail: Python exception: FileNotFoundError
John Ramirez  02/27/2020, 4:23 PM
…

John Ramirez  02/27/2020, 4:25 PM
# imports implied by the snippet
from datetime import timedelta
import dask.dataframe as dd
from prefect import task

@task(name='Get Qualified Stocks', max_retries=10, retry_delay=timedelta(seconds=15), result_handler=RESULT_HANDLER, checkpoint=True)
def apply_ticker_filters(s3_path, tick_params):
    # unpack the seven filter parameters carried by this mapped element
    (h_scoop, h_hold, ddv_min, ddv_max,
     price_min, price_max, process_order) = tick_params
    data = dd.read_parquet(
        f'{s3_path}scoop={h_scoop}/hold_period={h_hold}/*.parquet',
        storage_options={
            'key': AWS_ACCESS_KEY,       # placeholder; credentials were redacted in the original
            'secret': AWS_SECRET_KEY,
        }
    )
    data['process_order'] = process_order
    data['qualified_ddv'] = data['ddv_average'].apply(
        func=lambda x: ddv_min <= x <= ddv_max,
        meta=('qualified_ddv', 'bool')
    )
    data['qualified_price'] = data['adjusted_price'].apply(
        func=lambda x: price_min <= x <= price_max,
        meta=('qualified_price', 'bool')
    )
    data['is_qualified'] = data['qualified_ddv'] & data['qualified_price']
    data = data[data['is_qualified']]
    data.to_parquet(
        destination,                     # destination comes from the enclosing scope in the original
        engine='pyarrow',
        compression='snappy',
        partition_on=['process_order'],
        write_index=False,
        write_metadata_file=False,
        storage_options={
            'key': AWS_ACCESS_KEY,       # placeholder; credentials were redacted in the original
            'secret': AWS_SECRET_KEY,
        },
    )
    # note: as posted this read `return '{destination}'` (no f-prefix), which
    # returns the literal string '{destination}' rather than the path
    return destination
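An aside on that last line: without the f-prefix, Python treats '{destination}' as an ordinary string, braces and all, so any downstream task receives text rather than an S3 path. A two-line illustration with a hypothetical value:

destination = 's3://bucket/qualified/'
print('{destination}')   # -> {destination}   (a plain string, braces included)
print(f'{destination}')  # -> s3://bucket/qualified/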
Dylan
data = dd.read_parquet(
    f'{s3_path}scoop={h_scoop}/hold_period={h_hold}/*.parquet',
…and confirm that the file is in s3 at the path that you’re expecting at that moment
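A sketch of the check Dylan describes, assuming s3fs (the filesystem layer Dask uses for s3:// paths) and the same placeholder credentials as above: log the constructed glob and confirm that matching objects actually exist before handing it to dd.read_parquet.

import s3fs

# build the same path the task will read, and inspect it first
path = f'{s3_path}scoop={h_scoop}/hold_period={h_hold}/*.parquet'
print(f'reading from: {path}')

fs = s3fs.S3FileSystem(key=AWS_ACCESS_KEY, secret=AWS_SECRET_KEY)
matches = fs.glob(path)
if not matches:
    # the same symptom pyarrow surfaces as "The specified key does not exist"
    raise FileNotFoundError(f'no parquet files found under {path}')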