Chris Leber
11/04/2021, 7:13 PM
batch = BatchSubmit(
    job_name="test",
    job_definition="run_test",
    job_queue="test-queue",
    boto_kwargs={},
)
with Flow(
    "test Batch submit",
    storage=S3(bucket="prefect-storage"),
    run_config=ECSRun(task_definition=task_definition),
) as flow:
    s3_path = Parameter(
        "s3_path", default="s3://data/TEST"
    )
    file_suffix = Parameter("file_suffix", default=".txt")
    array_size = Parameter("array_size", default=10)
    batch.run(
        batch_kwargs={
            "arrayProperties": {"size": array_size},
            "parameters": {"query_s3_path": s3_path, "file_suffix": file_suffix},
        }
    )
Chris Leber
11/04/2021, 7:15 PM
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/tasks/aws/batch.py", line 87, in run
    response = batch_client.submit_job(
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 388, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 680, in _make_api_call
    request_dict = self._convert_to_request_dict(
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 728, in _convert_to_request_dict
    request_dict = self._serializer.serialize_to_request(
  File "/usr/local/lib/python3.8/site-packages/botocore/validate.py", line 360, in serialize_to_request
    raise ParamValidationError(report=report.generate_report())
botocore.exceptions.ParamValidationError: Parameter validation failed:
Invalid type for parameter arrayProperties.size, value: <Task: validate_int_param>, type: <class 'prefect.tasks.core.function.FunctionTask'>, valid types: <class 'int'>
Invalid type for parameter parameters.query_s3_path, value: <Task: validate_str_param>, type: <class 'prefect.tasks.core.function.FunctionTask'>, valid types: <class 'str'>
Invalid type for parameter parameters.file_suffix, value: <Task: validate_str_param>, type: <class 'prefect.tasks.core.function.FunctionTask'>, valid types: <class 'str'>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "prefect/flows.py", line 207, in <module>
    batch.run(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 454, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/tasks/aws/batch.py", line 94, in run
    raise FAIL(f"Failed to submit job '{job_name}' to AWS Batch.") from e
prefect.engine.signals.FAIL: Failed to submit job 'test' to AWS Batch.
Jake Kaplan
11/04/2021, 7:22 PM
The problem looks like the direct call to batch.run inside of the flow.
If you look at the example in the link below, you only need to instantiate the task and call the instance inside the flow; .run will be called for you when the flow runs.
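To illustrate why the traceback above reports task objects where an int and strings were expected, here is a minimal toy model in plain Python. It is only a sketch: `Placeholder`, `SubmitTask`, `resolve`, and `execute` are hypothetical names invented for this illustration and are not Prefect's API or implementation. The idea it shows is that a parameter has no concrete value while the flow is being defined, so calling `run` directly at that point hands the placeholder itself to the underlying client, whereas calling the task instance only records the call and lets values be filled in later. In the flow above, that would presumably mean writing `batch(batch_kwargs={...})` instead of `batch.run(batch_kwargs={...})`.

```python
# Toy model only: every class and function here is a hypothetical stand-in,
# not part of Prefect.


class Placeholder:
    """Stands in for a flow parameter: it has a name, but no value until run time."""

    def __init__(self, name, default):
        self.name = name
        self.default = default

    def __repr__(self):
        return f"<Placeholder: {self.name}>"


def resolve(value, overrides):
    """Recursively swap placeholders for concrete values."""
    if isinstance(value, Placeholder):
        return overrides.get(value.name, value.default)
    if isinstance(value, dict):
        return {key: resolve(item, overrides) for key, item in value.items()}
    return value


class SubmitTask:
    """Stands in for a task whose run() needs concrete values."""

    def __init__(self):
        self.deferred_calls = []

    def run(self, batch_kwargs):
        size = batch_kwargs["arrayProperties"]["size"]
        if not isinstance(size, int):
            raise TypeError(f"Invalid type for arrayProperties.size: {size!r}")
        return f"submitted array job of size {size}"

    def __call__(self, **kwargs):
        # Record the call instead of executing it; nothing is submitted yet.
        self.deferred_calls.append(kwargs)
        return self

    def execute(self, overrides=None):
        # At "run time", resolve placeholders first, then call run().
        overrides = overrides or {}
        return [self.run(**resolve(call, overrides)) for call in self.deferred_calls]


array_size = Placeholder("array_size", default=10)
task = SubmitTask()
kwargs = {"arrayProperties": {"size": array_size}}

# Calling run() while defining the flow passes the placeholder straight through.
try:
    task.run(batch_kwargs=kwargs)
    direct_error = None
except TypeError as exc:
    direct_error = str(exc)

# Calling the instance defers the work until values are known.
task(batch_kwargs=kwargs)
results = task.execute()
```

In this sketch `direct_error` holds a type error that mentions the placeholder, mirroring the `ParamValidationError` in the traceback, while `results` comes from the deferred call after the default value has been substituted.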
https://docs.prefect.io/core/task_library/overview.html#task-library-in-action
Chris Leber
11/04/2021, 8:52 PM