Alex Turek
09/24/2022, 7:24 PM
Failed, or skip retries when it's marked as Succeeded, by my code.
AWS Batch -> AWS EventBridge -> AWS SQS
and I want to run a function (maybe a task, I'm not sure) to listen to SQS messages and update Prefect task states from AWS Batch job states.
`sleep N` commands in Batch to simulate real workload)
```python
from prefect import flow, task

# Subscription, setup_batch_subscription, run_sleeper and
# clean_up_batch_subscription are defined elsewhere (not shown).


@task
async def update_task_states_from_batch_states(sub: Subscription) -> None:
    # there's code in here to listen to sqs, and call
    # set_task_run_state()
    # from prefect.orion.api.task_runs
    # using data in each SQS message
    ...


@flow
async def run_multiple_batch_jobs() -> None:
    sub = setup_batch_subscription()
    for sleep_time in [10, 20, 40, 80]:
        run_sleeper.submit(sleep_time)
    await update_task_states_from_batch_states.submit(sub)
    clean_up_batch_subscription(sub)
```
```
12:04:23.271 | INFO | prefect.engine - Created flow run 'free-oxpecker' for flow 'run-multiple-batch-jobs'
12:04:23.438 | INFO | Flow run 'free-oxpecker' - Created task run 'setup_batch_subscription-9c30197c-0' for task 'setup_batch_subscription'
12:04:23.439 | INFO | Flow run 'free-oxpecker' - Executing 'setup_batch_subscription-9c30197c-0' immediately...
12:04:24.203 | INFO | Task run 'setup_batch_subscription-9c30197c-0' - Finished in state Completed()
12:04:24.249 | INFO | Flow run 'free-oxpecker' - Created task run 'run_sleeper-076a36b9-0' for task 'run_sleeper'
12:04:24.250 | INFO | Flow run 'free-oxpecker' - Submitted task run 'run_sleeper-076a36b9-0' for execution.
12:04:24.268 | INFO | Flow run 'free-oxpecker' - Created task run 'clean_up_batch_subscription-7f0069cc-0' for task 'clean_up_batch_subscription'
12:04:24.268 | INFO | Flow run 'free-oxpecker' - Executing 'clean_up_batch_subscription-7f0069cc-0' immediately...
12:04:24.316 | INFO | Flow run 'free-oxpecker' - Created task run 'run_sleeper-076a36b9-2' for task 'run_sleeper'
12:04:24.316 | INFO | Flow run 'free-oxpecker' - Submitted task run 'run_sleeper-076a36b9-2' for execution.
12:04:24.334 | INFO | Flow run 'free-oxpecker' - Created task run 'run_sleeper-076a36b9-3' for task 'run_sleeper'
12:04:24.335 | INFO | Flow run 'free-oxpecker' - Submitted task run 'run_sleeper-076a36b9-3' for execution.
12:04:24.355 | INFO | Flow run 'free-oxpecker' - Created task run 'run_sleeper-076a36b9-1' for task 'run_sleeper'
12:04:24.355 | INFO | Flow run 'free-oxpecker' - Submitted task run 'run_sleeper-076a36b9-1' for execution.
12:04:24.538 | INFO | Task run 'run_sleeper-076a36b9-0' - Received non-final state 'Running' when proposing final state 'Running' and will attempt to run again...
12:04:24.635 | INFO | Task run 'run_sleeper-076a36b9-3' - Received non-final state 'Running' when proposing final state 'Running' and will attempt to run again...
12:04:24.701 | INFO | Task run 'run_sleeper-076a36b9-1' - Received non-final state 'Running' when proposing final state 'Running' and will attempt to run again...
12:04:24.737 | INFO | Task run 'run_sleeper-076a36b9-2' - Received non-final state 'Running' when proposing final state 'Running' and will attempt to run again...
<forever>
```
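The loop in the log comes from proposing `Running` where the engine expects a final state. A minimal sketch of the translation step the listener needs: it turns one EventBridge "Batch Job State Change" event (the SQS message body) into a Prefect state type name and flags whether that state is final. The Batch-to-Prefect mapping, in particular folding SUBMITTED/PENDING/RUNNABLE/STARTING into `PENDING`, is an assumption; `parse_batch_event` and `BATCH_TO_PREFECT` are hypothetical names.

```python
import json

# Assumed mapping from AWS Batch job statuses to Prefect state type names.
BATCH_TO_PREFECT = {
    "SUBMITTED": "PENDING",
    "PENDING": "PENDING",
    "RUNNABLE": "PENDING",
    "STARTING": "PENDING",
    "RUNNING": "RUNNING",
    "SUCCEEDED": "COMPLETED",
    "FAILED": "FAILED",
}

# Prefect 2 treats only these state types as final (terminal).
FINAL_PREFECT_STATES = {"COMPLETED", "FAILED", "CRASHED", "CANCELLED"}


def parse_batch_event(body: str) -> tuple[str, str, bool]:
    """Return (job_id, prefect_state_type, is_final) for one event body."""
    event = json.loads(body)
    if event.get("detail-type") != "Batch Job State Change":
        raise ValueError(f"unexpected event type: {event.get('detail-type')!r}")
    detail = event["detail"]
    state_type = BATCH_TO_PREFECT[detail["status"]]
    return detail["jobId"], state_type, state_type in FINAL_PREFECT_STATES
```

Only events whose flag is `True` would be safe to hand over as a task's final state. `RUNNING` updates are informational at best.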
Christopher Boyd
09/24/2022, 9:32 PM
Anna Geller
09/24/2022, 10:27 PM
Alex Turek
09/25/2022, 3:44 AM
`client_waiter`
but boto3 doesn't support that for Batch job states (which makes sense: there's no corresponding AWS Batch API, so you'd have to poll)
Batch -> EventBridge -> SQS
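For comparison, the polling route mentioned above (hand-rolling what a boto3 waiter would do) might look like this minimal sketch. It assumes a boto3 Batch client (`boto3.client("batch")`) and polls `describe_jobs` until the job reaches SUCCEEDED or FAILED. The function name, interval and timeout defaults are hypothetical.

```python
import time

TERMINAL_BATCH_STATUSES = {"SUCCEEDED", "FAILED"}


def wait_for_batch_job(batch_client, job_id: str, poll_seconds: float = 10.0,
                       timeout_seconds: float = 3600.0) -> dict:
    """Poll describe_jobs until the job succeeds or fails; return its description."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        jobs = batch_client.describe_jobs(jobs=[job_id])["jobs"]
        if not jobs:
            raise LookupError(f"Batch job {job_id} not found")
        job = jobs[0]
        if job["status"] in TERMINAL_BATCH_STATUSES:
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Batch job {job_id} still {job['status']} after {timeout_seconds}s"
            )
        time.sleep(poll_seconds)  # back off between describe_jobs calls
```

Called inside a Prefect task, e.g. `wait_for_batch_job(boto3.client("batch"), job_id)`, this would keep the task Running until Batch finishes. The task could then raise on `FAILED` and return normally on `SUCCEEDED` instead of having its state set from outside. The cost is one `describe_jobs` call per job per interval, which the event-driven route avoids.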
so, let me paste what I got working so you can start from there
```hcl
resource "aws_cloudwatch_event_rule" "all_batch_job_states" {
  name        = "match-batch-job-states"
  description = "match Batch Job States"
  event_pattern = jsonencode({
    "source" : ["aws.batch"],
    "detail-type" : ["Batch Job State Change"],
  })
}

resource "aws_sqs_queue" "job_state_updates" {
  name = "job-state-updates"
}

resource "aws_cloudwatch_event_target" "job_states_target" {
  target_id = "sqs-for-job-states"
  arn       = aws_sqs_queue.job_state_updates.arn
  rule      = aws_cloudwatch_event_rule.all_batch_job_states.name
}

resource "aws_sqs_queue_policy" "allow_eventbridge_forwarding" {
  queue_url = aws_sqs_queue.job_state_updates.id
  policy    = <<POLICY
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "SQSAccess",
      "Effect": "Allow",
      "Principal": {
        "Service": "events.amazonaws.com"
      },
      "Action": "sqs:SendMessage",
      "Resource": "${aws_sqs_queue.job_state_updates.arn}",
      "Condition": {
        "ArnEquals": {
          "aws:SourceArn": "${aws_cloudwatch_event_rule.all_batch_job_states.arn}"
        }
      }
    }
  ]
}
POLICY
}
```
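On the consuming side, a minimal sketch of draining the `job-state-updates` queue could look like the following. It assumes a boto3 SQS client (`boto3.client("sqs")`) and relies on EventBridge placing the whole event JSON in the message body. `receive_batch_updates` is a hypothetical helper name.

```python
import json


def receive_batch_updates(sqs_client, queue_url: str,
                          wait_seconds: int = 20) -> list[tuple[str, str]]:
    """Long-poll once; return (job_id, batch_status) pairs and delete handled messages."""
    response = sqs_client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=wait_seconds,  # long polling; SQS allows at most 20 s
    )
    updates = []
    for message in response.get("Messages", []):
        event = json.loads(message["Body"])  # the EventBridge event itself
        detail = event["detail"]
        updates.append((detail["jobId"], detail["status"]))
        # Deleting only after parsing means a message that fails to parse stays
        # on the queue and is redelivered after its visibility timeout.
        sqs_client.delete_message(QueueUrl=queue_url,
                                  ReceiptHandle=message["ReceiptHandle"])
    return updates
```

The queue URL for the Terraform above could be looked up with `sqs_client.get_queue_url(QueueName="job-state-updates")["QueueUrl"]`.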
Anna Geller
09/25/2022, 10:38 AM
Alex Turek
09/25/2022, 10:14 PM
> Tasks check in on dynamodb for their state
How does that work from a flow/task code perspective? Is there some code that the task has to declare/implement that returns the current state and doesn't cause an immediate retry? That is part of my fundamental problem: I can't just return `Running` without triggering an immediate retry by the orchestrator.
`Infrastructure` concept is, from reading the docs and a lot of the Prefect code,
if I inherited from `Infrastructure` and implemented this method, I would be able to:
1. Submit a Batch job
2. Wait/poll on updates, reporting the job starting to `task_status`
3. Wait/poll on further updates, reporting the job's exit code
```python
@abc.abstractmethod
async def run(self, task_status: TaskStatus = None) -> InfrastructureResult:
    ...
```
So the only states I'd be able to (effectively) report, and really the only ones that are relevant, are RUNNING (I assume, by calling `task_status.started()`) and COMPLETED or FAILED, from the exit code on the `InfrastructureResult`.
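The three steps above could be prototyped outside Prefect first. This is a framework-free sketch, not Prefect's `Infrastructure` API. `submit` and `get_status` are hypothetical async callables standing in for the Batch calls. `task_status` only needs a `started()` method, as anyio's `TaskStatus` has. `BatchRunResult` loosely mirrors the `identifier`/`status_code` pair that, as far as I know, `InfrastructureResult` carries.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class BatchRunResult:
    identifier: str   # the Batch job id
    status_code: int  # 0 for SUCCEEDED, 1 for FAILED


async def run_batch_job(
    submit: Callable[[], Awaitable[str]],
    get_status: Callable[[str], Awaitable[str]],
    task_status=None,
    poll_seconds: float = 10.0,
) -> BatchRunResult:
    job_id = await submit()  # 1. submit the job
    reported_start = False
    while True:
        status = await get_status(job_id)
        # 2. report the start once; also when the job jumps straight to a
        # terminal status between polls, so a caller awaiting started() is released
        if not reported_start and status in ("RUNNING", "SUCCEEDED", "FAILED"):
            if task_status is not None:
                task_status.started(job_id)
            reported_start = True
        # 3. map the terminal Batch status to an exit code
        if status in ("SUCCEEDED", "FAILED"):
            return BatchRunResult(job_id, 0 if status == "SUCCEEDED" else 1)
        await asyncio.sleep(poll_seconds)
```

Reporting only "started" plus a final exit code matches the point above: Running and Completed/Failed are the only states this shape can express.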
I think I can get that running locally. TBD on contributing that back upstream, but I'll do my best.
Christopher Boyd
09/26/2022, 4:19 PM
Alex Turek
09/26/2022, 4:23 PM
Anna Geller
09/26/2022, 6:14 PM
Alex Turek
09/26/2022, 9:06 PM
Anna Geller
09/26/2022, 9:36 PM