Lukas N.
11/03/2020, 5:15 PM
S3Result to my flows and I have encountered a weird error (template issue?), which I tried to replicate in a minimal example. I'm running the flow through Prefect server, which I set up using prefect server start, and using a local agent started with prefect agent start. More info in thread.
3 November 2020,06:11:19 agent INFO Submitted for execution: PID: 12005
3 November 2020,06:11:20 prefect.CloudFlowRunner INFO Beginning Flow run for 'test_flow'
3 November 2020,06:11:20 prefect.CloudFlowRunner DEBUG Using executor type LocalExecutor
3 November 2020,06:11:20 prefect.CloudFlowRunner DEBUG Flow 'test_flow': Handling state change from Scheduled to Running
3 November 2020,06:11:20 prefect.CloudTaskRunner INFO Task 'A': Starting task run...
3 November 2020,06:11:20 prefect.CloudTaskRunner DEBUG Task 'A': Handling state change from Pending to Running
3 November 2020,06:11:20 prefect.CloudTaskRunner DEBUG Task 'A': Calling task.run() method...
3 November 2020,06:11:20 prefect.S3Result DEBUG Starting to upload result to prefect_results/test_flow/03-11_17-11-14/A-3d92ad1d-7818-4d72-8894-58bb01d3cb69.prefect_result...
3 November 2020,06:11:21 prefect.S3Result DEBUG Finished uploading result to prefect_results/test_flow/03-11_17-11-14/A-3d92ad1d-7818-4d72-8894-58bb01d3cb69.prefect_result.
3 November 2020,06:11:21 prefect.CloudTaskRunner DEBUG Task 'A': Handling state change from Running to Success
3 November 2020,06:11:21 prefect.CloudTaskRunner INFO Task 'A': Finished task run for task with final state: 'Success'
3 November 2020,06:11:21 prefect.CloudTaskRunner INFO Task 'B': Starting task run...
3 November 2020,06:11:21 prefect.CloudTaskRunner DEBUG Task 'B': Handling state change from Pending to Running
3 November 2020,06:11:21 prefect.CloudTaskRunner DEBUG Task 'B': Calling task.run() method...
3 November 2020,06:11:21 prefect.B INFO Task B [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
3 November 2020,06:11:21 prefect.CloudTaskRunner DEBUG Task 'B': Handling state change from Running to Success
3 November 2020,06:11:21 prefect.CloudTaskRunner INFO Task 'B': Finished task run for task with final state: 'Success'
3 November 2020,06:11:21 prefect.CloudTaskRunner INFO Task 'C': Starting task run...
3 November 2020,06:11:21 prefect.S3Result DEBUG Starting to download result from prefect_results/{flow_name}/{scheduled_start_time:%d-%m_%H-%M-%S}/{task_full_name}-{task_run_id}.prefect_result...
3 November 2020,06:11:21 prefect.S3Result ERROR Unexpected error while reading from result handler: ClientError('An error occurred (404) when calling the HeadObject operation: Not Found',)
Traceback (most recent call last):
  File "/home/novotl/.cache/pypoetry/virtualenvs/flows-aRCZ5YQu-py3.6/lib/python3.6/site-packages/prefect/engine/results/s3_result.py", line 136, in read
    Bucket=self.bucket, Key=location, Fileobj=stream
  File "/home/novotl/.cache/pypoetry/virtualenvs/flows-aRCZ5YQu-py3.6/lib/python3.6/site-packages/boto3/s3/inject.py", line 678, in download_fileobj
    return future.result()
  File "/home/novotl/.cache/pypoetry/virtualenvs/flows-aRCZ5YQu-py3.6/lib/python3.6/site-packages/s3transfer/futures.py", line 106, in result
    return self._coordinator.result()
  File "/home/novotl/.cache/pypoetry/virtualenvs/flows-aRCZ5YQu-py3.6/lib/python3.6/site-packages/s3transfer/futures.py", line 265, in result
    raise self._exception
  File "/home/novotl/.cache/pypoetry/virtualenvs/flows-aRCZ5YQu-py3.6/lib/python3.6/site-packages/s3transfer/tasks.py", line 255, in _main
    self._submit(transfer_future=transfer_future, **kwargs)
  File "/home/novotl/.cache/pypoetry/virtualenvs/flows-aRCZ5YQu-py3.6/lib/python3.6/site-packages/s3transfer/download.py", line 345, in _submit
    **transfer_future.meta.call_args.extra_args
  File "/home/novotl/.cache/pypoetry/virtualenvs/flows-aRCZ5YQu-py3.6/lib/python3.6/site-packages/botocore/client.py", line 357, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/home/novotl/.cache/pypoetry/virtualenvs/flows-aRCZ5YQu-py3.6/lib/python3.6/site-packages/botocore/client.py", line 661, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (404) when calling the HeadObject operation: Not Found
3 November 2020,06:11:21 prefect.CloudTaskRunner DEBUG Task 'C': Handling state change from Pending to Failed
3 November 2020,06:11:22 prefect.CloudTaskRunner INFO Task 'C': Finished task run for task with final state: 'Failed'
3 November 2020,06:11:22 prefect.CloudFlowRunner DEBUG Checking flow run state...
3 November 2020,06:11:22 prefect.CloudFlowRunner INFO Flow run FAILED: some reference tasks failed.
3 November 2020,06:11:22 prefect.CloudFlowRunner DEBUG Flow 'test_flow': Handling state change from Running to Failed
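Comparing the two S3Result lines in the log above: the upload for task A used a fully rendered key, while the download before task C used the raw template string, so the 404 is consistent with S3 being asked for a key that literally contains {flow_name}. The sketch below only illustrates that difference. It assumes the location template is rendered with Python's str.format (the {scheduled_start_time:%d-%m_%H-%M-%S} spec suggests this, but it is an assumption, not confirmed in this thread). The helper names render_location and looks_unrendered are hypothetical and not part of Prefect.

```python
from datetime import datetime

# Same template as the `location` passed to S3Result in the flow.
TEMPLATE = (
    "prefect_results/{flow_name}/"
    "{scheduled_start_time:%d-%m_%H-%M-%S}/"
    "{task_full_name}-{task_run_id}.prefect_result"
)


def render_location(template: str, **context) -> str:
    """Hypothetical helper: fill the template using plain str.format."""
    return template.format(**context)


def looks_unrendered(key: str) -> bool:
    """Hypothetical check: does the key still contain template braces?"""
    return "{" in key and "}" in key


# Context values taken from the upload line for task A in the log.
rendered = render_location(
    TEMPLATE,
    flow_name="test_flow",
    scheduled_start_time=datetime(2020, 11, 3, 17, 11, 14),
    task_full_name="A",
    task_run_id="3d92ad1d-7818-4d72-8894-58bb01d3cb69",
)

print(rendered)                    # key shape used for the upload
print(looks_unrendered(rendered))  # rendered key has no braces left
print(looks_unrendered(TEMPLATE))  # raw template, as seen in the download line
```

A check like looks_unrendered could be applied to the key before any S3 call while debugging, to tell a templating problem apart from a genuinely missing object.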
import os
from typing import List

import prefect
from prefect import Flow, task
from prefect.engine.results import S3Result


@task(name='A')
def task_a() -> List[int]:
    return [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


@task(name='B')
def task_b(foo: List) -> None:
    prefect.context.logger.info(f'Task B {foo}')


@task(name='C')
def task_c(foo: List[int]) -> None:
    prefect.context.logger.info(f'Task C {foo}')


result = S3Result(
    bucket=os.environ.get('DATA_BUCKET'),
    location='prefect_results/{flow_name}/'
             '{scheduled_start_time:%d-%m_%H-%M-%S}/'
             '{task_full_name}-{task_run_id}.prefect_result',
)

with Flow(name='test_flow', result=result) as flow:
    a = task_a()
    b = task_b(a)
    c = task_c(a)

flow.set_dependencies(task=c, upstream_tasks=[b])

if __name__ == "__main__":
    flow.register(project_name='default')
    flow.run()
it finished successfully.
Brian Mesick
11/03/2020, 5:45 PM
Lukas N.
11/03/2020, 9:43 PM
flow.run() and the local agent are run from the same environment
Mariia Kerimova
11/04/2020, 9:24 PM
location, but still didn't figure out what's the problem
Lukas N.
11/05/2020, 10:22 PM
Mariia Kerimova
11/05/2020, 10:24 PM