Saranya Elumalai
09/01/2020, 2:00 PM
start = Parameter("start")
start.set_downstream(s3_connection, flow=flow)
s3_connection.set_downstream(lin_of_bus, flow=flow)
s3_connection.set_downstream(company, flow=flow)
....
end_task = final_task()
end_task.set_upstream(lin_of_bus,flow=flow)
end_task.set_upstream(company, flow=flow)
flow.run(parameters={"start": "hello"}, executor=DaskExecutor())
The schematic diagram below shows that the tasks are executed sequentially, not in parallel.
Jim Crist-Harif
09/01/2020, 4:27 PM
Saranya Elumalai
09/01/2020, 7:06 PM
Jim Crist-Harif
09/01/2020, 10:43 PM
cluster_kwargs={"silence_logs": 10} passed to DaskExecutor and report back with the logs (you may want to glance through them to remove anything you'd consider private).
Saranya Elumalai
09/02/2020, 3:15 AM
flow.run(parameters={"start": "hello"}, executor=DaskExecutor(cluster_kwargs={"silence_logs": 10}))
but it still runs sequentially
Screen Shot 2020-09-01 at 9.07.59 PM
Jim Crist-Harif
09/02/2020, 3:15 PM
Saranya Elumalai
09/02/2020, 3:32 PM
Jim Crist-Harif
09/02/2020, 3:41 PM
flow.run (you can tell, since the logs have prefect.CloudFlowRunner in them). I assume you've registered the flow elsewhere. To configure the executor to use with a cloud/server flow run, you need to specify the executor as part of the environment. Try doing
flow.environment.executor = DaskExecutor()
before registering your flow.
Saranya Elumalai
09/02/2020, 3:50 PM
flow.environment.executor = DaskExecutor()
as env, will it make all flow to run parallel?
2. I have this line in my code. Does CloudFlowRunner override flow.run? Is there a way to make flow.run take precedence?
flow.run(parameters={"start": "hello"}, executor=DaskExecutor(cluster_kwargs={"silence_logs": 10}))
Jim Crist-Harif
09/02/2020, 3:53 PM
flow.run is for running locally; the logs you've posted are from a cloud run. Generally you have two options when running prefect flows:
1. Write a script that builds a flow, then calls flow.run(...) at the end. Run the script as python yourflow.py. This will use the configuration in flow.run.
2. Write a script that builds a flow, configures the flow's environment, then calls flow.register at the end. Run the script as python yourflow.py; this will register your flow. You can then schedule cloud/server flow runs via cloud/server.
See https://docs.prefect.io/orchestration/tutorial/first.html for more information.
"will it make all flow to run parallel?"
For this flow, yes. Note that environment configuration only affects cloud/server flow runs, while passing executor to flow.run only affects flow.run execution.
Saranya Elumalai
09/02/2020, 4:02 PM
Jim Crist-Harif
09/02/2020, 4:06 PM
flow.environment.executor will only affect that flow.
Saranya Elumalai
09/02/2020, 4:54 PM
Jim Crist-Harif
09/02/2020, 4:56 PM
flow.environment.executor is lost. I see you re-add it on line 66, but you do that after the call to register; you need to do that before the call to register. You can then remove setting the executor in app.py.
Saranya Elumalai
09/02/2020, 5:44 PM
Jim Crist-Harif
09/02/2020, 6:02 PM
build=False you're not rebuilding your flow, so the environment config isn't actually stored in your docker image. You'll want to:
• drop storage.build()
• drop the line setting build=False
Alternatively, move the line where you're configuring your executor to after line 42 so it happens before you call storage.build.
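The ordering point above can be illustrated without Prefect. This is only a minimal sketch, assuming (as the explanation above implies) that building the storage takes a serialized snapshot of the flow as it exists at that moment. The dictionary toy_flow and the helper build_snapshot are hypothetical stand-ins, not Prefect APIs, and pickle stands in for whatever serializer the storage actually uses.

```python
import pickle

# Hypothetical stand-in for a flow object; this is not a Prefect class.
toy_flow = {"name": "etl", "environment": {"executor": None}}


def build_snapshot(flow):
    """Stand-in for a storage build: freeze the flow's current state as bytes."""
    return pickle.dumps(flow)


# Wrong order: take the snapshot first, configure the executor afterwards.
stale_image = build_snapshot(toy_flow)
toy_flow["environment"]["executor"] = "DaskExecutor"

# Right order: the executor is already configured when the snapshot is taken.
fresh_image = build_snapshot(toy_flow)

stale = pickle.loads(stale_image)
fresh = pickle.loads(fresh_image)
print(stale["environment"]["executor"])  # None
print(fresh["environment"]["executor"])  # DaskExecutor
```

Anything changed on the in-memory object after the snapshot is invisible to whoever later loads the snapshot, which is why the executor has to be set before the build or register step.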
Saranya Elumalai
09/02/2020, 8:06 PM
Jim Crist-Harif
09/02/2020, 8:16 PM
from prefect.environments.storage import Docker
from prefect.environments import KubernetesJobEnvironment
import sys
import os
import yaml
from prefect.engine.executors import DaskExecutor


def build_prefect_flow_image():
    """
    Builds the prefect flow by adding the kubernetes job environment that is specified in the values
    folder for the s3 service account helm templates. It then stores the flow in a docker image.
    It then registers the flow with prefect cloud.
    :return:
    """
    IMAGE_NAME = os.environ["IMAGE_NAME"]
    ECR_BASE_URL = os.environ["ECR_BASE_URL"]
    IMAGE_TAG = os.environ["TAG"]
    PREFECT_LABELS = os.getenv("PREFECT_LABELS", "")
    KUBERNETES_FILE = os.path.join("values", os.environ["K8S_FILE_NAME"])
    # Set the path to be the folder structure that holds the "app" and "values" folder
    sys.path.append(os.getcwd())
    # Set the environment variables to what is in the values folder. If we don't do this prefect is an unhappy technology
    # I'm not sure this is the best way to do it, so we may want to consider if we should do it a different way
    with open("values/stg.yaml") as file:
        stg = yaml.load(file, Loader=yaml.FullLoader)
        if 'envVars' in stg:
            for var in stg['envVars']:
                os.environ[var] = stg['envVars'][var]
    # Assumes you're running in a directory with an app.py file that exports a flow
    # Import the flow
    from app.app import flow
    # We want to set prefect to create a job with a kubernetes environment that we have set up. This means that the
    # prefect job that is usually created will create another job with the given kubernetes file
    # The kubernetes job file is created in the CICD pipeline. It holds the environment variables, service account,
    # docker image, flow, etc
    flow.environment = KubernetesJobEnvironment(job_spec_file=KUBERNETES_FILE, executor=DaskExecutor())
    flow.environment.labels.update(flow.storage.labels)
    flow.environment.labels.update(PREFECT_LABELS.split(","))
    # Our prefect flow and kubernetes info will be stored in a docker image that we host in AWS
    flow.storage = Docker(
        registry_url=ECR_BASE_URL,
        dockerfile="Dockerfile",
        image_name=IMAGE_NAME,
        image_tag=IMAGE_TAG,
        env_vars=dict(JOB_NAME=os.environ["JOB_NAME"], ENV=os.environ["ENV"]),
    )
    flow.register(project_name="fp-aws")


if __name__ == "__main__":
    build_prefect_flow_image()
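One detail in the script above may be worth a second look: when PREFECT_LABELS is unset, the default "" is split into [""], so an empty-string label ends up in the environment's labels. A minimal sketch of a guard follows; parse_labels is a hypothetical helper and not part of Prefect.

```python
def parse_labels(raw):
    """Split a comma-separated label string, dropping blanks and stray whitespace."""
    return [label.strip() for label in raw.split(",") if label.strip()]


print("".split(","))              # ['']
print(parse_labels(""))           # []
print(parse_labels("k8s, stg,"))  # ['k8s', 'stg']
```

In the script, the result of parse_labels(PREFECT_LABELS) could be passed to the second labels.update call in place of PREFECT_LABELS.split(",").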
Saranya Elumalai
09/02/2020, 8:29 PM
  File "/builds/heb-engineering/teams/data-solutions-engineering/foundational-projects-team/heb-dsol-foundational-projects/fp-etl-aws/scripts/prefect/build.py", line 35, in build_prefect_flow_image
    flow.environment = KubernetesJobEnvironment(job_spec_file=KUBERNETES_FILE, executor=DaskExecutor())
TypeError: __init__() got an unexpected keyword argument 'executor'
ERROR: Job failed: command terminated with exit code 1
KubernetesJobEnvironment's __init__ has an executor argument. Why am I getting that error?
Jim Crist-Harif
09/02/2020, 8:32 PM
$ prefect version
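An "unexpected keyword argument" error like the one above typically means the code that is actually running does not define that parameter, which can happen when the build environment has a different package release installed than the one being read about. A minimal sketch of checking this from Python follows; accepts_keyword is a hypothetical helper, and OldEnvironment and NewEnvironment are toy classes standing in for two releases of a library class, not Prefect classes.

```python
import inspect


def accepts_keyword(func, name):
    """Return True if `func` can be called with keyword argument `name`."""
    params = inspect.signature(func).parameters
    if name in params and params[name].kind in (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    ):
        return True
    # A **kwargs catch-all accepts any keyword.
    return any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())


class OldEnvironment:
    """Toy class: an older release without the `executor` parameter."""

    def __init__(self, job_spec_file=None):
        self.job_spec_file = job_spec_file


class NewEnvironment:
    """Toy class: a newer release that accepts `executor`."""

    def __init__(self, job_spec_file=None, executor=None):
        self.job_spec_file = job_spec_file
        self.executor = executor


print(accepts_keyword(OldEnvironment, "executor"))  # False
print(accepts_keyword(NewEnvironment, "executor"))  # True
```

Against a real install, the same check could be pointed at KubernetesJobEnvironment, and the installed release can be read with importlib.metadata.version("prefect") on Python 3.8+ or with the prefect version command shown above.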
Saranya Elumalai
09/02/2020, 8:37 PM
Jim Crist-Harif
09/02/2020, 8:41 PM
Saranya Elumalai
09/02/2020, 8:43 PM
  File "/usr/lib/python3.7/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/builds/heb-engineering/teams/data-solutions-engineering/foundational-projects-team/heb-dsol-foundational-projects/fp-etl-aws/scripts/prefect/build.py", line 70, in <module>
    build_prefect_flow_image()
  File "/builds/heb-engineering/teams/data-solutions-engineering/foundational-projects-team/heb-dsol-foundational-projects/fp-etl-aws/scripts/prefect/build.py", line 66, in build_prefect_flow_image
    flow.register(project_name="fp-aws")
Jim Crist-Harif
09/08/2020, 2:24 PM