Mathijs Miermans
12/13/2021, 10:57 PM
execution_name is not unique. We're trying to set execution_name to a uuid4 at run time, but that is probably happening at registration time. How can task arguments be computed dynamically at run time?
from os import environ
import json
import uuid
import prefect
from prefect import Flow
from prefect.run_configs import ECSRun
from prefect.tasks.aws.step_function import StepActivate
with Flow("step_function_flow") as flow:
    activate_step_function = StepActivate(
        state_machine_arn=environ.get('STEP_FUNCTION_ARN'),
        execution_name=str(uuid.uuid4()),
        execution_input=json.dumps({
            "Parameters": "{}",
        }),
    )
    step_function_result = activate_step_function()
flow.storage = prefect.storage.S3(
    bucket='pocket-dataflows-storage-dev',
    add_default_labels=False
)
flow.run_config = ECSRun(
    labels=['Dev'],
    task_role_arn=environ.get('PREFECT_TASK_ROLE_ARN'),
    image='prefecthq/prefect:latest-python3.9',
)
flow.register(project_name="prefect-tutorial")
Kevin Kho
12/13/2021, 11:02 PM
init methods and run methods. Only the run method allows for dynamism. The init gets evaluated during registration. The thing to do here would be to subclass this task and override the run method to support it.
Mathijs Miermans
12/13/2021, 11:03 PM
Kevin Kho
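12/13/2021, 11:04 PM
from prefect import Task
from prefect.utilities.aws import get_boto_client

class MyStepActivate(Task):
    def run(
        self,
        state_machine_arn: str,
        execution_name: str,
        execution_input: str = "{}",
        boto_kwargs: dict = None,
        credentials: dict = None,
    ):
        # All arguments arrive at run time, so they can come from upstream tasks.
        step_client = get_boto_client(
            "stepfunctions", credentials=credentials, **(boto_kwargs or {})
        )
        # Start the Step Functions execution with the name supplied for this run.
        response = step_client.start_execution(
            stateMachineArn=state_machine_arn,
            name=execution_name,
            input=execution_input,
        )
        return response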
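To see why moving execution_name into run helps, here is a minimal plain-Python sketch that does not use Prefect at all. InitTimeName and RunTimeName are hypothetical stand-ins, not Prefect classes: building the object plays the role of registration, and each call to run plays the role of a flow run.

```python
import uuid


class InitTimeName:
    """Hypothetical stand-in for a task whose name is fixed in __init__."""

    def __init__(self):
        # Evaluated once, when the object is built (analogous to registration).
        self.execution_name = str(uuid.uuid4())

    def run(self):
        return self.execution_name


class RunTimeName:
    """Hypothetical stand-in for a task that resolves its name inside run."""

    def run(self, execution_name=None):
        # Evaluated on every call (analogous to each flow run).
        return execution_name or str(uuid.uuid4())


init_task = InitTimeName()
run_task = RunTimeName()

# Collect the names produced by three consecutive "runs" of each object.
init_names = {init_task.run() for _ in range(3)}
run_names = {run_task.run() for _ in range(3)}

print(len(init_names))  # 1: the same name is reused on every run
print(len(run_names))   # 3: a fresh name per run
```

With a real flow the same idea would presumably mean generating the uuid in an upstream task and passing it to run as an argument, rather than calling uuid.uuid4() while the flow is being built.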
map cuz of the self usage. I would honestly just use the snippet above
Mathijs Miermans
12/13/2021, 11:07 PM
Kevin Kho
12/13/2021, 11:08 PM
Mathijs Miermans
12/14/2021, 4:41 PM
Kevin Kho
12/14/2021, 4:43 PM
self also. You got it so quickly
Mathijs Miermans
12/14/2021, 4:45 PM