Azuma
05/20/2020, 7:01 AM
Sandeep Aggarwal
05/20/2020, 1:47 PM
TypeError: Object of type 'Parameter' is not JSON serializable
Below is a sample snippet:
with Flow("sample flow") as sample_flow:
    param = Parameter("task_param")
    FlowRunTask(flow_name="next flow", parameters={"task_param": param})()
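A hedged sketch of one possible fix, assuming Prefect ~0.11 and the prefect.tasks.prefect import path: pass parameters when the task is called inside the flow rather than at init time, so the Parameter is resolved to its runtime value before FlowRunTask serializes it.
from prefect import Flow, Parameter
from prefect.tasks.prefect import FlowRunTask

with Flow("sample flow") as sample_flow:
    param = Parameter("task_param")
    # parameters passed at call time become a task input, so `param` is
    # replaced by its concrete value before the child flow run is created
    FlowRunTask(flow_name="next flow")(parameters={"task_param": param})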
Questionnaire
05/20/2020, 2:11 PM
Using docker-compose and docker files, I added these lines:
flow.run_agent()
c = Client()
c.create_flow_run()
but I am facing this error:
Questionnaire
05/20/2020, 2:11 PM
Traceback (most recent call last):
File "main.py", line 315, in <module>
flow.register()
File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/core/flow.py", line 1429, in register
no_url=no_url,
File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/client/client.py", line 619, in register
serialized_flow = flow.serialize(build=build) # type: Any
File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/core/flow.py", line 1279, in serialize
self.storage.add_flow(self)
File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/environments/storage/local.py", line 100, in add_flow
flow_location = flow.save(flow_location)
File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/core/flow.py", line 1346, in save
cloudpickle.dump(self, f)
File "/home/tanmay/.local/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 1067, in dump
CloudPickler(file, protocol=protocol).dump(obj)
File "/home/tanmay/.local/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 477, in dump
return Pickler.dump(self, obj)
File "/usr/lib/python3.6/pickle.py", line 409, in dump
self.save(obj)
File "/usr/lib/python3.6/pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
save(state)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 876, in save_set
save(item)
File "/usr/lib/python3.6/pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
save(state)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/home/tanmay/.local/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 544, in save_function
return self.save_function_tuple(obj)
File "/home/tanmay/.local/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 756, in save_function_tuple
save(state)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/usr/lib/python3.6/pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
save(state)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/usr/lib/python3.6/pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
save(state)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/usr/lib/python3.6/pickle.py", line 496, in save
rv = reduce(self.proto)
TypeError: can't pickle _thread.lock objects
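A hedged sketch of the usual workaround, assuming the failure comes from an unpicklable object (for example an instantiated Client) being referenced by the flow: flow.register() cloudpickles the Flow, so build such objects inside a task at run time instead of capturing them at build time. Names below are illustrative.
from prefect import Flow, task, Client

@task
def trigger_next_run():
    client = Client()                                    # created at run time, never pickled
    return client.create_flow_run(flow_id="<flow-id>")   # placeholder id

with Flow("main flow") as flow:
    trigger_next_run()

flow.register()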
Darragh
05/20/2020, 3:43 PM
[2020-05-18 17:52:01,285] ERROR - agent | HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql/alpha (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f52470ae0f0>: Failed to establish a new connection: [Errno 111] Connection refused',))
Thanks in advance 🙂
Mark McDonald
05/20/2020, 5:20 PM
create_cloud_hook = f"""
mutation {{
create_cloud_hook(input: {{
type: WEBHOOK,
name: "{cloud_hook_name}",
version_group_id: "{version_group_id}",
states: ["Cached", "Cancelled", "Failed", "Finished", "Looped", "Mapped", "Paused", "Pending", "Queued", "Resume", "Retrying", "Running", "Scheduled", "Skipped", "Submitted", "Success", "Timedout", "Triggerfailed"],
config: {{
url: "{web_hook_url}"
}}
}}) {{
id
}}
}}
"""
something like this:
create_cloud_hook = f"""
mutation {{
create_cloud_hook(input: {{
type: WEBHOOK,
name: "{cloud_hook_name}",
version_group_id: "{version_group_id}",
states: ["All"],
config: {{
url: "{web_hook_url}"
}}
}}) {{
id
}}
}}
"""
tkanas
05/20/2020, 5:37 PM
james.lamb
05/20/2020, 8:54 PM
I want to use the DaskExecutor to speed it up. It's clear from the documentation how to do this if you already have a Dask cluster up and running and just want to use it as an executor for a flow run.
For my use case, I'd like to start the Dask cluster at the beginning of a flow run and stop it at the end of a flow run. Right now I'm running this flow in a standalone Python process (just a script with flow code that ends in flow.run()
), not using an agent talking to Prefect Cloud.
What is the recommended way to get the behavior I want, where the Dask cluster gets started when the flow run starts and stopped when it stops, without using Prefect Cloud?
• Somehow use Dask Cloud Provider Environment or Dask Kubernetes Environment without Prefect Cloud
• Extend DaskExecutor by overriding its setup and teardown to start / stop the cluster
• something else that I'm missing
• it's not possible, use Prefect Cloud
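A hedged sketch of one way to get this without subclassing or Prefect Cloud, assuming dask.distributed is installed and the flow is run from the script: create the cluster around flow.run() and tear it down afterwards. LocalCluster could be swapped for dask_kubernetes.KubeCluster or a dask-cloudprovider cluster.
from dask.distributed import LocalCluster
from prefect.engine.executors import DaskExecutor

cluster = LocalCluster(n_workers=4)                       # started when the flow run starts
try:
    executor = DaskExecutor(address=cluster.scheduler_address)
    state = flow.run(executor=executor)                   # `flow` is the Flow defined in the script
finally:
    cluster.close()                                       # stopped when the flow run ends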
Thanks very much!
Marwan Sarieddine
05/20/2020, 9:37 PM
I am using DaskKubernetesEnvironment and setting imagePullSecrets to pull images from our private GitLab container registry …
Note I am using a Kubernetes Agent polling from prefect cloud and using prefect version 0.11.2.
A couple of questions:
1. I thought that if I created the docker-registry secret using Kubernetes, added imagePullSecrets to the podSpec, and specified a custom scheduler_file and worker_spec file to DaskKubernetesEnvironment - this approach doesn't rely on Prefect secrets and I thought it should work fine - but I am getting an empty dict for imagePullSecrets when inspecting the job and pod specs … I thought the Kubernetes agent might be overwriting them (I saw the replace_job_spec_yaml method), and so I also specified the secret name in the Kubernetes agent manifest under the IMAGE_PULL_SECRETS environment variable - but I am still getting an empty dict for imagePullSecrets. Any idea why?
2. The other approach seems to be not to specify the worker_spec and scheduler file, but to set private_registry to True and docker_secret to the name of the Prefect Secret, then use the client to create the Prefect Secret by setting a name and value. I don't see where in the code the Prefect Secret's value is read and a Kubernetes secret is created (I tried both using a dictionary of docker-server, docker-username, docker-password, and docker-email, and just setting the value to the name of the k8s secret that I created); neither approach worked, and I am getting an empty dict for imagePullSecrets …
Any idea what might be going on here, and what is the best practice for setting k8s imagePullSecrets for DaskKubernetesEnvironment?
Slackbot
05/20/2020, 11:22 PMitay livni
05/21/2020, 1:38 AMAvi A
05/21/2020, 7:34 AM
Is there a way to have a retry_delay that increases with each retry (e.g. to implement an exponential backoff)?
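A hedged sketch of one workaround, assuming signals.RETRY passes start_time through to the Retrying state (which I believe it does in 0.x, but that is an assumption): compute the next start time from the run count to get an exponential backoff. do_work() is a hypothetical flaky call.
import datetime
import pendulum
from prefect import task, context
from prefect.engine import signals

@task(max_retries=5, retry_delay=datetime.timedelta(seconds=1))
def flaky_task():
    try:
        return do_work()                               # hypothetical call that may fail
    except Exception:
        delay = 2 ** context.get("task_run_count", 1)  # seconds, doubles each attempt
        raise signals.RETRY(
            message=f"retrying in {delay}s",
            start_time=pendulum.now("utc").add(seconds=delay),
        )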
Arsenii
05/21/2020, 9:58 AM
base_image
of docker.APIClient
(https://github.com/PrefectHQ/prefect/blob/1447982cc63a1f87c4a19cc7be18f2b2693fb883/src/prefect/environments/storage/docker.py#L508). Any suggestions on how to do this? Thanks!
Questionnaire
05/21/2020, 10:41 AM
config.toml and UI screenshot attached. I'm getting an error; please help.
05/21/2020, 12:17 PM
Is there a way to define regular functions (without the task decorator) and treat them as tasks only inside the flow? i.e.:
def f():
    pass

# regular call
foo = f()

# inside a flow
with Flow("example") as flow:
    foo = task(f)()
Any nicer suggestions?
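One possibly nicer pattern, as a hedged sketch: keep the function plain and build the Task wrapper once at module level, so plain calls and flow code both stay readable.
from prefect import Flow, task

def f():
    pass

f_task = task(f)      # wrap once, reuse across flows

foo = f()             # regular call, Prefect not involved

with Flow("example") as flow:
    foo = f_task()    # used as a task only inside the flow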
Avi A
05/21/2020, 12:44 PM
Matias Godoy
05/21/2020, 1:57 PM
I was using ifelse until now, and it was really hard to read and to explain to my coworkers. Now with case everything is a lot more readable.
Thanks a lot for your excellent work and keep it up! 💪
Geoffrey Gross
05/21/2020, 6:27 PMAlex Welch
05/21/2020, 6:56 PMAlex Welch
05/21/2020, 6:56 PMBen Fogelson
05/21/2020, 7:38 PM
Suppose a Flow run fails. Is there a recommended way to store the state of the failed run so that I could load it up, raise the exception stored in the failed task's result, and step into a debugger?
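A hedged sketch for a local flow.run() (not a Cloud run), assuming the final states pickle cleanly: persist the flow's end state, then load it later, find the failed tasks, and re-raise their stored exceptions under a debugger.
import cloudpickle

state = flow.run()                                 # final State of the whole run

with open("failed_run.pkl", "wb") as f:            # hypothetical path
    cloudpickle.dump(state, f)

# later, e.g. under `python -m pdb`:
with open("failed_run.pkl", "rb") as f:
    state = cloudpickle.load(f)

for task_obj, task_state in state.result.items():  # dict of Task -> State
    if task_state.is_failed():
        raise task_state.result                    # the original exception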
Manuel Mourato
05/21/2020, 7:43 PM
storage = Docker(image_name="flows-storage", image_tag="v1.0", local_image=True, dockerfile="DockerFile-PrefectDependencies")
storage.build()
test_flow.storage = storage
test_flow.register(build=False, labels=['docker-execs-test'])
But when I start a docker agent and run the flow, the run is permanently in Submitted state.
Any idea why this is?
Shreyansh Pandey
05/21/2020, 8:33 PMShreyansh Pandey
05/21/2020, 8:45 PM
• python3 flow.py (where flow.py is the entry-point);
• this, in turn, calls flow.register(), which registers/updates the flow upstream.
If that's the case, what would be the best way to accomplish dynamic configuration of the Prefect API endpoint? (Env var, etc.) I understand that I can use PREFECT__CONTEXT__SECRETS__<NAME> to dynamically serialize secrets.
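A hedged sketch, assuming Prefect ~0.11: the client reads its API endpoint from config/env when prefect is imported, so the same flow.py can target different backends per environment. The hostname and secret name below are hypothetical.
import os

# set before `import prefect` so the config picks them up; normally these would
# be exported in the shell or CI environment that runs `python3 flow.py`
os.environ["PREFECT__CLOUD__API"] = "http://prefect-server.internal:4200"
os.environ["PREFECT__CONTEXT__SECRETS__MY_TOKEN"] = "dummy-value"

import prefect

print(prefect.config.cloud.api)   # -> http://prefect-server.internal:4200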
Noah Nethery
05/21/2020, 9:27 PM
Docker registry secret default-docker does not exist for this tenant.
Creating Docker registry kubernetes secret from "prefect" Prefect Secret.
Failed to load and execute Flow's environment: ClientError([{'message': 'No value found for the requested key', 'locations': [{'line': 2, 'column': 9}], 'path': ['secret_value'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'errors': [{'message': 'No value found for the requested key', 'locations': [], 'path': ['secret_value']}]}}}])
Here is my flow:
from prefect import task, Flow, Parameter
from prefect import Client
from prefect.environments.storage import Docker
from prefect.environments import DaskKubernetesEnvironment
import json
@task(log_stdout=True)
def say_hello():
    print("Hello, K8s!")
    return True

@task
def print_completed(status):
    print('Success')

flow = Flow("Simple-Flow",
            environment=DaskKubernetesEnvironment(min_workers=1, max_workers=3, private_registry=True, docker_secret='prefect'),
            storage=Docker(image_name="hello-prefect", registry_url='my-company-reg.com'))
print_completed.set_upstream(say_hello, flow=flow)
print_completed.bind(status=say_hello, flow=flow)
flow.register(project_name="Hello K8s")
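A hedged sketch of what private_registry=True with docker_secret='prefect' appears to expect: a Prefect Secret named "prefect" registered with Prefect Cloud (not only a pre-existing Kubernetes secret). The registry values are placeholders, and the exact shape of the value is an assumption.
from prefect import Client

Client().set_secret(
    name="prefect",
    value={
        "docker-server": "my-company-reg.com",
        "docker-username": "registry-user",
        "docker-password": "********",
        "docker-email": "user@example.com",
    },
)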
And here is the description of my agent’s pod:
Name: prefect-agent-7fd7ff499-nzvlr
Namespace: default
Priority: 0
PriorityClassName: <none>
Node: **********
Start Time: Thu, 21 May 2020 15:51:33 -0400
Labels: app=prefect-agent
pod-template-hash=7fd7ff499
Annotations: kubernetes.io/psp: eks.privileged
Status: Running
IP: 100.64.16.213
Controlled By: ReplicaSet/prefect-agent-7fd7ff499
Containers:
agent:
Container ID: docker://d6a7fd06ea04adbf956d7a4f9aefdaf421929f1bbaf2b6af4e90a9262095fe4f
Image: prefecthq/prefect:0.11.2-python3.6
Image ID: docker-pullable://prefecthq/prefect@sha256:d0f685016f5f82a373a0b3aeadb4598529e7f31139ca9f585b2077e1f6097c64
Port: <none>
Host Port: <none>
Command:
/bin/bash
-c
Args:
prefect agent start kubernetes
State: Running
Started: Thu, 21 May 2020 15:51:34 -0400
Ready: True
Restart Count: 0
Limits:
cpu: 100m
memory: 128Mi
Requests:
cpu: 100m
memory: 128Mi
Liveness: http-get http://:8080/api/health delay=40s timeout=1s period=40s #success=1 #failure=2
Environment:
PREFECT__CLOUD__AGENT__AUTH_TOKEN: ********
PREFECT__CLOUD__API: https://api.prefect.io
NAMESPACE: default
IMAGE_PULL_SECRETS: prefect
PREFECT__CLOUD__AGENT__LABELS: []
JOB_MEM_REQUEST:
JOB_MEM_LIMIT:
JOB_CPU_REQUEST:
JOB_CPU_LIMIT:
PREFECT__BACKEND: cloud
PREFECT__CLOUD__AGENT__AGENT_ADDRESS: http://:8080
Mounts:
/var/run/secrets/kubernetes.io/serviceaccount from default-token-5nvmj (ro)
Conditions:
Type Status
Initialized True
Ready True
ContainersReady True
PodScheduled True
Volumes:
default-token-5nvmj:
Type: Secret (a volume populated by a Secret)
SecretName: default-token-5nvmj
Optional: false
QoS Class: Guaranteed
Node-Selectors: <none>
Tolerations: node.kubernetes.io/not-ready:NoExecute for 300s
node.kubernetes.io/unreachable:NoExecute for 300s
Events: <none>
I’ve verified that the secret named prefect
pulls down my image from our company's registry.
Marwan Sarieddine
05/21/2020, 10:56 PM
Marked "Failed" by a Zombie Killer process.
- basically I created a simple flow and the flow runs fine when the size of the input data is small, but when I increase the size of the input data (but remain well below the memory limit of the dask worker) - parts of the flow run, but some parts get marked as “failed” by a zombie killer process -
Also the entire flow’s status gets stuck at “running” even though the task has been marked as “failed”
I am sharing my flow's code and dask-worker spec under this issue (https://github.com/PrefectHQ/prefect/issues/1954) to give a concrete example of the failure.
Manuel Mourato
05/22/2020, 1:51 PM
prefect agent start docker -l docker-execs-test
If I try to set the network to host however, like below, the flow is permanently in "Submitted" state.
prefect agent start docker -l docker-execs-test --network host
Any ideas on why this might be?
Darragh
05/22/2020, 1:56 PMAndor Tóth
05/22/2020, 2:11 PMAndor Tóth
05/22/2020, 2:11 PMAndor Tóth
05/22/2020, 2:11 PM