Cole Murray
09/10/2022, 1:43 AM
Can't pickle local object '__create_fn__.<locals>.__init__'
Traceback (most recent call last):
File "/opt/venv/lib/python3.8/site-packages/workflow_etl/flows/flow.py", line 265, in hello_world
device_paths = get_object_paths(<redacted>)
File "/opt/venv/lib/python3.8/site-packages/prefect/tasks.py", line 294, in __call__
return enter_task_run_engine(
File "/opt/venv/lib/python3.8/site-packages/prefect/engine.py", line 727, in enter_task_run_engine
return run_async_from_worker_thread(begin_run)
File "/opt/venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/opt/venv/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/opt/venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 444, in result
return self.__get_result()
File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/opt/venv/lib/python3.8/site-packages/prefect/engine.py", line 859, in create_task_run_then_submit
return await future._result()
File "/opt/venv/lib/python3.8/site-packages/prefect/futures.py", line 227, in _result
return final_state.result(raise_on_failure=raise_on_failure)
File "/opt/venv/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/opt/venv/lib/python3.8/site-packages/prefect/task_runners.py", line 214, in submit
result = await run_fn(**run_kwargs)
File "/opt/venv/lib/python3.8/site-packages/prefect/engine.py", line 1000, in begin_task_run
return await orchestrate_task_run(
File "/opt/venv/lib/python3.8/site-packages/prefect/engine.py", line 1120, in orchestrate_task_run
terminal_state = await return_value_to_state(
File "/opt/venv/lib/python3.8/site-packages/prefect/states.py", line 129, in return_value_to_state
return Completed(data=DataDocument.encode(serializer, result))
File "/opt/venv/lib/python3.8/site-packages/prefect/orion/schemas/data.py", line 42, in encode
blob = lookup_serializer(encoding).dumps(data, **kwargs)
File "/opt/venv/lib/python3.8/site-packages/prefect/serializers.py", line 59, in dumps
data_bytes = cloudpickle.dumps(data)
File "/opt/venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/opt/venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 632, in dump
return Pickler.dump(self, obj)
AttributeError: Can't pickle local object '__create_fn__.<locals>.__init__'
# Prefect task wrapper around _get_object_paths (arguments redacted/elided in
# the original post).
# Per the traceback above, the AttributeError is raised after this task
# returns, while Prefect serializes the return value with cloudpickle
# (return_value_to_state -> DataDocument.encode -> cloudpickle.dumps).
# NOTE(review): presumably the DeviceDataFilePath items in the returned list
# are what cannot be pickled -- confirm how that class is defined.
@task
def get_object_paths(<redacted>) -> List[DeviceDataFilePath]:
    return _get_object_paths(...)
def _get_object_paths(<redacted>) -> List[DeviceDataFilePath]:
"""
redacted
"""
time_now = datetime.datetime.utcnow()
logger = get_run_logger()
device_data = []
for device_id in device_ids:
for day in range(num_days):
day_time = time_now - datetime.timedelta(days=day)
year = day_time.year
month = day_time.month
day = day_time.day
filter_pattern = f"<redacted>"
<http://logger.info|logger.info>(f"Filter pattern: {filter_pattern}")
filtered_objs = aws_utils.list_bucket(bucket=input_bucket, prefix=filter_pattern)
objs = list(filtered_objs)
objs = sorted(objs, key=lambda obj: obj.last_modified, reverse=True)
...<redacted>
return device_data
# Prefect flow entry point; the signature and most of the body were elided in
# the original post (including whatever handler closes the `try`).
@flow
def hello_world(...<redacted>...):
    try:
        bucket = input_bucket
        # This call is the frame reported in the traceback above
        # (flow.py, line 265, in hello_world).
        device_paths = get_object_paths(<redacted>)
        ...
Anna Geller
09/10/2022, 12:25 PM
Cole Murray
09/10/2022, 7:49 PM