Troy Sankey04/29/2020, 5:45 PM
one instead). Also, more fundamentally, the Prefect code is hard-coded to use the following string for the container args: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/environments/execution/k8s/job.py#L276
"python -c 'import prefect; prefect.Flow.load(prefect.context.flow_file_path).environment.run_flow()'"
but actually the job spec ends up using a different string!
$ kubectl --namespace=prefect get job prefect-job-e4b101a5 -o yaml | grep -A1 args
- args:
  - prefect execute cloud-flow
I checked the version of Prefect on my laptop (which I use to deploy the Prefect flow) and on the Prefect agent image deployed to my cluster. It's all 0.10.x.
josh04/29/2020, 5:53 PM
that you’re looking at is the initial Prefect job that the agent creates, not the job created by the K8sJobEnvironment. When the agent finds a flow run, it creates a job using the image you have set as your Flow’s storage. That job acts as a sort of init container where your Flow is loaded, inspected, and executed. In this case it looks at your flow’s K8sJobEnvironment and creates a new job to execute your flow on. Then the original Prefect job completes and the newly created job runs the flow.
So there are two jobs: the first running prefect execute cloud-flow to execute the flow’s environment, and the second running the environment’s function to actually run the flow.
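For anyone inspecting the cluster later: the two jobs can be told apart by their container args, using the two strings quoted earlier in this thread. Below is a minimal sketch, assuming the job has been dumped with -o json and loaded into a dict. The names classify_job and container_args and the sample dict are hypothetical illustrations, not part of Prefect.

```python
# Hypothetical helpers for illustration only; not part of Prefect or kubectl.
AGENT_ARGS = "prefect execute cloud-flow"      # args seen on the agent-created job
ENV_ARGS_MARKER = "environment.run_flow()"     # tail of the K8sJobEnvironment args string


def container_args(job):
    """Collect the args of every container in a Kubernetes Job dict."""
    containers = job["spec"]["template"]["spec"]["containers"]
    return [arg for c in containers for arg in (c.get("args") or [])]


def classify_job(job):
    """Guess which of the two jobs this is, based only on its container args."""
    args = container_args(job)
    if any(AGENT_ARGS in a for a in args):
        return "agent job"
    if any(ENV_ARGS_MARKER in a for a in args):
        return "environment job"
    return "unknown"


# Sample dict shaped like the output of: kubectl get job <name> -o json
sample = {"spec": {"template": {"spec": {"containers": [
    {"args": ["prefect execute cloud-flow"]}
]}}}}
print(classify_job(sample))
```

This only matches on substrings, so it is a rough check for debugging and not a reliable way to identify jobs in general.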
Troy Sankey04/29/2020, 6:21 PM
josh04/29/2020, 6:24 PM
Troy Sankey04/29/2020, 6:25 PM
josh04/29/2020, 6:25 PM
Troy Sankey04/29/2020, 6:26 PM
josh04/29/2020, 6:27 PM
Troy Sankey04/29/2020, 6:30 PM
with Flow("Test credentials", environment=environment, storage=storage) as flow:
    sf_credentials = VaultKVSecret(path="snowflake_pipeline_etl_loader", version=1)
    test_connection(sf_credentials)

flow.environment = RemoteEnvironment(labels=["prod"])
flow.register(project_name="tsankey test project")
josh04/29/2020, 6:31 PM
Troy Sankey04/29/2020, 6:31 PM
josh04/29/2020, 6:51 PM
Troy Sankey04/29/2020, 7:13 PM