# show-us-what-you-got
m
Edit: Wrong channel, sorry! This should have gone in #prefect-community. I'm trying to use the `StepActivate` task to execute an AWS Step Function, and it's failing on the second flow run because `execution_name` is not unique. We're trying to set `execution_name` to a uuid4 at run time, but that is probably happening at registration time. How can task arguments be computed dynamically at run time?
```python
from os import environ
import json
import uuid

import prefect
from prefect import Flow
from prefect.run_configs import ECSRun
from prefect.tasks.aws.step_function import StepActivate


with Flow("step_function_flow") as flow:
    activate_step_function = StepActivate(
        state_machine_arn=environ.get('STEP_FUNCTION_ARN'),
        execution_name=str(uuid.uuid4()),
        execution_input=json.dumps({
            "Parameters": "{}",
        }),
    )

    step_function_result = activate_step_function()

flow.storage = prefect.storage.S3(
    bucket='pocket-dataflows-storage-dev',
    add_default_labels=False
)

flow.run_config = ECSRun(
    labels=['Dev'],
    task_role_arn=environ.get('PREFECT_TASK_ROLE_ARN'),
    image='prefecthq/prefect:latest-python3.9',
)

flow.register(project_name="prefect-tutorial")
```
I tried searching on GitHub for examples, but I didn't find any.
k
Hey @Mathijs Miermans, ideally this post should go in the #prefect-community channel, as this channel is mainly for community-written content or blogs. This task doesn't look like it was written to support that. These tasks have `__init__` methods and `run` methods. Only `run` allows for dynamism; `__init__` gets evaluated during registration. The thing to do here would be to subclass this task and override the `run` method to support it.
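To make the registration-time versus run-time difference concrete, here is a minimal sketch in plain Python with no Prefect dependency. `InitTimeName` and `RunTimeName` are hypothetical stand-ins, not Prefect classes; they only mimic "the task object is built once at registration, then `run` is called on every flow run".

```python
import uuid


class InitTimeName:
    """Hypothetical stand-in for a task configured in __init__."""

    def __init__(self, execution_name: str):
        # Evaluated once, when the flow definition is built (registration).
        self.execution_name = execution_name

    def run(self) -> str:
        # Every run reuses the value captured at construction time.
        return self.execution_name


class RunTimeName:
    """Hypothetical stand-in for a task that takes its arguments in run()."""

    def run(self, execution_name: str) -> str:
        # The caller supplies a fresh value on every run.
        return execution_name


# "Registration": the task objects are built exactly once.
init_task = InitTimeName(execution_name=str(uuid.uuid4()))
run_task = RunTimeName()

# Two "flow runs" against the same registered objects.
init_names = [init_task.run() for _ in range(2)]
run_names = [run_task.run(str(uuid.uuid4())) for _ in range(2)]

print(init_names[0] == init_names[1])  # True: the same name is reused
print(run_names[0] == run_names[1])    # False: a new name per run
```

In a real flow the same idea would mean generating the UUID inside an upstream task and passing its result as an argument to `run`, rather than passing it to the constructor.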
m
Oh, sorry! Had the wrong channel open. Will move it there now.
k
Something like this:
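```python
# Import paths as in Prefect 1.x
from prefect import Task
from prefect.utilities.aws import get_boto_client


class MyStepActivate(Task):
    def run(
        self,
        state_machine_arn: str,
        execution_name: str,
        execution_input: str = "{}",
        boto_kwargs: dict = None,
        credentials: dict = None,
    ):
        # Everything is passed to run(), so it is resolved on each flow run
        # rather than once at registration.
        step_client = get_boto_client(
            "stepfunctions", credentials=credentials, **(boto_kwargs or {})
        )

        # execution_name has to be unique across runs.
        response = step_client.start_execution(
            stateMachineArn=state_machine_arn,
            name=execution_name,
            input=execution_input,
        )

        return response
```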
Oh don’t worry about it
Yeah, that task is also not thread safe, so you can't use `map` with it because of the `self` usage. I would honestly just use the snippet above.
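A rough sketch of why dropping the `self` usage matters for mapping, again in plain Python. `StatelessActivate` is a hypothetical stand-in and `ThreadPoolExecutor` only approximates mapped task runs sharing one task object across threads; this is not how Prefect's executors are implemented.

```python
from concurrent.futures import ThreadPoolExecutor


class StatelessActivate:
    """Hypothetical stand-in: run() never writes to self."""

    def run(self, execution_name: str) -> dict:
        # Only arguments and local variables are used, so concurrent calls
        # on the same object cannot overwrite each other's values.
        return {"name": execution_name}


task = StatelessActivate()
names = [f"execution-{i}" for i in range(8)]

# One shared task object, many concurrent run() calls.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(task.run, names))

print([r["name"] for r in results] == names)  # True
```

If `run` instead stored `execution_name` on `self` before using it, two threads could interleave and one call could pick up the other's name.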
m
Thanks! I was thinking something like that. I'm guessing we might be the first ones using this task in production, because it doesn't seem usable in its current state.
I'll create a PR if we get it working.
k
A PR would be appreciated, and yes, I've never seen anyone ask about this in the last 9 months.
If you ever make a PR, the MySQL task is a good example to follow.
🙌 1
m
k
Wow! Thanks for the quick turnaround on this. The core team will see it and review it, but you can ping me if it’s been a while and they haven’t yet
Great job removing the `self` also. You got it so quickly.
m
Yeah, same to you! We were amazed to get a 2-minute response to our question! Some cloud providers who charge a lot of 💰 for support are slower than that. 😉
Also: Prefect code is so readable. Nice job on code quality.
🙏 2