Saoirse Amarteifio
06/20/2020, 9:07 PM

Vikram Iyer
06/29/2020, 5:09 PM
1. flow.register()  # this will register the flow and hence I can see it on the UI
2. From the UI, I can go to the particular flow and run it. # easy
How do I do something similar, with the only difference being that the service and code reside inside a Docker container?
End notes:
I might be looking at it from a completely wrong perspective, please correct my understanding if necessary.
Cheers!
Vikram
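
A minimal sketch of this pattern, assuming Prefect 0.x with Docker storage and a Docker agent; the registry URL and project name are placeholders. Registration builds and pushes an image containing the flow's code and stores only metadata with the server; a Docker agent then pulls the image and runs the flow inside the container:

from prefect import task, Flow
from prefect.environments.storage import Docker


@task
def hello():
    print("hello from inside the container")


with Flow("dockerized-flow") as flow:
    hello()

flow.storage = Docker(registry_url="registry.example.com/team")  # placeholder registry
flow.register(project_name="my-project")  # placeholder project; run it from the UI as usual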

Rob Harrop
06/30/2020, 11:11 AM
Traceback (most recent call last):
File "flow.py", line 38, in <module>
main()
File "flow.py", line 36, in main
flow.register(**register_params)
File "/Users/rdh/Library/Caches/pypoetry/virtualenvs/pipelines-3RvCBXzu-py3.8/lib/python3.8/site-packages/prefect/core/flow.py", line 1479, in register
registered_flow = client.register(
File "/Users/rdh/Library/Caches/pypoetry/virtualenvs/pipelines-3RvCBXzu-py3.8/lib/python3.8/site-packages/prefect/client/client.py", line 687, in register
res = self.graphql(
File "/Users/rdh/Library/Caches/pypoetry/virtualenvs/pipelines-3RvCBXzu-py3.8/lib/python3.8/site-packages/prefect/client/client.py", line 226, in graphql
raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'path': ['create_flow_from_compressed_string'], 'message': '[{\'extensions\': {\'path\': \'$.variableValues\', \'code\': \'validation-failed\'}, \'message\': "no such type exists in the schema: \'flow_group_insert_input\'"}]', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]

bruno.corucho
07/01/2020, 2:30 PM
1. prefect backend server  # switch to local server
2. prefect server start
3. python register_flow.py  (imports * + flow.storage = Docker(...) + flow.register(name))  # in the future would like to set up registry_url = my_aws_ECR
4. prefect agent start docker  # start a Docker agent
Currently following this documentation.

Mac Gréco Péralte Chéry
07/03/2020, 8:00 PM
Unexpected error while running flow: AttributeError("'Flow' object has no attribute 'slugs'",)
Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 240, in run
parameters=parameters,
File "/home/ubuntu/.local/lib/python3.6/site-packages/prefect/engine/cloud/flow_runner.py", line 277, in initialize_run
tasks = {slug: t for t, slug in self.flow.slugs.items()}
AttributeError: 'Flow' object has no attribute 'slugs'
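
A hedged aside, not the thread's confirmed fix: Flow.slugs was added partway through the 0.x series, so this AttributeError typically indicates a version skew between the client that registered the flow and the agent/runner executing it. A quick check to run on both hosts:

import prefect

# Compare the output on the registering machine and on the agent host;
# the usual fix is upgrading whichever side is older so they match.
print(prefect.__version__)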

Mac Gréco Péralte Chéry
07/04/2020, 1:38 AM

Julian
07/07/2020, 7:45 AM

Austin Sharp
07/07/2020, 9:31 PM
Result Handler check: OK
Flow: http://localhost:8080/flow/724c4570-3a00-4362-9df5-27ea72975538
However, the flow doesn't appear on the UI I already have up. If I follow the link, it takes me to a UI but I don't see the flow.
I think the issue is that I am using a Linux machine that I am SSH'd into. Does anyone have any tips? I have tried changing some of my user config settings in .prefect/backend.toml. I suspect I need to change some of the 'localhost' entries to '0.0.0.0', but I'm not sure which ones, and when I change them all, the server throws errors on startup.
Any ideas?
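
A hedged sketch for this situation; the key name assumes the Prefect Server 0.x config layout (the standard file is ~/.prefect/config.toml), and the IP is a placeholder. The UI served on port 8080 tells the browser to query the GraphQL API at graphql_url, which defaults to localhost, so a browser on another machine ends up querying itself. Either tunnel both ports over SSH (ssh -L 8080:localhost:8080 -L 4200:localhost:4200 user@remote-host) or point the UI at an address the browser can reach:

# ~/.prefect/config.toml on the machine running `prefect server start`
[server]

  [server.ui]
  # placeholder IP; use the address your browser can actually reach
  graphql_url = "http://192.0.2.10:4200/graphql"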

bruno.corucho
07/08/2020, 9:57 AM
strdata.storage = Docker(
    registry_url=os.environ["REGISTRY_URL"],  # I want the flow name appended in here
    dockerfile="deploy/Dockerfile",
    image_tag=os.environ["CI_COMMIT_TAG"],
    image_name=strdata.name,
)

# registers the flow to the server/prod
strdata.register(project_name=os.environ["PROJECT_NAME"])  # but the log tells me there is no repository name = project_name in our registry_url
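
A hedged note on the error: Docker storage pushes to registry_url/image_name:image_tag, and ECR does not auto-create repositories on push, so a repository named after the flow (the image_name above) must exist before registering. A sketch that pre-creates it with boto3, assuming the strdata flow from the snippet:

import boto3

ecr = boto3.client("ecr")
try:
    # The repository name must match the image_name passed to Docker storage.
    ecr.create_repository(repositoryName=strdata.name)
except ecr.exceptions.RepositoryAlreadyExistsException:
    pass  # already provisioned on a previous run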

Robin
07/12/2020, 7:05 PM
Do the Developer and Team plans only allow 1 concurrent flow to run?
Is that also the case when self-managing Prefect (local API)?
This would be a dealbreaker for us, I guess.
Cheers
Robin

Robin
07/12/2020, 10:56 PM
Got DaskKubernetesEnvironment up and running, but it does not look very parallelized 🤔
Here is the code:
import prefect
from prefect import task, Flow
from prefect.environments.storage import Docker
from prefect.environments import DaskKubernetesEnvironment, LocalEnvironment
import time
import requests
# from prefect.tasks import kubernetes
from prefect.engine.executors import DaskExecutor


@task
def hello_task():
    time.sleep(10)
    logger = prefect.context.get("logger")
    logger.info("Hello, Kubernetes!")


@task
def get_value():
    time.sleep(10)
    return 5


@task
def output_value(value):
    print(value)


@task
def fetch_google():
    r = requests.get("http://www.google.com")
    print(r.content)


with Flow(
    "hello-k8s",
    environment=DaskKubernetesEnvironment(min_workers=1, max_workers=5, labels=["yogasev"]),
) as flow:
    value = get_value()
    output_value(value)
    fetch_google()
    hello_task()
    hello_task()
    hello_task()
    hello_task()
    hello_task()

flow.storage = Docker(registry_url="<my-repo>")
flow.register(project_name="eks_test_01")
Did we miss anything?
On kubectl it looked parallelized (5 dask pods created in parallel), but Prefect's visualization suggests otherwise… 🤷‍♂️
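
A hedged aside, not from the thread: the five hello_task calls are independent, so a working Dask executor should overlap them. If the UI's timeline still looks serial, a mapped task makes parallelism easier to observe, since all map children are eligible to run at once on separate Dask workers. A minimal sketch:

import time

from prefect import task, Flow


@task
def slow_identity(x):
    time.sleep(10)  # five of these finishing in ~10s wall time implies parallel execution
    return x


with Flow("parallel-check") as check_flow:
    slow_identity.map([1, 2, 3, 4, 5])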

Severin Ryberg [sevberg]
07/13/2020, 1:10 AM

Bivu Raj
07/14/2020, 6:10 PM

EdCode
07/16/2020, 5:06 PM

Bivu Raj
07/16/2020, 6:21 PM

Mac Gréco Péralte Chéry
07/16/2020, 8:09 PM
def notify_on_success(flow: Flow, old_state: State, new_state: State) -> State:
    if new_state.is_successful():
        getSmsTextTask = flow.get_tasks(name="Retrieve SMS Text")[0]
        smsMessageLocation = new_state.result[getSmsTextTask]._result.location
        smsMessages3Result = flow.result.read(smsMessageLocation)
        smsMessage = smsMessages3Result.value
        scheduledSmsIdTask = flow.get_tasks(name="scheduleSmsId")[0]
        scheduleSmsId = new_state.result[scheduledSmsIdTask]._result.value
        link = "https://myurl/" + scheduleSmsId
        mailText = build_sms_success_email_notification_text(sms_text=smsMessage, call_to_action_link=link)
        send_email(["myemail"], message=mailText, subject="SMS SENDING COMPLETE")
    return
(I have to mention that scheduleSmsId is a Parameter)
This result handler works fine on my local machine, but once I test it on my EC2 instance I get this error:
Exception raised while calling state handlers: KeyError(<Task: Retrieve SMS Text>,)
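
A hedged sketch, not the thread's confirmed resolution: on a remote run the new_state.result dict may not hold an entry for every task object, so the bare new_state.result[getSmsTextTask] lookup raises KeyError. A guarded variant of the handler's first lookup degrades gracefully instead:

def notify_on_success(flow, old_state, new_state):
    if new_state.is_successful():
        getSmsTextTask = flow.get_tasks(name="Retrieve SMS Text")[0]
        task_state = new_state.result.get(getSmsTextTask)  # None instead of KeyError when absent
        if task_state is None:
            return  # result not available in this context; skip the notification
        # ... proceed with the original email logic ...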

Greg Desmarais
07/17/2020, 1:56 PM
prefect server start. Any thoughts on that, @prefect-people?
Now, customers can use Docker Desktop and Docker Compose to deploy containers on Amazon Elastic Container Service (ECS) using the AWS Fargate launch type. After customers build and test containers locally using Docker Desktop and Docker Compose, they can now deploy them to Fargate on ECS through the same CLI.

Greg Desmarais
07/17/2020, 9:50 PM

Greg Desmarais
07/18/2020, 3:24 AM
In prefect.server.cli.server, there is a click option for start, but none for stop. Do I have to docker kill all the processes?

Greg Desmarais
07/18/2020, 3:44 PM
hasura_1 | {"type":"http-log","timestamp":"2020-07-18T15:43:04.599+0000","level":"info","detail":{"operation":{"query_execution_time":3.814376e-3,"user_vars":{"x-hasura-role":"admin"},"request_id":"c6784bfe-20f8-4863-99da-1fd1a69c9eb2","response_size":98,"request_read_time":4.539e-6},"http_info":{"status":200,"http_version":"HTTP/1.1","url":"/v1alpha1/graphql","ip":"172.29.0.6","method":"POST","content_encoding":"gzip"}}}
I have my log level at the default (info) in my config.toml, and these seem to be debug level. I could raise the log level higher, but I want to see info-level messages. These messages appear over and over and pollute my logs...

Greg Desmarais
07/20/2020, 5:59 AM

Greg Desmarais
07/20/2020, 6:19 AM

CJ
07/20/2020, 2:08 PM
prefect agent start --volume /path/to/local/.config.toml:/root/.prefect/config.toml docker
Then, when I registered and ran my flow, it would have access to variables I had set in my local .config.toml, especially secrets. After upgrading, I noticed this was no longer the case. I would start the agent in the same way, but context.secrets would be empty, and the values returned from prefect config look like the factory defaults (e.g. use_local_secrets=false).
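
A hedged workaround sketch, not CJ's confirmed fix: Prefect 0.x also reads configuration from environment variables, with double underscores standing in for nested TOML sections, so values can reach the process without the mounted file (the Docker agent's --env flag passes variables through to flow-run containers). MY_SECRET is a placeholder:

import os

# Must be set before `import prefect`, which reads config at import time.
os.environ["PREFECT__CLOUD__USE_LOCAL_SECRETS"] = "true"    # [cloud] use_local_secrets = true
os.environ["PREFECT__CONTEXT__SECRETS__MY_SECRET"] = "..."  # [context.secrets] MY_SECRET = "..."

import prefect  # noqa: E402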

CJ
07/20/2020, 2:09 PM

CJ
07/20/2020, 2:10 PM

CJ
07/20/2020, 2:33 PM
With the --volume argument, Prefect finds my config file.

Greg Desmarais
07/20/2020, 3:40 PM
Completed flow run submission. No Fargate cluster is ever created. What needs to happen on the agent (or prefect server?) to see that the next steps are taken (e.g. cluster created, work started)?

Greg Desmarais
07/21/2020, 2:08 PM
prefecthq/prefect:all_extras is running Python 3.8 - this seems to require all clients to move to 3.8, as cloudpickle might be sensitive to the version difference. Am I right on that? Is the position of the Prefect team that everything needs to be on py 3.8? I might not have rtfm'd enough...
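
A hedged sketch: cloudpickle payloads are indeed sensitive to the Python minor version, so an alternative to moving every client to 3.8 is pinning the execution image to the client's version via Docker storage's base_image parameter. The registry and tag below are placeholders:

from prefect.environments.storage import Docker

storage = Docker(
    registry_url="registry.example.com/team",         # placeholder registry
    base_image="prefecthq/prefect:0.12.5-python3.7",  # placeholder tag matching the client's Python
)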

Burkhard Hoeckendorf
07/21/2020, 8:13 PM

Greg Desmarais
07/23/2020, 3:31 PM
TypeError: can not serialize 'Variable' object

Greg Desmarais
07/23/2020, 3:31 PM
TypeError: can not serialize 'Variable' object
==> prefect_logs/dask_worker.log <==
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 25, in dumps
    small_header, small_payload = dumps_msgpack(msg)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 163, in dumps_msgpack
    payload = msgpack.dumps(msg, use_bin_type=True)
  File "/opt/conda/lib/python3.7/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 286, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 283, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Variable' object
distributed.comm.utils - INFO - Unserializable Message: {'op': 'variable_get', 'timeout': 0, 'name': <distributed.variable.Variable object at 0x7fec9eafaef0>, 'client': 'Client-worker-365168fe-ccf9-11ea-800b-0242ac110004', 'reply': True}
distributed.comm.utils - ERROR - can not serialize 'Variable' object
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 35, in _to_frames
    msg, serializers=serializers, on_error=on_error, context=context
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 25, in dumps
    small_header, small_payload = dumps_msgpack(msg)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 163, in dumps_msgpack
    payload = msgpack.dumps(msg, use_bin_type=True)
  File "/opt/conda/lib/python3.7/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 286, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 283, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Variable' object

Dylan
07/23/2020, 5:12 PM

Luke
07/23/2020, 7:35 PM
Unserializable Message: {'op': 'variable_get', 'timeout': 0, 'name': <distributed.variable.Variable object at 0x7fec9eafaef0>, 'client': 'Client-worker-365168fe-ccf9-11ea-800b-0242ac110004', 'reply': True}
That part reveals that you are trying to send a non-serializable object, distributed.variable.Variable, as JSON.
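
A hedged illustration of Luke's point: a distributed.Variable instance is a handle tied to one process, so it cannot be shipped inside a message; what is shareable is its name, from which each process rebuilds its own handle. The scheduler address is a placeholder:

from distributed import Client, Variable

client = Client("tcp://scheduler.example.com:8786")  # placeholder scheduler address
Variable("my-var", client=client).set(42)            # producer publishes by name
value = Variable("my-var", client=client).get()      # consumer rebuilds the handle from the name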

Greg Desmarais
07/23/2020, 7:42 PM
from prefect import task, Flow, Client
from prefect.environments import LocalEnvironment
from prefect.environments.storage import S3
from prefect.engine.executors import DaskExecutor


@task
def say_hello():
    return 'done'


with Flow("Dask ECS Test 2") as flow:
    say_hello()

bucket = 'celsius-temp-data'
key = 'datasciences/prefect_flows/dask_ecs_flow_test_2'
flow.storage = S3(bucket, key=key)
labels = ['size-small']
executor = DaskExecutor(address=f'{PREFECT_COMPOSE_HOST}:38786')  # PREFECT_COMPOSE_HOST is defined elsewhere in this setup
flow.environment = LocalEnvironment(executor=executor, labels=labels)
flow_id = flow.register(labels=labels)

p_client = Client()
ret = p_client.create_flow_run(flow_id=flow_id)
print(f'Created flow run: {ret}')

Dylan
07/23/2020, 7:51 PM

Greg Desmarais
07/23/2020, 7:55 PM

Dylan
07/23/2020, 7:56 PM

Greg Desmarais
07/23/2020, 7:58 PM

Dylan
07/23/2020, 8:38 PM