Brian Keating
04/09/2021, 5:45 AM
[…] `ResourceManager` to create and terminate EC2 instances. I test it out with a bare-bones workflow:
@task
def do_something_on_instance(instance_id):
    """Log a placeholder message naming the EC2 instance this task would use.

    Args:
        instance_id: value interpolated into the log message; in the flow it
            is whatever the ``EC2Instance`` resource manager yields.
    """
    # The logger is looked up from the Prefect context when the task runs.
    logger = prefect.context.get('logger')
    logger.info(f'Do something on instance {instance_id}')
# Minimal flow: run one task inside the scope of an EC2Instance resource manager.
with Flow('hello-ec2') as flow:
    ec2_manager = EC2Instance('t2.micro')
    with ec2_manager as instance_id:
        # The task receives what the manager's setup returns (the instance id, a str).
        do_something_on_instance(instance_id)
This works correctly when using GitHub storage, but when I switch to S3, the flow fails with `TypeError: cannot pickle 'SSLContext' object`. Does anyone know what's going on here? Note that the value returned by `EC2Instance.setup` is a `str`.
Kevin Kho
Kevin Kho
Brian Keating
04/09/2021, 3:27 PM
import boto3
import prefect
from prefect import resource_manager
# AWS region passed as ``region_name`` wherever a boto3 EC2 resource/client is built.
AWS_REGION = 'us-west-2'
@resource_manager
class EC2Instance:
    """Prefect resource manager that launches one EC2 instance and terminates it.

    ``setup`` creates the instance, waits until it passes its status checks and
    returns the instance id. ``cleanup`` terminates that instance and waits for
    the termination to complete.

    Only plain data (``instance_type`` and ``instance_id``) is stored on the
    instance. The logger and the boto3 resource/client are exposed as
    properties that build the object on access instead of being stored in
    ``__init__``, because an instance that holds boto3 objects cannot be
    pickled (the reported failure was
    ``TypeError: cannot pickle 'SSLContext' object``).
    NOTE(review): presumably Prefect pickles this object as a task result when
    a result backend such as S3 is configured -- confirm for the Prefect
    version in use.
    """

    def __init__(self, instance_type):
        """Remember the requested instance type; no AWS calls are made here.

        Args:
            instance_type: EC2 instance type passed to ``create_instances``
                (e.g. ``'t2.micro'``).
        """
        self.instance_type = instance_type
        self.instance_id = None  # filled in by setup()

    @property
    def log(self):
        """Return the ``info`` method of the logger in the current Prefect context."""
        return prefect.context.get('logger').info

    @property
    def ec2_resource(self):
        """Return a new boto3 EC2 resource for ``AWS_REGION`` (never stored on self)."""
        return boto3.resource('ec2', region_name=AWS_REGION)

    @property
    def ec2_client(self):
        """Return a new boto3 EC2 client for ``AWS_REGION`` (never stored on self)."""
        return boto3.client('ec2', region_name=AWS_REGION)

    def setup(self):
        """Launch the instance, wait for it to pass status checks, return its id.

        Returns:
            str: id of the newly created EC2 instance.

        Raises:
            Exception: whatever boto3 raises while creating or waiting for the
                instance. If waiting fails, the instance is terminated before
                the error is re-raised.
        """
        log = self.log
        client = self.ec2_client  # one client reused for every call in this method

        datetime_str = prefect.context['date'].strftime('%Y-%m-%d_%H:%M')  # TODO: convert to SD time
        flow_name = prefect.context["flow_name"]
        flow_run_name = f'prefect-{flow_name}-{datetime_str}'

        log(f'Launching a new {self.instance_type} instance')
        instances = self.ec2_resource.create_instances(
            ImageId='ami-0ca5c3bd5a268e7db',  # ubuntu 20 in us-west-2
            InstanceType=self.instance_type,
            MinCount=1,
            MaxCount=1,
            TagSpecifications=[{
                'ResourceType': 'instance',
                'Tags': [{'Key': 'Name', 'Value': flow_run_name}],
            }],
            # BlockDeviceMappings can be added here to specify disk size/speed;
            # not all disks work for all instance types.
        )
        self.instance_id = instances[0].instance_id

        log(f'Waiting for {self.instance_id} instance to initialize')
        try:
            client.get_waiter('instance_status_ok').wait(InstanceIds=[self.instance_id])
        except Exception:
            # The instance already exists at this point. NOTE(review): Prefect's
            # resource manager is documented to run cleanup only after a
            # successful setup, so terminate here to avoid an orphaned instance.
            log(f'{self.instance_id} failed to initialize; terminating it')
            client.terminate_instances(InstanceIds=[self.instance_id])
            raise
        log(f'{self.instance_id} is ready!')
        return self.instance_id

    def cleanup(self, instance_id):
        """Terminate the instance and block until AWS reports it terminated.

        Args:
            instance_id: id of the instance to terminate (the value returned
                by ``setup``).
        """
        log = self.log
        client = self.ec2_client

        log(f'Terminating {instance_id} instance')
        client.terminate_instances(InstanceIds=[instance_id])
        log(f'Waiting for instance {instance_id} to terminate')
        client.get_waiter('instance_terminated').wait(InstanceIds=[instance_id])
        log(f'Instance {instance_id} terminated')
Kevin Kho
[… return] `self` in `setup` and then just grab the `instance_id` by doing `self.instance_id`?
Brian Keating
04/09/2021, 3:36 PM
Kevin Kho
[…] `instance_id` in the flow and task?
Kevin Kho
Brian Keating
04/09/2021, 3:53 PM
[…] `EC2Instance.setup` to return `self` instead of `self.instance_id`, and the flow now looks like:
@task
def do_something_on_instance(instance):
    """Log a placeholder message naming the EC2 instance this task would use.

    Args:
        instance: object exposing an ``instance_id`` attribute, which is
            interpolated into the log message.
    """
    # The logger is looked up from the Prefect context when the task runs.
    logger = prefect.context.get('logger')
    logger.info(f'Do something on instance {instance.instance_id}')
# Minimal flow: run one task inside the scope of an EC2Instance resource manager.
with Flow('hello-ec2') as flow:
    ec2_manager = EC2Instance('t2.micro')
    with ec2_manager as instance:
        # The task reads ``instance.instance_id`` from whatever the manager yields.
        do_something_on_instance(instance)