Raviraja Ganta
05/04/2022, 8:51 PMAnna Geller
05/04/2022, 9:00 PMRaviraja Ganta
05/04/2022, 9:03 PMAnna Geller
05/04/2022, 9:04 PMRaviraja Ganta
05/04/2022, 9:06 PMfrom clusters import cluster_creation_flow
from prefect.run_configs import DockerRun
from prefect.storage import S3
storage = S3(bucket="taxonomy-flows")
# Add both Flows to storage
storage.add_flow(cluster_creation_flow)
storage.build()
cpu_run_config = DockerRun(
image="<http://134123414323.dkr.ecr.us-east-2.amazonaws.com/taxonomy-flows:latest|134123414323.dkr.ecr.us-east-2.amazonaws.com/taxonomy-flows:latest>",
labels=["cpu"],
)
# Reassign the new storage object to each Flow
cluster_creation_flow.storage = storage
cluster_creation_flow.run_config = cpu_run_config
# Register each flow without building a second time
cluster_creation_flow.register(project_name="test_project", build=False)
Anna Geller
05/04/2022, 9:09 PMimport platform
import prefect
from prefect import Flow, Parameter, task
from prefect.client.secrets import Secret
from prefect.storage import S3
from prefect.run_configs import DockerRun
import subprocess
PREFECT_PROJECT_NAME = "community"
FLOW_NAME = "s3_docker_run_local_image"
AGENT_LABEL = "docker"
AWS_ACCOUNT_ID = Secret("AWS_ACCOUNT_ID").get()
STORAGE = S3(
bucket="prefectdata",
key=f"flows/{FLOW_NAME}.py",
stored_as_script=True,
# this will ensure to upload the Flow script to S3 during registration
local_script_path=f"{FLOW_NAME}.py",
)
RUN_CONFIG = DockerRun(image="<http://134123414323.dkr.ecr.us-east-2.amazonaws.com/taxonomy-flows:latest|134123414323.dkr.ecr.us-east-2.amazonaws.com/taxonomy-flows:latest>", labels=[AGENT_LABEL],)
@task(log_stdout=True)
def hello_world(x: str):
print(f"Hello {x} from {FLOW_NAME}!")
print(
f"Running this task with Prefect: {prefect.__version__} and Python {platform.python_version()}"
)
with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG,) as flow:
user_input = Parameter("user_input", default="Marvin")
hw = hello_world(user_input)
if __name__ == "__main__":
subprocess.run(
f"prefect register --project {PREFECT_PROJECT_NAME} -p flows/s3_docker_run_local_image.py",
shell=True,
)
subprocess.run(
f"prefect run --name {FLOW_NAME} --project {PREFECT_PROJECT_NAME}", shell=True
)
subprocess.run(
f'prefect agent docker start --label {AGENT_LABEL} --volume ~/.aws:/root/.aws"',
shell=True,
)
Raviraja Ganta
05/04/2022, 9:14 PMyou don't need to build storage explicitly as it gets built when you register your flowyeah.. i was keeping build as false during registration. can remove that. it is working locally. problem is i want to run the same (registration) in github actions. But the flow is depending on other packages. when ran local it has the packages so it is able to register, but when ran in github actions it is failing. Now I am building the packages using docker image. If somehow we can run the prefect registration code inside that already built dependency container then it is solved. Otherwise I have to build all the packages again on default github container and then register the flows which is a long process.
Anna Geller
05/04/2022, 9:28 PMRaviraja Ganta
05/04/2022, 9:31 PMAnna Geller
05/05/2022, 10:48 AM