Brett Jurman
07/13/2021, 3:42 PM
Kevin Kho
Brett Jurman
07/13/2021, 3:45 PM
import prefect
from prefect import task, Flow
import time
import random
import numpy as np
from sklearn.linear_model import LinearRegression

@task
def pick_random_sizes():
    time.sleep(2 * 60)
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud!")
    # randint expects integer bounds, so use ints rather than 1e4 / 1e6 floats
    return [random.randint(10_000, 1_000_000) for _ in range(10)]

@task
def train_model(size):
    m = LinearRegression()
    data = np.random.random((10000, 2))
    targets = np.random.random((10000, 1))
    m.fit(data, targets)
    time.sleep(60)
    # return the fitted model so eval_models receives it instead of None
    return m

@task
def eval_models(ms):
    for m in ms:
        print(m)
with Flow("full-flow") as flow:
    sizes = pick_random_sizes()
    models = train_model.map(sizes)
    eval_models(models)

# Register the flow under the "tester" project
flow.register(project_name="tester")
Brett Jurman
07/13/2021, 3:45 PM
Brett Jurman
07/13/2021, 3:45 PM
import coiled
from prefect.executors import DaskExecutor

executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={
        "software": "my-software-env",
        "shutdown_on_close": True,
        "name": "prefect-executor",
    },
)
flow.run(
    executor=executor,
)
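Editor's note: as the thread goes on to say, flow.run is meant for local testing, so an executor passed to it only applies to that local run. A minimal sketch of the alternative is to attach the executor to the flow itself before registering, so that agent-started runs can use the same Coiled settings. This assumes a Prefect release (0.14 or later, including 1.x) where Flow exposes an executor attribute and DaskExecutor lives in prefect.executors. The helper names coiled_cluster_kwargs and attach_coiled_executor are hypothetical, not part of Prefect or Coiled.

```python
def coiled_cluster_kwargs(software_env, cluster_name):
    """Build the keyword arguments forwarded to coiled.Cluster.

    Mirrors the values used in the message above; the function name is
    a hypothetical helper for illustration.
    """
    return {
        "software": software_env,
        "shutdown_on_close": True,
        "name": cluster_name,
    }


def attach_coiled_executor(flow, software_env="my-software-env",
                           cluster_name="prefect-executor"):
    """Set flow.executor so registered runs use a Coiled-backed DaskExecutor."""
    # Imported lazily so this module still loads where prefect/coiled
    # are not installed (assumption: both are present at call time).
    import coiled
    from prefect.executors import DaskExecutor

    flow.executor = DaskExecutor(
        cluster_class=coiled.Cluster,
        cluster_kwargs=coiled_cluster_kwargs(software_env, cluster_name),
    )
    return flow
```

Calling attach_coiled_executor(flow) before flow.register(project_name="tester") would be one way to use it. The machine that executes the flow run still needs Coiled credentials.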
Brett Jurman
07/13/2021, 3:45 PM
Brett Jurman
07/13/2021, 3:46 PM
Kevin Kho
flow.run won’t be on the UI. It’s meant for local testing. Flow runs of registered flows will be.
Brett Jurman
07/13/2021, 3:49 PM
Brett Jurman
07/13/2021, 3:49 PM
Brett Jurman
07/13/2021, 3:49 PM
Kevin Kho
Brett Jurman
07/13/2021, 3:50 PM
Brett Jurman
07/13/2021, 3:50 PM
Brett Jurman
07/13/2021, 3:50 PM
Brett Jurman
07/13/2021, 3:50 PM
Brett Jurman
07/13/2021, 3:53 PM
Brett Jurman
07/13/2021, 3:53 PM
Kevin Kho
client.create_flow_run. All flow runs of registered flows will be tracked, just not runs started with flow.run. Runs created with client.create_flow_run will be, though. The agent will need the Coiled key to spin up the executor. You need an agent running to run flows, so it’s that agent that will be authenticated to spin up the Coiled executor. And not a problem!
Brett Jurman
07/13/2021, 3:58 PM
Brett Jurman
07/13/2021, 4:00 PM
Brett Jurman
07/13/2021, 4:00 PM
Brett Jurman
07/13/2021, 4:01 PM
Kevin Kho
flow.register() gives you an id that you can pass to the client. No, you can’t spin up the executor without an agent. The agent is a long-running process that listens for flow runs and then starts them, so something has to be running that listens for them. The Coiled cluster is ephemeral and only runs for the duration of the flow. The agent can be started with something like prefect agent local start. The agent is very lightweight (assuming all your flows outsource the compute), so a lot of users just use a minimal EC2 instance to host the agent that kicks off the flows.
Brett Jurman
07/13/2021, 4:05 PM
Brett Jurman
07/13/2021, 4:05 PM
Brett Jurman
07/13/2021, 4:05 PM
Brett Jurman
07/13/2021, 4:05 PM
Brett Jurman
07/13/2021, 4:06 PM
Brett Jurman
07/13/2021, 4:06 PM
docker      Manage Prefect Docker agents.
ecs         Manage Prefect ECS agents.
fargate     Manage Prefect Fargate agents (DEPRECATED).
kubernetes  Manage Prefect Kubernetes agents.
local       Manage Prefect Local agents.
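Editor's note: the steps Kevin describes (register the flow, take the returned id, hand it to the client) can be sketched as below. It assumes Prefect 0.14+/1.x, a backend you are already authenticated against, and an agent already running (for example one started with prefect agent local start), since the agent is what picks up the run. The function name register_and_run and its optional client parameter are hypothetical; the parameter exists only so the logic can be exercised with a stand-in client.

```python
def register_and_run(flow, project_name="tester", client=None):
    """Register a flow, then ask the backend to create a run of it.

    Returns the flow run id. A running agent is still needed for the
    run to actually start.
    """
    # flow.register returns the id of the registered flow.
    flow_id = flow.register(project_name=project_name)

    if client is None:
        # Imported lazily; assumes prefect is installed and configured.
        from prefect import Client
        client = Client()

    # Unlike flow.run, runs created this way are tracked in the UI.
    return client.create_flow_run(flow_id=flow_id)
```

With the flow from earlier in the thread, register_and_run(flow, project_name="tester") would register it and queue a single run for the agent to pick up.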
Brett Jurman
07/13/2021, 4:06 PM
Kevin Kho
Brett Jurman
07/13/2021, 4:07 PM
Brett Jurman
07/13/2021, 4:07 PM
Brett Jurman
07/13/2021, 4:07 PM
Kevin Kho