Question: How can I manually control a task status...
# prefect-community
a
Question: How can I manually control a task status, without the orchestrator trying to rerun it? I'm returning a (non final) task state and getting automatically retried. I'd like to not retry until the task is `Failed`, or skip retries when it's marked as `Succeeded`, by my code
✅ 1
Context: I'm trying to figure out how to manually manage AWS Batch Jobs as Prefect Tasks. Batch jobs are submitted to the scheduler, and there's no way to automagically "wait" on them to be successful or failed. So I've set up this
AWS Batch -> AWS EventBridge -> AWS SQS
and I want to run a function (maybe a task, I'm not sure) to listen to SQS messages and update Prefect Task states from AWS Batch Job states.
Here's the relevant parts of my flow/tasks (right now I'm just running `sleep N` commands in Batch to simulate real workload)
@task
async def update_task_states_from_batch_states(sub: Subscription) -> None:
    # there's code in here to listen to sqs, and call
    #   set_task_run_state()
    # from prefect.orion.api.task_runs
    # using data in each SQS message
    ...


@flow
async def run_multiple_batch_jobs() -> None:
    sub = setup_batch_subscription()

    for sleep_time in [10, 20, 40, 80]:
        run_sleeper.submit(sleep_time)

    await update_task_states_from_batch_states.submit(sub)

    clean_up_batch_subscription(sub)
Here's what my logs look like when I do this
12:04:23.271 | INFO    | prefect.engine - Created flow run 'free-oxpecker' for flow 'run-multiple-batch-jobs'
12:04:23.438 | INFO    | Flow run 'free-oxpecker' - Created task run 'setup_batch_subscription-9c30197c-0' for task 'setup_batch_subscription'
12:04:23.439 | INFO    | Flow run 'free-oxpecker' - Executing 'setup_batch_subscription-9c30197c-0' immediately...
12:04:24.203 | INFO    | Task run 'setup_batch_subscription-9c30197c-0' - Finished in state Completed()
12:04:24.249 | INFO    | Flow run 'free-oxpecker' - Created task run 'run_sleeper-076a36b9-0' for task 'run_sleeper'
12:04:24.250 | INFO    | Flow run 'free-oxpecker' - Submitted task run 'run_sleeper-076a36b9-0' for execution.
12:04:24.268 | INFO    | Flow run 'free-oxpecker' - Created task run 'clean_up_batch_subscription-7f0069cc-0' for task 'clean_up_batch_subscription'
12:04:24.268 | INFO    | Flow run 'free-oxpecker' - Executing 'clean_up_batch_subscription-7f0069cc-0' immediately...
12:04:24.316 | INFO    | Flow run 'free-oxpecker' - Created task run 'run_sleeper-076a36b9-2' for task 'run_sleeper'
12:04:24.316 | INFO    | Flow run 'free-oxpecker' - Submitted task run 'run_sleeper-076a36b9-2' for execution.
12:04:24.334 | INFO    | Flow run 'free-oxpecker' - Created task run 'run_sleeper-076a36b9-3' for task 'run_sleeper'
12:04:24.335 | INFO    | Flow run 'free-oxpecker' - Submitted task run 'run_sleeper-076a36b9-3' for execution.
12:04:24.355 | INFO    | Flow run 'free-oxpecker' - Created task run 'run_sleeper-076a36b9-1' for task 'run_sleeper'
12:04:24.355 | INFO    | Flow run 'free-oxpecker' - Submitted task run 'run_sleeper-076a36b9-1' for execution.
12:04:24.538 | INFO    | Task run 'run_sleeper-076a36b9-0' - Received non-final state 'Running' when proposing final state 'Running' and will attempt to run again...
12:04:24.635 | INFO    | Task run 'run_sleeper-076a36b9-3' - Received non-final state 'Running' when proposing final state 'Running' and will attempt to run again...
12:04:24.701 | INFO    | Task run 'run_sleeper-076a36b9-1' - Received non-final state 'Running' when proposing final state 'Running' and will attempt to run again...
12:04:24.737 | INFO    | Task run 'run_sleeper-076a36b9-2' - Received non-final state 'Running' when proposing final state 'Running' and will attempt to run again...

<forever>
Before somebody suggests using something other than Batch: I need AWS Batch (vs something like ECS, where I can wait on the task to complete) because Batch can provide GPU resources, which is a requirement for some ML model training.
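For anyone following along, the SQS listener described above needs to turn each forwarded EventBridge message into a job id plus a target state. Here is a minimal sketch of just that translation step, assuming the SQS message body is the raw "Batch Job State Change" event. The names `parse_batch_event` and `BATCH_TO_PREFECT` are made up for illustration, and the mapping onto Prefect state names is an assumption rather than anything Prefect ships. It does not call Prefect or AWS.

```python
import json
from typing import Optional, Tuple

# Assumed mapping from AWS Batch job statuses to Prefect state type names.
# This table is illustrative; it is not defined by Prefect or AWS.
BATCH_TO_PREFECT = {
    "SUBMITTED": "PENDING",
    "PENDING": "PENDING",
    "RUNNABLE": "PENDING",
    "STARTING": "PENDING",
    "RUNNING": "RUNNING",
    "SUCCEEDED": "COMPLETED",
    "FAILED": "FAILED",
}

# Only these mapped states should end the task run.
TERMINAL_STATES = {"COMPLETED", "FAILED"}


def parse_batch_event(body: str) -> Optional[Tuple[str, str]]:
    """Return (job_id, prefect_state_name), or None for unrelated messages."""
    event = json.loads(body)
    if event.get("detail-type") != "Batch Job State Change":
        return None
    detail = event.get("detail", {})
    job_id = detail.get("jobId")
    state = BATCH_TO_PREFECT.get(detail.get("status"))
    if job_id is None or state is None:
        return None
    return job_id, state


if __name__ == "__main__":
    sample = json.dumps({
        "source": "aws.batch",
        "detail-type": "Batch Job State Change",
        "detail": {"jobId": "job-123", "status": "RUNNING"},
    })
    print(parse_batch_event(sample))
```

Keeping the translation separate from the queue polling makes it easy to test without AWS credentials. Only results whose state is in `TERMINAL_STATES` would be reported as final.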
c
Jobs are submitted through a helper utility that checks dynamodb for a task in succeeded state before continuing
There's a readme in that repo that discusses it, maybe it can help your case?
The full order is helper submits a task -> sqs -> lambda -> batch, eventbridge -> dynamodb. Tasks check in on dynamodb for their state
a
and we would also totally welcome an infrastructure block contribution to that repo, this way you can run your flows or any other command on AWS Batch directly
👀 1
a
@Christopher Boyd this is awesome (and also more complex but more complete than my solution so far), I'd be happy to start here. I'm glad I'm not super far out of field for your planned use cases, and that I wasn't overcomplicating it - it just is fundamentally that hard. The dynamodb table was something I considered but was trying to avoid just from a complexity perspective. Storing job state resiliently does seem like a good idea though.
@Anna Geller I saw the AWS batch support thing, that was a great start. Once I realized it only seemed to cover submit_job (as of last week) I started trying to figure out how to roll my own. I was hoping to use a `client_waiter` but boto3 doesn't support that for batch states (which makes sense, there's no corresponding AWS Batch API so you'd have to poll)
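Since there is no waiter for Batch job states, the polling mentioned above has to be written by hand. A minimal sketch, assuming `client` is a `boto3.client("batch")` (or anything with the same `describe_jobs(jobs=[...])` call). The function name, poll interval, and timeout are illustrative choices, not part of boto3. Blocking inside the task like this until the job is terminal is also one way to avoid handing a non-final state back to the orchestrator.

```python
import time
from typing import Any, Callable

# AWS Batch statuses after which a job will not change again.
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED"}


def wait_for_batch_job(
    client: Any,
    job_id: str,
    poll_seconds: float = 15.0,
    timeout_seconds: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll describe_jobs until the job is SUCCEEDED or FAILED; return that status."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        response = client.describe_jobs(jobs=[job_id])
        jobs = response.get("jobs", [])
        # An empty list can occur briefly after submission; treat it as not finished.
        if jobs and jobs[0]["status"] in TERMINAL_STATUSES:
            return jobs[0]["status"]
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Batch job {job_id} not finished after {timeout_seconds}s")
        sleep(poll_seconds)
```

The `sleep` parameter exists only so the loop can be tested without real delays. In practice a task would call this and raise on `"FAILED"`, so the task only ever finishes in a final state.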
Christopher, I checked out your branch. I didn't see the terraform for `Batch -> Eventbridge -> Sqs` so, let me paste what I got working so you can start from there
resource "aws_cloudwatch_event_rule" "all_batch_job_states" {
  name = "match-batch-job-states"
  description = "match Batch Job States"
  event_pattern = jsonencode({
    "source": ["aws.batch"],
    "detail-type": ["Batch Job State Change"],
  })
}

resource "aws_sqs_queue" "job_state_updates" {
  name = "job-state-updates"
}

resource "aws_cloudwatch_event_target" "job_states_target" {
  target_id = "sqs-for-job-states"
  arn = aws_sqs_queue.job_state_updates.arn
  rule = aws_cloudwatch_event_rule.all_batch_job_states.name
}

resource "aws_sqs_queue_policy" "allow_eventbridge_forwarding" {
  queue_url = aws_sqs_queue.job_state_updates.id
  policy = <<POLICY
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "SQSAccess",
      "Effect": "Allow",
      "Principal": {
        "Service": "events.amazonaws.com"
      },
      "Action": "sqs:SendMessage",
      "Resource": "${aws_sqs_queue.job_state_updates.arn}",
      "Condition": {
        "ArnEquals": {
          "aws:SourceArn": "${aws_cloudwatch_event_rule.all_batch_job_states.arn}"
        }
      }
    }
  ]
}
POLICY
}
(this took me a while to figure out. stupid SQS having its own policies 🙂)
I am 80% sure you don't need a separate IAM policy for eventbridge
a
Thanks for sharing your approach
a
@Christopher Boyd
Tasks check in on dynamodb for their state
how does that work from a flow/task code perspective? is there some code that the task has to declare/implement, that returns the current state, and doesn't cause an immediate retry? that is part of my fundamental problem, I can't just return `Running` without triggering an immediate retry by the orchestrator
(sorry about messaging this thread, just want to document what I'm learning for others) Anna, I see what you mean about the Infrastructure block. I think I finally managed to understand what the `Infrastructure` concept is, from reading the docs and a lot of the prefect code 🙂 if I inherited from Infrastructure and implemented this method, I would be able to
1. Submit a batch job
2. Wait/poll on updates, reporting the job starting to `task_status`
3. Wait/poll on further updates, reporting the job's exit code
@abc.abstractmethod
async def run(self, task_status: TaskStatus = None) -> InfrastructureResult:
so the only states I'd be able to (effectively) report, and really the only ones that are relevant, are RUNNING (I assume, by calling task_status), and COMPLETED or FAILED from the exit code on the InfrastructureResult. I think I can get that running locally. TBD on contributing that back upstream, but I'll do my best
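The three steps above could look roughly like the sketch below. It is not the code from the gist shared later in the thread and it does not subclass Prefect's `Infrastructure`. `BatchJobResult` is a hypothetical stand-in for `InfrastructureResult`, `run_batch_job` is an invented name, and reducing the outcome to a status code of 0 or 1 is a simplifying assumption. The only AWS calls used are `submit_job` and `describe_jobs` on a boto3 Batch client, and `task_status` is anything with a `started()` method.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class BatchJobResult:
    """Hypothetical stand-in for Prefect's InfrastructureResult."""
    identifier: str
    status_code: int


async def run_batch_job(
    client: Any,
    job_name: str,
    job_queue: str,
    job_definition: str,
    task_status: Optional[Any] = None,
    poll_seconds: float = 15.0,
) -> BatchJobResult:
    # 1. Submit the job. boto3 calls block, so a real implementation would
    #    likely run them in a worker thread instead of on the event loop.
    job_id = client.submit_job(
        jobName=job_name, jobQueue=job_queue, jobDefinition=job_definition
    )["jobId"]

    started_reported = False
    while True:
        jobs = client.describe_jobs(jobs=[job_id])["jobs"]
        status = jobs[0]["status"] if jobs else "SUBMITTED"

        # 2. Report the start exactly once, as soon as the job has begun running.
        if not started_reported and status in {"RUNNING", "SUCCEEDED", "FAILED"}:
            if task_status is not None:
                task_status.started(job_id)
            started_reported = True

        # 3. Map the terminal status to an exit-code-like result (assumption: 0 or 1).
        if status == "SUCCEEDED":
            return BatchJobResult(identifier=job_id, status_code=0)
        if status == "FAILED":
            return BatchJobResult(identifier=job_id, status_code=1)

        await asyncio.sleep(poll_seconds)
```

The start is also reported when the first observed status is already terminal, so a very short job that is never seen as RUNNING still signals that it started.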
🙏 1
👍 1
how often are infrastructure blocks instantiated? the ECSTask one looks like it's per task run
c
blocks are “registered” with the flow when the deployment is registered
so at execution time, although with regards to Batch tasks, I'm not sure, I'd suspect whenever a task is executed / submitted
a
the way I've set up my AWS code is to set up infra (SQS queue) per flow run
a
the ECSTask block, by default, registers a new task definition at runtime so it's quite similar 👍 great to see you were able to implement that for AWSBatch. I'd be super curious to see how you did it, even if you could share a code snippet as GitHub Gist, that would be fantastic 🚀
a
yeah feel free to send this to other folks. I definitely won't be able to open source the infrastructure block this fast 🙂 https://gist.github.com/alexturek/eec410a5326ad4506a7c200b1644edd2
🙏 1
❤️ 1
a
thanks so much for sharing!
😄 1