I've written a `ResourceManager` to create and ter...
# ask-community
b
I've written a `ResourceManager` to create and terminate EC2 instances. I test it out with a bare-bones workflow:
import prefect
from prefect import Flow, task

@task
def do_something_on_instance(instance_id):
    prefect.context.get('logger').info(f'Do something on instance {instance_id}')

with Flow('hello-ec2') as flow:
    with EC2Instance('t2.micro') as instance_id:
        do_something_on_instance(instance_id)  # instance_id is a string
This works correctly when using GitHub storage, but when I switch to S3, the flow fails with `TypeError: cannot pickle 'SSLContext' object`. Anyone know what's going on here? Note that the value returned by `EC2Instance.setup` is a `str`.
k
Hi @Brian Keating! Can you share the code for the `EC2Instance` context manager? I think it works with GitHub storage because that is script-based storage, as opposed to pickle-based: https://docs.prefect.io/orchestration/flow_config/storage.html#script-based-storage
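A minimal standard-library sketch of the kind of failure the error message describes. It assumes (not confirmed in this thread) that some object being pickled holds an `ssl.SSLContext`, as a network client presumably would. The `resource_state` dict and the instance-id string are hypothetical stand-ins, not Prefect or boto3 code.

```python
import pickle
import ssl


def try_pickle(obj):
    """Return None if obj pickles cleanly, else the TypeError that was raised."""
    try:
        pickle.dumps(obj)
    except TypeError as exc:
        return exc
    return None


# A plain string (like an instance id) pickles without trouble.
plain_result = try_pickle("some-instance-id")

# Hypothetical stand-in for the attributes of an object that keeps a
# client around; the SSLContext plays the role of the client's internals.
resource_state = {
    "instance_type": "t2.micro",
    "client": ssl.create_default_context(),
}
client_result = try_pickle(resource_state)

print(plain_result)
print(type(client_result).__name__)
```

If this is what is happening, the string returned by `setup` would not be the culprit; something else that holds a client would be.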
b
import boto3
import prefect
from prefect import resource_manager

AWS_REGION = 'us-west-2'


@resource_manager
class EC2Instance:
    def __init__(self, instance_type):
        self.log = prefect.context.get('logger').info
        self.instance_type = instance_type
        self.instance_id = None
        self.ec2_resource = boto3.resource('ec2', region_name=AWS_REGION)
        self.ec2_client = boto3.client('ec2', region_name=AWS_REGION)
        
    def setup(self):
        datetime_str = prefect.context['date'].strftime('%Y-%m-%d_%H:%M')  # TODO: convert to SD time
        flow_name = prefect.context["flow_name"]
        flow_run_name = f'prefect-{flow_name}-{datetime_str}'
        
        self.log(f'Launching a new {self.instance_type} instance')
        instances = self.ec2_resource.create_instances(
            ImageId='ami-0ca5c3bd5a268e7db',  # ubuntu 20 in us-west-2
            InstanceType=self.instance_type,
            MinCount=1,
            MaxCount=1,
            TagSpecifications=[{
                'ResourceType': 'instance',
                'Tags': [{'Key': 'Name', 'Value': flow_run_name}],
            }],
            # BlockDeviceMappings=[...],  # specify disk size/speed. not all disks work for all instance types
        )
        self.instance_id = instances[0].instance_id
        self.log(f'Waiting for {self.instance_id} instance to initialize')
        self.ec2_client.get_waiter('instance_status_ok').wait(InstanceIds=[self.instance_id])
        self.log(f'{self.instance_id} is ready!')
        return self.instance_id

    def cleanup(self, instance_id):
        self.log(f'Terminating {instance_id} instance')
        self.ec2_client.terminate_instances(InstanceIds=[instance_id])
        self.log(f'Waiting for instance {instance_id} to terminate')
        self.ec2_client.get_waiter('instance_terminated').wait(InstanceIds=[instance_id])
        self.log(f'Instance {instance_id} terminated')
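One possible workaround, sketched here with the standard library only and not confirmed anywhere in this thread: build the client lazily and drop it from the pickled state, so the object itself stays picklable. `LazyClientHolder` is a hypothetical class, and the `ssl.SSLContext` stands in for something like a boto3 client; whether this resolves the Prefect error above is an assumption.

```python
import pickle
import ssl


class LazyClientHolder:
    """Hypothetical holder that builds its client on first use, not in __init__."""

    def __init__(self, instance_type):
        self.instance_type = instance_type
        self._client = None  # created lazily on first access

    @property
    def client(self):
        if self._client is None:
            # Stand-in for an unpicklable client such as boto3.client(...)
            self._client = ssl.create_default_context()
        return self._client

    def __getstate__(self):
        # Copy the instance dict and drop the unpicklable client,
        # so only plain attributes are serialized.
        state = self.__dict__.copy()
        state["_client"] = None
        return state


holder = LazyClientHolder("t2.micro")
_ = holder.client  # force creation of the unpicklable member

# Round-trip through pickle; the client is rebuilt on next access.
restored = pickle.loads(pickle.dumps(holder))
print(restored.instance_type)
```

A simpler variant of the same idea would be to create the clients inside `setup` and `cleanup` as local variables instead of storing them on `self`.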
k
Can you return `self` in `setup` and then just grab the `instance_id` by doing `self.instance_id`?
b
no luck, same error
k
Are you still using the `instance_id` in the flow and task?
This thread has a more detailed snippet that might help you: https://prefect-community.slack.com/archives/CL09KU1K7/p1617976513307700
b
I changed `EC2Instance.setup` to return `self` instead of `self.instance_id`, and the flow now looks like:
@task
def do_something_on_instance(instance):
    prefect.context.get('logger').info(f'Do something on instance {instance.instance_id}')


with Flow('hello-ec2') as flow:
    with EC2Instance('t2.micro') as instance:
        do_something_on_instance(instance)