Saranya Elumalai
09/01/2020, 2:00 PM
start = Parameter("start")
start.set_downstream(s3_connection, flow=flow)
s3_connection.set_downstream(lin_of_bus, flow=flow)
s3_connection.set_downstream(company, flow=flow)
....
end_task = final_task()
end_task.set_upstream(lin_of_bus, flow=flow)
end_task.set_upstream(company, flow=flow)
flow.run(parameters={"start": "hello"}, executor=DaskExecutor())
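The calls above describe a small DAG in which lin_of_bus and company both depend only on s3_connection, so nothing forces them to run one after the other. As a rough stdlib analogy (not how Prefect or Dask schedule work internally), this sketch uses graphlib.TopologicalSorter (Python 3.9+) with a thread pool: every task whose upstreams have finished is submitted at once, so the two branches become ready in the same "wave" and may overlap. run_task is a hypothetical placeholder, and the tasks hidden behind "...." are left out.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter  # Python 3.9+

# Upstream dependencies of each task, mirroring the set_downstream /
# set_upstream calls in the snippet (tasks elided with "...." are left out).
DEPENDENCIES = {
    "s3_connection": {"start"},
    "lin_of_bus": {"s3_connection"},
    "company": {"s3_connection"},
    "end_task": {"lin_of_bus", "company"},
}


def run_task(name):
    # Hypothetical placeholder for a real task body.
    return name


def run_dag(dependencies, max_workers=4):
    """Submit each task as soon as all of its upstreams have finished.

    Returns the list of "waves": the sets of tasks that became ready together.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    waves = []
    running = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            ready = sorter.get_ready()
            if ready:
                waves.append(set(ready))
            for name in ready:
                running[pool.submit(run_task, name)] = name
            # Block until at least one in-flight task finishes.
            done, _ = wait(running, return_when=FIRST_COMPLETED)
            for future in done:
                future.result()  # re-raise any exception from the task
                sorter.done(running.pop(future))
    return waves


if __name__ == "__main__":
    for i, wave in enumerate(run_dag(DEPENDENCIES), start=1):
        print(i, sorted(wave))
```

Running it prints four waves, with company and lin_of_bus together in the third. A sequential executor would walk the same graph but run those two one at a time.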
The schematic diagram below shows that the tasks are executed sequentially, not in parallel.
Jim Crist-Harif
09/01/2020, 4:27 PM
Saranya Elumalai
09/01/2020, 7:06 PM
Jim Crist-Harif
09/01/2020, 10:43 PM
cluster_kwargs={"silence_logs": 10} passed to DaskExecutor and report back with the logs (you may want to glance through them to remove anything you'd consider private).
Saranya Elumalai
09/02/2020, 3:15 AM
flow.run(parameters={"start": "hello"}, executor=DaskExecutor(cluster_kwargs={"silence_logs": 10}))
but it still runs sequentially.
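For reference, silence_logs is a dask.distributed.LocalCluster argument that sets a log level, and 10 is the numeric value of logging.DEBUG, so the value above asks for debug-level output. Spelling it with the logging constant makes that explicit. A minimal sketch, assuming (as in Prefect 0.13) that DaskExecutor forwards cluster_kwargs to the LocalCluster it starts when no cluster address is given; the Prefect call is shown only as a comment because it needs Prefect installed.

```python
import logging

# silence_logs takes a logging level; logging.DEBUG is 10,
# which is exactly the value used in the thread.
cluster_kwargs = {"silence_logs": logging.DEBUG}

# Assumed Prefect 0.13 usage (not executed here):
#   from prefect.engine.executors import DaskExecutor
#   flow.run(parameters={"start": "hello"},
#            executor=DaskExecutor(cluster_kwargs=cluster_kwargs))

if __name__ == "__main__":
    print(cluster_kwargs)  # {'silence_logs': 10}
```

As the rest of the thread shows, an executor passed to flow.run only applies to local flow.run calls, not to Cloud/Server runs.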
Saranya Elumalai
09/02/2020, 2:11 PM
Jim Crist-Harif
09/02/2020, 3:15 PM
Saranya Elumalai
09/02/2020, 3:32 PM
Jim Crist-Harif
09/02/2020, 3:41 PM
flow.run (you can tell, since the logs have prefect.CloudFlowRunner in them). I assume you've registered the flow elsewhere. To configure the executor for a cloud/server flow run, you need to specify it as part of the flow's environment. Try doing
flow.environment.executor = DaskExecutor()
before registering your flow.
Saranya Elumalai
09/02/2020, 3:50 PM
flow.environment.executor = DaskExecutor()
as env, will it make all flows run in parallel?
2. I have this line in my code. Does CloudFlowRunner override flow.run? Is there a way to make flow.run take precedence?
flow.run(parameters={"start": "hello"}, executor=DaskExecutor(cluster_kwargs={"silence_logs": 10}))
Jim Crist-Harif
09/02/2020, 3:53 PM
flow.run is for running locally; the logs you've posted are from a cloud run. Generally, you have two options when running Prefect flows:
1. Write a script that builds a flow, then calls flow.run(...) at the end. Run the script as python yourflow.py. This will use the configuration passed to flow.run.
2. Write a script that builds a flow, configures the flow's environment, then calls flow.register at the end. Run the script as python yourflow.py; this will register your flow. You can then schedule cloud/server flow runs via cloud/server.
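The ordering in option 2 matters because registering (and, with Docker storage, building the image) captures the flow as it is at that moment; settings applied afterwards never reach the stored copy, which is the problem that comes up later in this thread. This sketch models that with a JSON snapshot as a stand-in for Prefect's own serialization. SketchFlow, register and load are hypothetical names for illustration, not Prefect APIs.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class SketchFlow:
    # Hypothetical stand-in for a flow; only the executor setting is modeled.
    name: str
    executor: str = "LocalExecutor"


def register(flow):
    """Analogy for flow.register / storage.build: snapshot the flow as it is now."""
    return json.dumps(asdict(flow))


def load(snapshot):
    """What a later flow run would see: the stored copy, not the live object."""
    return SketchFlow(**json.loads(snapshot))


flow = SketchFlow("fp-etl")

# Wrong order: register first, configure afterwards.
stored_too_early = register(flow)
flow.executor = "DaskExecutor"

# Right order: the executor is already configured when register() runs.
stored_after_config = register(flow)

if __name__ == "__main__":
    print(load(stored_too_early).executor)     # LocalExecutor
    print(load(stored_after_config).executor)  # DaskExecutor
```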
See https://docs.prefect.io/orchestration/tutorial/first.html for more information.
Jim Crist-Harif
09/02/2020, 3:54 PM
"will it make all flow to run parallel?"
For this flow, yes. Note that environment configuration only affects cloud/server flow runs, while passing executor to flow.run only affects flow.run execution.
Saranya Elumalai
09/02/2020, 4:02 PM
Jim Crist-Harif
09/02/2020, 4:06 PM
flow.environment.executor will only affect that flow.
Saranya Elumalai
09/02/2020, 4:54 PM
Jim Crist-Harif
09/02/2020, 4:56 PM
flow.environment.executor is lost. I see you re-add it on line 66, but you do that after the call to register; you need to do it before the call to register. You can then remove the line setting the executor in app.py.
Saranya Elumalai
09/02/2020, 5:44 PM
Jim Crist-Harif
09/02/2020, 6:02 PM
build=False you're not rebuilding your flow, so the environment config isn't actually stored in your Docker image. You'll want to:
• drop storage.build()
• drop the line setting build=False
Alternatively, move the line where you're configuring your executor to after line 42 so it happens before you call storage.build.
Saranya Elumalai
09/02/2020, 8:06 PM
Jim Crist-Harif
09/02/2020, 8:16 PM
from prefect.environments.storage import Docker
from prefect.environments import KubernetesJobEnvironment
import sys
import os
import yaml
from prefect.engine.executors import DaskExecutor


def build_prefect_flow_image():
    """
    Builds the prefect flow by adding the kubernetes job environment that is specified in the values
    folder for the s3 service account helm templates. It then stores the flow in a docker image.
    It then registers the flow with prefect cloud.
    :return:
    """
    IMAGE_NAME = os.environ["IMAGE_NAME"]
    ECR_BASE_URL = os.environ["ECR_BASE_URL"]
    IMAGE_TAG = os.environ["TAG"]
    PREFECT_LABELS = os.getenv("PREFECT_LABELS", "")
    KUBERNETES_FILE = os.path.join("values", os.environ["K8S_FILE_NAME"])
    # Set the path to be the folder structure that holds the "app" and "values" folder
    sys.path.append(os.getcwd())
    # Set the environment variables to what is in the values folder. If we don't do this prefect is an unhappy technology
    # I'm not sure this is the best way to do it, so we may want to consider if we should do it a different way
    with open("values/stg.yaml") as file:
        stg = yaml.load(file, Loader=yaml.FullLoader)
    if 'envVars' in stg:
        for var in stg['envVars']:
            os.environ[var] = stg['envVars'][var]
    # Assumes you're running in a directory with an app.py file that exports a flow
    # Import the flow
    from app.app import flow
    # We want to set prefect to create a job with a kubernetes environment that we have set up. This means that the
    # prefect job that is usually created will create another job with the given kubernetes file
    # The kubernetes job file is created in the CICD pipeline. It holds the environment variables, service account,
    # docker image, flow, etc
    flow.environment = KubernetesJobEnvironment(job_spec_file=KUBERNETES_FILE, executor=DaskExecutor())
    flow.environment.labels.update(flow.storage.labels)
    flow.environment.labels.update(PREFECT_LABELS.split(","))
    # Our prefect flow and kubernetes info will be stored in a docker image that we host in AWS
    flow.storage = Docker(
        registry_url=ECR_BASE_URL,
        dockerfile="Dockerfile",
        image_name=IMAGE_NAME,
        image_tag=IMAGE_TAG,
        env_vars=dict(JOB_NAME=os.environ["JOB_NAME"], ENV=os.environ["ENV"]),
    )
    flow.register(project_name="fp-aws")


if __name__ == "__main__":
    build_prefect_flow_image()
Saranya Elumalai
09/02/2020, 8:29 PM
  File "/builds/heb-engineering/teams/data-solutions-engineering/foundational-projects-team/heb-dsol-foundational-projects/fp-etl-aws/scripts/prefect/build.py", line 35, in build_prefect_flow_image
    flow.environment = KubernetesJobEnvironment(job_spec_file=KUBERNETES_FILE, executor=DaskExecutor())
TypeError: __init__() got an unexpected keyword argument 'executor'
ERROR: Job failed: command terminated with exit code 1
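A TypeError like this can mean the installed library is a different version from the documentation being read, which is why checking the installed version (prefect version) is a natural next step. Independently of any particular library, inspect.signature can confirm whether a callable accepts a given keyword. OldEnvironment and NewEnvironment are hypothetical stand-ins for an older and a newer release of the same class; in practice you would pass the real class, for example KubernetesJobEnvironment.

```python
import inspect


def accepts_kwarg(func, name):
    """Return True if the signature of `func` allows keyword argument `name`."""
    for param in inspect.signature(func).parameters.values():
        if param.kind is inspect.Parameter.VAR_KEYWORD:
            return True  # a **kwargs parameter accepts any keyword
        if param.name == name and param.kind in (
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
            inspect.Parameter.KEYWORD_ONLY,
        ):
            return True
    return False


# Hypothetical stand-ins for an older and a newer version of the same class.
class OldEnvironment:
    def __init__(self, job_spec_file):
        self.job_spec_file = job_spec_file


class NewEnvironment:
    def __init__(self, job_spec_file, executor=None):
        self.job_spec_file = job_spec_file
        self.executor = executor


if __name__ == "__main__":
    print(accepts_kwarg(OldEnvironment, "executor"))  # False
    print(accepts_kwarg(NewEnvironment, "executor"))  # True
```

This only inspects the signature: a constructor that takes **kwargs and rejects unknown keys later would still report True.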
KubernetesJobEnvironment's __init__ has an executor argument. Why am I getting that error?
Jim Crist-Harif
09/02/2020, 8:32 PM
Jim Crist-Harif
09/02/2020, 8:33 PM
$ prefect version
Saranya Elumalai
09/02/2020, 8:37 PM
Jim Crist-Harif
09/02/2020, 8:41 PM
Saranya Elumalai
09/02/2020, 8:43 PM
Saranya Elumalai
09/02/2020, 10:21 PM
  File "/usr/lib/python3.7/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/builds/heb-engineering/teams/data-solutions-engineering/foundational-projects-team/heb-dsol-foundational-projects/fp-etl-aws/scripts/prefect/build.py", line 70, in <module>
    build_prefect_flow_image()
  File "/builds/heb-engineering/teams/data-solutions-engineering/foundational-projects-team/heb-dsol-foundational-projects/fp-etl-aws/scripts/prefect/build.py", line 66, in build_prefect_flow_image
    flow.register(project_name="fp-aws")
Saranya Elumalai
09/03/2020, 3:55 PM
Jim Crist-Harif
09/08/2020, 2:24 PM