Adam
08/05/2020, 1:23 PM
Step 10/10 : RUN python /opt/prefect/healthcheck.py '["/opt/prefect/flows/customers.prefect", "/opt/prefect/flows/deleted-customer-nodes.prefect"]' '(3, 8)'
 ---> Running in d022cfb5badd
Beginning health checks...
System Version check: OK
Traceback (most recent call last):
  File "/opt/prefect/healthcheck.py", line 135, in <module>
    flows = cloudpickle_deserialization_check(flow_file_paths)
  File "/opt/prefect/healthcheck.py", line 39, in cloudpickle_deserialization_check
    flows.append(cloudpickle.load(f))
ModuleNotFoundError: No module named 'deleted_customers'
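Note on the traceback above: a ModuleNotFoundError at load time is the typical symptom of by-reference pickling when the defining module is not importable where the pickle is loaded. The following is a minimal sketch of that mechanism, not the healthcheck's own code. It uses the standard-library pickle rather than cloudpickle (which, to my understanding, also pickles objects from importable modules by reference), and demo_local_flows and build_flow are hypothetical names standing in for a local package such as deleted_customers.

```python
import importlib
import pickle
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-in for a local package such as deleted_customers.
MODULE_NAME = "demo_local_flows"

with tempfile.TemporaryDirectory() as tmp:
    # Create a tiny importable module on disk and import it.
    Path(tmp, f"{MODULE_NAME}.py").write_text("def build_flow():\n    return 'flow'\n")
    sys.path.insert(0, tmp)
    importlib.invalidate_caches()
    module = importlib.import_module(MODULE_NAME)

    # The payload records the module name and function name, not the function body.
    payload = pickle.dumps(module.build_flow)

    # Simulate the container: the module is no longer importable.
    sys.path.remove(tmp)
    del sys.modules[MODULE_NAME]

importlib.invalidate_caches()
try:
    pickle.loads(payload)
    error_name = None
except ModuleNotFoundError as exc:
    # Same class of error as in the healthcheck traceback.
    error_name = exc.name

print(error_name)  # prints demo_local_flows
```

If this is what is happening in the image build, the pickle itself is fine; the module it points to simply has to be importable inside the container.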
josh
08/05/2020, 1:24 PM
Is deleted_customers a local module that you have?
Adam
08/05/2020, 1:25 PM
project/
  - customers/
    - flow.py
  - deleted_customers/
    - flow.py
  - build.py
The build.py script then imports it as from deleted_customers.flow import flow as deleted_customers
With only the customers module it works fine; it's only when I add the deleted_customers module that I get an issue. Could it be that it doesn't pickle?
josh
08/05/2020, 1:33 PM
build.py? Also, when you say it doesn't work when adding the deleted_customers module, is that when it's being imported in build.py or customers/flow.py?
Adam
08/05/2020, 1:39 PM
build.py
import uuid
from os import environ, path
import docker
from prefect.environments.storage import Docker
from customers.flow import flow as customers
from deleted_customers.flow import flow as deleted_customers
# This is where we add other flows we've imported. They will all be bundled and deployed together
FLOWS = [customers, deleted_customers]
registry_url = "gcr.io/our-company-3bbf0/company_jobs"
image_tag = uuid.uuid4().hex
tls_config = None
base_url = None
# Special Docker-in-Docker configuration for CircleCI
if environ.get("CI"):
    print("Running on CI")
    tls_config = docker.tls.TLSConfig(
        client_cert=(
            path.join(environ.get("DOCKER_CERT_PATH", ""), "cert.pem"),
            path.join(environ.get("DOCKER_CERT_PATH", ""), "key.pem"),
        ),
        verify=False,
    )
    base_url = environ.get("DOCKER_HOST")
# Configure the storage object
storage = Docker(
    image_name="company_image",
    registry_url=registry_url,
    image_tag=image_tag,
    base_url=base_url,  # required for CircleCI
    tls_config=tls_config,  # required for CircleCI
    python_dependencies=[
        "pandas",
        "prefect[google,kubernetes]",
        "requests",
        "synapsepy",
    ],
)
# Add the flows to the Docker storage
for workflow in FLOWS:
    storage.add_flow(workflow)
# Build the Docker image
storage_ref = storage.build()
# Assign the flow storage to Docker and register
for workflow in FLOWS:
    workflow.storage = storage_ref
    workflow.register(project_name="prefect-test-1", build=False)
josh
08/05/2020, 1:42 PM
Adam
08/05/2020, 1:54 PM
josh
08/05/2020, 1:55 PM
Adam
08/05/2020, 2:00 PM
deleted_customers. I don't really understand the docs on that
josh
08/05/2020, 2:03 PM
cloudpickle.dump(flow) then a cloudpickle.load(…)
Adam
08/05/2020, 2:09 PM
import cloudpickle
from sable_batch.customers import flow as customers_flow
from sable_batch.deleted_customers import flow as deleted_customers_flow
def test_deleted_customers_flow():
    # Serialize the flow, then deserialize the resulting bytes (not the flow object itself)
    pickled_flow = cloudpickle.dumps(deleted_customers_flow)
    unpickled_flow = cloudpickle.loads(pickled_flow)
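One caveat with a same-process round trip like the test above: it can pass locally even if the image build fails, because the local package is importable in the process that runs the test. A rough sketch of a stricter check, assuming it is acceptable to start a second interpreter: load the bytes in a fresh Python process started with -I, which keeps the current directory and PYTHONPATH off sys.path. The helper name load_in_clean_interpreter is hypothetical, and the sketch uses the standard-library pickle; as far as I know, bytes produced by cloudpickle.dumps can be read by pickle.load in the same way.

```python
import pickle
import subprocess
import sys

# Code run by the child interpreter: read a pickle from stdin and load it.
LOADER = "import pickle, sys; pickle.load(sys.stdin.buffer)"


def load_in_clean_interpreter(payload: bytes) -> subprocess.CompletedProcess:
    """Try to unpickle `payload` in a fresh interpreter (hypothetical helper).

    -I runs Python in isolated mode, so the current directory and PYTHONPATH
    are not added to sys.path. Packages installed in site-packages are still
    importable, so this only approximates the container environment.
    """
    return subprocess.run(
        [sys.executable, "-I", "-c", LOADER],
        input=payload,
        capture_output=True,
    )


# Usage: a builtin is importable everywhere, so the child process exits cleanly.
result = load_in_clean_interpreter(pickle.dumps(len))
print(result.returncode)
```

A non-zero return code with ModuleNotFoundError in result.stderr would suggest the pickle refers to a module that only exists in the local project directory.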
josh
08/05/2020, 2:10 PM
Adam
08/05/2020, 2:11 PM
josh
08/05/2020, 2:11 PM
Adam
08/05/2020, 2:12 PM