John Ramirez
02/27/2020, 3:04 PM
Dylan
02/27/2020, 3:31 PM
flow.visualize() and a brief description of your problem, I’d be happy to help!
John Ramirez
02/27/2020, 3:59 PM
Dylan
02/27/2020, 4:09 PM
Apply Stock Filters begins before Get Item finishes?
s3_path isn’t being returned correctly from GetItem in every case?
John Ramirez
02/27/2020, 4:10 PM
tick_params to get the full path
Dylan
02/27/2020, 4:11 PM
Get Stock Filters and GetItem as upstream dependencies of Apply Stock Filters, so Apply Stock Filters will not start until both have finished
unmapped helper function?
John Ramirez
02/27/2020, 4:12 PM
qualified_stocks = apply_ticker_filters.map(
    unmapped(return_data[0]),
    ticker_filters,
)
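For readers unfamiliar with the call above, here is a plain-Python sketch of what mapping with one unmapped argument amounts to: the wrapped value is passed unchanged to every call, while the other argument is iterated element by element. It illustrates the semantics only and is not Prefect's implementation; `Unmapped`, `map_call`, `describe`, and the sample values are hypothetical names introduced for this example.

```python
class Unmapped:
    """Hypothetical stand-in: marks a value reused as-is for every mapped call."""

    def __init__(self, value):
        self.value = value


def map_call(fn, *args):
    """Call fn once per element of the iterable args, repeating Unmapped values."""
    mapped = [a for a in args if not isinstance(a, Unmapped)]
    if not mapped:
        raise ValueError("at least one argument must be iterable")
    results = []
    for items in zip(*mapped):
        it = iter(items)
        # Unmapped arguments contribute the same value each time;
        # mapped arguments contribute their next element.
        call_args = [a.value if isinstance(a, Unmapped) else next(it) for a in args]
        results.append(fn(*call_args))
    return results


def describe(s3_path, tick_params):
    # Stand-in for the real task: just show which inputs each call receives.
    return f"{s3_path} with {tick_params}"


calls = map_call(describe, Unmapped("s3://example-bucket/data/"), [(1, 5), (2, 10)])
```

Under this reading, every mapped run of the task receives the same first argument, so an incorrect path in that one value would affect all of the mapped runs.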
Dylan
02/27/2020, 4:12 PM
John Ramirez
02/27/2020, 4:13 PM
February 27th 2020 at 10:25:45am | prefect.CloudTaskRunner
ERROR
Unexpected error: ArrowIOError('Arrow error: IOError: The specified key does not exist.. Detail: Python exception: FileNotFoundError')
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 877, in get_task_run_state
    self.task.run, timeout=self.task.timeout, **raw_inputs
  File "/opt/conda/lib/python3.7/site-packages/prefect/utilities/executors.py", line 172, in timeout_handler
    return fn(*args, **kwargs)
  File "main.py", line 390, in apply_ticker_filters
  File "/opt/conda/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 233, in read_parquet
    **kwargs
  File "/opt/conda/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py", line 139, in read_metadata
    fs, paths, gather_statistics, filters, kwargs.get("dataset", {})
  File "/opt/conda/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py", line 76, in _determine_dataset_parts
    paths, filesystem=fs, filters=filters, **dataset_kwargs
  File "/opt/conda/lib/python3.7/site-packages/pyarrow/parquet.py", line 1060, in __init__
    self.validate_schemas()
  File "/opt/conda/lib/python3.7/site-packages/pyarrow/parquet.py", line 1092, in validate_schemas
    self.schema = self.pieces[0].get_metadata().schema
  File "/opt/conda/lib/python3.7/site-packages/pyarrow/parquet.py", line 560, in get_metadata
    f = self.open()
  File "/opt/conda/lib/python3.7/site-packages/pyarrow/parquet.py", line 567, in open
    reader = self.open_file_func(self.path)
  File "/opt/conda/lib/python3.7/site-packages/pyarrow/parquet.py", line 942, in _open_dataset_file
    buffer_size=dataset.buffer_size
  File "/opt/conda/lib/python3.7/site-packages/pyarrow/parquet.py", line 137, in __init__
    read_dictionary=read_dictionary, metadata=metadata)
  File "pyarrow/_parquet.pyx", line 1036, in pyarrow._parquet.ParquetReader.open
  File "pyarrow/error.pxi", line 80, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: Arrow error: IOError: The specified key does not exist.. Detail: Python exception: FileNotFoundError
Dylan
02/27/2020, 4:16 PM
John Ramirez
02/27/2020, 4:23 PM
@task(name='Get Qualified Stocks', max_retries=10, retry_delay=timedelta(seconds=15), result_handler=RESULT_HANDLER, checkpoint=True)
def apply_ticker_filters(s3_path, tick_params):
    h_scoop = tick_params[0]
    h_hold = tick_params[1]
    ddv_min = tick_params[2]
    ddv_max = tick_params[3]
    price_min = tick_params[4]
    price_max = tick_params[5]
    process_order = tick_params[6]
    data = dd.read_parquet(
        f'{s3_path}scoop={h_scoop}/hold_period={h_hold}/*.parquet',
        storage_options={
            'key':
            'secret':
        }
    )
    data['process_order'] = process_order
    data['qualified_ddv'] = data['ddv_average'].apply(
        func=lambda x: True if ddv_min <= x <= ddv_max else False,
        meta=('qualified_ddv', 'bool')
    )
    data['qualified_price'] = data['adjusted_price'].apply(
        func=lambda x: True if price_min <= x <= price_max else False,
        meta=('qualified_price', 'bool')
    )
    data['is_qualified'] = (data['qualified_ddv'] & data['qualified_price']) == True
    data = data[data['is_qualified'] == True]
    data.to_parquet(
        f'{destination}',
        engine='pyarrow',
        compression='snappy',
        partition_on=['process_order'],
        write_index=False,
        write_metadata_file=False,
        storage_options={
            'key': ,
            'secret': ,
        },
    )
    return '{destination}'
Dylan
02/27/2020, 4:30 PM
data = dd.read_parquet(
    f'{s3_path}scoop={h_scoop}/hold_period={h_hold}/*.parquet',
and confirm that the file is in s3 at the path that you’re expecting at that moment
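One way to act on this suggestion is to build the glob in a small helper and log it just before the read, so the exact path shows up in the task logs next to any failure. This is a minimal sketch using only the standard library; `build_parquet_glob`, the logger name, and the bucket name are hypothetical, and the path template is copied from the task above.

```python
import logging

logger = logging.getLogger("apply_ticker_filters")


def build_parquet_glob(s3_path, h_scoop, h_hold):
    """Build the glob passed to read_parquet and log it before reading."""
    if not s3_path.endswith("/"):
        # The template appends 'scoop=...' directly to s3_path, so a missing
        # trailing slash silently produces a different key prefix.
        logger.warning("s3_path %r has no trailing slash", s3_path)
    path = f"{s3_path}scoop={h_scoop}/hold_period={h_hold}/*.parquet"
    logger.info("reading parquet from %s", path)
    return path


# Hypothetical inputs, for illustration only.
path = build_parquet_glob("s3://example-bucket/prices/", 3, 10)
```

The logged path can then be compared against the bucket contents at the time of the failure. Checking that objects actually exist under that prefix would need an S3 client and is left out of this sketch.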