Troy Sankey
04/29/2020, 5:45 PM
`default` one instead). Also, more fundamentally, the prefect code is hard-coded to use the following string for the container args: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/environments/execution/k8s/job.py#L276
"python -c 'import prefect; prefect.Flow.load(prefect.context.flow_file_path).environment.run_flow()'"
but actually the job spec ends up using a different string!
$ kubectl --namespace=prefect get job prefect-job-e4b101a5 -o yaml | grep -A1 args
- args:
  - prefect execute cloud-flow
I checked the version of prefect on my laptop (which I use to deploy the prefect flow) and the prefect agent image deployed to my cluster. It's all 0.10.x.
josh
04/29/2020, 5:53 PM
The `prefect-job-xx` that you’re looking at is the initial prefect job that the agent creates and not the job created by the K8sJobEnvironment.
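As a rough illustration of the distinction, the two kinds of job can be told apart by their container args, using the two strings quoted earlier in this thread. This is only a sketch: `classify_job`, `AGENT_COMMAND`, and `ENVIRONMENT_MARKER` are hypothetical names, not part of Prefect, and the function assumes it is given a Job manifest as a dict (for example, parsed from `kubectl get job <name> -o json`).

```python
# Hypothetical helper, not part of Prefect: guess which kind of job a
# Kubernetes Job manifest describes by looking at its container args.

# Command seen in the job spec quoted in this thread.
AGENT_COMMAND = "prefect execute cloud-flow"
# Fragment of the hard-coded string quoted from the environment's job code.
ENVIRONMENT_MARKER = "environment.run_flow()"


def classify_job(job: dict) -> str:
    """Return 'agent job', 'environment job', or 'unknown' for a Job manifest."""
    pod_spec = job.get("spec", {}).get("template", {}).get("spec", {})
    for container in pod_spec.get("containers", []):
        # args may be absent or null in a manifest, so fall back to an empty list
        args = " ".join(container.get("args") or [])
        if AGENT_COMMAND in args:
            return "agent job"
        if ENVIRONMENT_MARKER in args:
            return "environment job"
    return "unknown"
```

Loading the manifest with `json.load` and passing the resulting dict to `classify_job` would be one way to use it.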
When the Agent finds a flow run it creates a job using the image you have set as your Flow’s Docker storage. That job acts as a sort of init container where your Flow is loaded, inspected, and executed. In this case it is looking at your flow’s K8sJobEnvironment and creating a new job to execute your flow on. Then the original prefect job completes and the new job that was created runs the flow.
`prefect execute cloud-flow` to execute the flow’s environment and the second running the environment’s `run_flow()` function to actually run the flow
Troy Sankey
04/29/2020, 6:21 PM
josh
04/29/2020, 6:24 PM
Troy Sankey
04/29/2020, 6:25 PM
josh
04/29/2020, 6:25 PM
`flow.environment`?
Troy Sankey
04/29/2020, 6:26 PM
josh
04/29/2020, 6:27 PM
Troy Sankey
04/29/2020, 6:30 PM
with Flow("Test credentials", environment=environment, storage=storage) as flow:
    sf_credentials = VaultKVSecret(path="snowflake_pipeline_etl_loader", version=1)
    test_connection(sf_credentials)
flow.environment = RemoteEnvironment(labels=["prod"])
flow.register(project_name="tsankey test project")
josh
04/29/2020, 6:31 PM
Troy Sankey
04/29/2020, 6:31 PM
`prefect:prefect`)
josh
04/29/2020, 6:51 PM
Troy Sankey
04/29/2020, 7:13 PM