
Santiago Gonzalez

09/13/2021, 6:52 PM
Hi. I have an issue in a flow. It runs a task that executes several commands on an EC2 instance. Once that finishes, the flow passes through a Manual Check and then runs another set of commands on the same EC2 instance. The first step took about 1h 43min, and by the time the final step is reached, the state of the previous steps (the instance_id and the commands to be executed) has been lost. NOTE: the instance ID is the output of a task that creates the EC2 instance. Do you have any clue why a task's state could be lost somehow?

Kevin Kho

09/13/2021, 6:59 PM
Hey @Santiago Gonzalez, what is the Manual Check here? Are you using the Prefect PAUSED state? What do you mean by lost? It doesn’t show up in the UI?

Santiago Gonzalez

09/13/2021, 7:13 PM
It is a task with a manual trigger (that is what I meant by manual check). I have a logger that traces the commands being executed, and in that case it logs None. So I assume that the output is being lost.

Kevin Kho

09/13/2021, 7:14 PM
Could you show me a bit of your task and Flow code so I can get a better picture?

Santiago Gonzalez

09/13/2021, 7:43 PM
from typing import List

import boto3
from prefect import Flow, task
from prefect.triggers import manual_only


@task
def create_ec2_instance():
    # Call and arguments are placeholders, elided in the original post.
    instances = boto3.create_ec2_instance('..........')
    instance = instances[0]
    # ec2_client is defined elsewhere in the poster's code.
    waiter = ec2_client.get_waiter('instance_status_ok')
    waiter.wait(InstanceIds=[instance.instance_id])
    return instance.instance_id


@task
def create_commands(warmup: bool):
    return ['something to execute', "something else", "something more"]


@task
def execute_job_in_ec2_instance(instance_id, commands: List):
    # get_boto_client is a helper defined elsewhere in the poster's code.
    ssm = get_boto_client('ssm', region_name='us-west-2', use_session=True)
    command_invocation = ssm.send_command(InstanceIds=[instance_id],....., commands=commands)


@task(name="Manual check", trigger=manual_only)
def manual_check():
    return


with Flow('flow') as flow:
    # ..... (elided in the original post)
    warmup_commands = create_commands(True)
    full_commands = create_commands(False)

    instance_id = create_ec2_instance()

    warm_up_execution = execute_job_in_ec2_instance(instance_id=instance_id,
                                                    commands=warmup_commands)

    manual_check = manual_check()
    manual_check.set_upstream(warm_up_execution)

    full_execution = execute_job_in_ec2_instance(instance_id=instance_id,
                                                 commands=full_commands)
    full_execution.set_upstream(warm_up_execution)
    full_execution.set_upstream(manual_check)

Kevin Kho

09/13/2021, 7:54 PM
I see. Will ask the team about this

Santiago Gonzalez

09/13/2021, 8:01 PM
Thanks