Darragh
07/08/2020, 12:13 PM
Kevin Weiler
07/08/2020, 3:02 PM
Kevin Weiler
07/08/2020, 3:25 PM
Unexpected error: AttributeError("'FunctionTask' object has no attribute 'result_handler'")
Traceback (most recent call last):
  File "/opt/miniconda3/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/opt/miniconda3/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 447, in get_flow_run_state
    executor=executor,
  File "/opt/miniconda3/lib/python3.7/site-packages/prefect/engine/executors/local.py", line 24, in submit
    return fn(*args, **kwargs)
  File "/opt/miniconda3/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 569, in run_task
    default_handler = task.result_handler or self.flow.result_handler
AttributeError: 'FunctionTask' object has no attribute 'result_handler'
Ankit
07/09/2020, 7:52 AM
prefect ui+server as well. My question is: prefect server requires docker and docker-compose, as mentioned in the docs. So I can't run it inside the docker container for prefect, right? How would I go about doing this then? Any help would be appreciated.
TIA
Florian L
07/09/2020, 9:09 AM
Sandeep Aggarwal
07/09/2020, 9:22 AM
FlowRunTask task?
Basically, I am sending a context parameter while creating a flow run using the Python client's create_flow_run method. On completion, the flow triggers other downstream flows.
Now, I want the context that I originally sent to be available to the downstream flows, but I haven't had any success so far.
bruno.corucho
07/09/2020, 10:35 AM
response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"services is forbidden: User \"system:serviceaccount:default:default\" cannot list resource \"services\" in API group \"\" in the namespace \"default\"","reason":"Forbidden","details":{"kind":"services"},"code":403}
PS: We can see "successful" response bodies, but should we ignore this last entry, or is it intended?
Thanks in advance.
Darragh
07/09/2020, 12:27 PM
FargateTaskEnvironment. I have a self-hosted Prefect 0.12.2 instance running on AWS and am trying to run a flow against Fargate, but I am getting the following error, which seems to relate only to Cloud-hosted versions. Any ideas?
From fargate_task.py287
container_overrides = [
    {
        "name": "flow-container",
        "environment": [
            {
                "name": "PREFECT__CLOUD__AUTH_TOKEN",
                "value": config.cloud.agent.auth_token
                or config.cloud.auth_token,
            },
            {"name": "PREFECT__CONTEXT__FLOW_RUN_ID", "value": flow_run_id},
            {"name": "PREFECT__CONTEXT__IMAGE", "value": get_flow_image(flow)},
        ],
    }
]
Amit Singh
07/09/2020, 2:26 PM
with Flow('Dummy Cron Flow', schedule=CronSchedule("*/1 * * * *")) as cron_flow:
Michael Ludwig
07/09/2020, 2:32 PM
FargateTaskEnvironment. I think we did everything according to the docs, but when the settings are passed through to AWS to run the flow, we get the following error (on Prefect Cloud):
An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Invalid setting for container 'flow'. At least one of 'memory' or 'memoryReservation' must be specified.
This is our environment:
flow.environment = FargateTaskEnvironment(
    executor=DaskExecutor(),
    labels=[f"reco-{self._config.env}"],
    launch_type="FARGATE",
    region="eu-west-1",
    taskRoleArn="arn:aws:iam::XXXXXXXX:role/prefect-task-role-manual",
    executionRoleArn="arn:aws:iam::XXXXXX:role/prefect-execution-role-manual",
    family="tst_flow",
    cluster="reco-dev-tasks-ecscluster-ECSCluster-13542I9PRAV48",
    networkConfiguration={
        "awsvpcConfiguration": {
            "assignPublicIp": "ENABLED",
            "subnets": [
                "subnet-05fd20a54e43646xx",
                "subnet-04e8a87c3f718dcxx",
                "subnet-0545ba61d9f1b5exx",
            ],
            "securityGroups": ["sg-005d5df0d9d6594xx"],
        }
    },
    cpu="512",
    memory="3072",
    containerDefinitions={
        "name": "flow-container",
        "image": "image",
        "command": [],
        "environment": [],
        "essential": True,
        "memoryReservation": "3072",
        "logConfiguration": {
            "logDriver": "awslogs",
            "options": {
                "awslogs-group": "/ecs/prefect-runs-manual",
                "awslogs-region": "eu-west-1",
                "awslogs-stream-prefix": "prefect-flow-runs",
                "awslogs-create-group": "true",
            },
        },
    }
)
We also use Docker-based storage. When we put this config on the FargateAgent, it can correctly run flows, e.g. with a LocalEnvironment, but then the whole config lives in the agent and is the same for all flows.
The issue seems to be in how the parameters are passed forward to AWS. Our code is very close to the example in the docs: https://docs.prefect.io/orchestration/execution/fargate_task_environment.html#examples
We tried it with memory and with memoryReservation in the container definitions, but got the same error in both cases. We also tried passing containerDefinitions as an array.
Any help would be greatly appreciated. Thanks 🙂
Dnyaneshwar
07/09/2020, 2:52 PM
@task
def get_list_1():
    l = list()
    l.append((t1, t2))
    l.append((t3, t4))
    return l

@task
def function_1(arg1, arg2):
    # do something
    return arg2, False/True

# The other two tasks have a similar structure.
with Flow('test_flow') as flow:
    l_1 = get_list_1()
    res_1 = function_1.map(l_1)
    l_2 = get_list_2()
    get_list_2.set_upstream(function_1)
    res_2 = function_2.map(l_2)
I want to run this flow with DaskExecutor, where the dask cluster has been created with the YarnCluster API from dask.
The number of workers assigned to the scheduler will be limited.
However, the lists l_1 and l_2 might run into hundreds of tuples.
To avoid any worker being killed because of memory or other issues, I want to pass slices of l_1 (or l_2) to the functions instead of the full list.
How many slices there are will depend on the length of the output lists.
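One way to form the slices described above, shown only as a minimal sketch in plain Python. The names make_slices and process_slice and the slice size are hypothetical and not part of the original flow; process_slice stands in for function_1 applied to every tuple of one slice.

```python
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def make_slices(items: Sequence[T], slice_size: int) -> List[List[T]]:
    """Split items into consecutive slices of at most slice_size elements."""
    if slice_size < 1:
        raise ValueError("slice_size must be at least 1")
    return [
        list(items[start:start + slice_size])
        for start in range(0, len(items), slice_size)
    ]


def process_slice(pairs: Sequence[Tuple[int, int]]) -> List[Tuple[int, bool]]:
    """Hypothetical stand-in for function_1: handle every tuple of one slice."""
    return [(second, True) for _first, second in pairs]


# Seven tuples split into slices of three: the slice count follows the list length.
pairs = [(i, i * 10) for i in range(7)]
slices = make_slices(pairs, slice_size=3)
results = [process_slice(s) for s in slices]
```

In the flow, make_slices could itself be wrapped as a task, and the mapped call would then receive one slice per mapped run instead of one tuple.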
Can anyone help with how to handle this? Or maybe there is a better solution than passing the slices to the map. Thanks.
Nicolas van de Walle
07/09/2020, 2:55 PM
prefect agent install kubernetes -t <MY_TOKEN> -n prefect --label prefect-namespace --rbac | kubectl apply -n prefect -f -
I used Prefect Cloud for that. The flow was scheduled correctly, but nothing gets executed by the agent (it appears in the agents tab in the Cloud UI with its prefect-namespace label, but it does not see the flows that need to be run).
To be honest, I am quite new to Kubernetes and do not really understand how to use RBAC. Do I need to do anything else?
Kevin Weiler
07/09/2020, 3:40 PM
log_stdout on a more global level, say, the flow level, or even in the config.toml?
Mark McDonald
07/09/2020, 8:35 PM
Bernard Greyling
07/10/2020, 8:36 AM
Bernard Greyling
07/10/2020, 8:47 AM
cpu: 100m & memory: 128Mi is mentioned but not explained. What is the reasoning behind this limit?
2. We've successfully set up and authenticated a k8s runner and scheduled both s3 and docker runs.
After a custom image pull error on k8s, the k8s-prefect agent seems to be in a feedback loop, announcing that it can see flows: Found 2 flow run(s) to submit for execution.
But it is not executing them. Note: I did manually terminate the k8s job via kubectl. Not sure if this messed up the prefect-cloud state.
EDIT: Before this feedback loop state, we managed to run both s3 and docker runs with the vanilla examples.
Rafal
07/10/2020, 11:21 AM
Rafal
07/10/2020, 11:21 AM
Traceback (most recent call last):
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/urllib3/connection.py", line 160, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/urllib3/util/connection.py", line 84, in create_connection
    raise err
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/urllib3/connectionpool.py", line 677, in urlopen
    chunked=chunked,
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/urllib3/connectionpool.py", line 392, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/http/client.py", line 1252, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/http/client.py", line 1298, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/http/client.py", line 1247, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/http/client.py", line 1026, in _send_output
    self.send(msg)
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/http/client.py", line 966, in send
    self.connect()
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/urllib3/connection.py", line 187, in connect
    conn = self._new_conn()
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/urllib3/connection.py", line 172, in _new_conn
    self, "Failed to establish a new connection: %s" % e
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7f5c6b7cf6d0>: Failed to establish a new connection: [Errno 111] Connection refused
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/urllib3/connectionpool.py", line 725, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/urllib3/util/retry.py", line 439, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f5c6b7cf6d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/users/chojnar1/anaconda3/envs/prefect/bin/prefect", line 10, in <module>
    sys.exit(cli())
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/prefect/cli/auth.py", line 77, in login
    query={"query": {"user": {"default_membership": "tenant_id"}}}
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/prefect/client/client.py", line 218, in graphql
    token=token,
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/prefect/client/client.py", line 178, in post
    token=token,
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/prefect/client/client.py", line 329, in _request
    session=session, method=method, url=url, params=params, headers=headers
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/prefect/client/client.py", line 251, in _send_request
    response = session.post(url, headers=headers, json=params, timeout=30)
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/requests/sessions.py", line 578, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/requests/sessions.py", line 530, in request
    resp = self.send(prep, **send_kwargs)
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/requests/sessions.py", line 643, in send
    r = adapter.send(request, **kwargs)
  File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/requests/adapters.py", line 516, in send
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f5c6b7cf6d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
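The traceback above ends in a refused TCP connection to localhost:4200. As a rough diagnostic sketch (not a confirmed fix), the snippet below checks whether anything is accepting connections on that host and port. The helper name is_port_open is hypothetical; the host and port are taken from the traceback.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to (host, port) can be established."""
    try:
        # create_connection raises OSError (e.g. ConnectionRefusedError) on failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Host and port as they appear in the traceback.
api_reachable = is_port_open("localhost", 4200)
print("localhost:4200 reachable:", api_reachable)
```

If this reports that the port is not reachable, the client is pointed at a local API endpoint that is not running, which would be consistent with the ConnectionRefusedError shown.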
Hannah Amundson
07/10/2020, 12:40 PM
Adam Roderick
07/10/2020, 12:49 PM
Dnyaneshwar
07/10/2020, 1:38 PM
DaskExecutor with YarnCluster.
This gives me a KilledWorker error. I am not able to log more data, as the debug=True option doesn't add any more information either.
However, when I run the same tasks on DaskExecutor with address=None, I do not get any error.
What am I missing?
bruno.corucho
07/10/2020, 1:51 PM
COPY configuration /opt/strdata-prefect/configuration
ENV PREFECT__USER_CONFIG_PATH /opt/strdata-prefect/configuration/config.toml
Config.toml:
backend = "cloud"
[cloud]
use_local_secrets = false
[cloud.agent]
name = "strdata-agent"
# Setting it to `DEBUG` for verbose logging
level = "DEBUG"
[logging]
# The logging level: NOTSET, DEBUG, INFO, WARNING, ERROR, or CRITICAL
level = "DEBUG"
# Send logs to Prefect Cloud
log_to_cloud = true
# Extra loggers for Prefect log configuration
extra_loggers = "[]"
Error Logs:
ValueError: Local Secret "REDSHIFT_PASSWORD" was not found.
Thanks again, Prefect Team!
Marwan Sarieddine
07/10/2020, 3:36 PM
Pedro Machado
07/10/2020, 5:48 PM
files dictionary
2. use a custom docker image with everything needed to run the flow
3. package the additional python files and pip install them on the machine(s) that will execute the flow
Am I missing other approaches?
I don't think it's possible to package the flow + extra files when using the other types of storage. Correct? Are there plans to support this?
Adam Roderick
07/10/2020, 9:33 PM
ModuleNotFoundError: No module named 'prefect.core.parameter'
Does that look familiar to anyone?
Jackson Maxfield Brown
07/10/2020, 9:36 PM
from distributed import LocalCluster
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor
from prefect.engine.results import LocalResult

@task(result=LocalResult(dir="single-results/"), target="hello.bytes")
def single_task():
    return list(range(100))

def pipeline(debug: bool = False):
    with Flow("example-local-results") as flow:
        items = single_task()
    cluster = LocalCluster()
    state = flow.run(executor=DaskExecutor(cluster.scheduler_address))
I have also tried adding checkpoint=True, but same deal. Nothing shows up.
Mikael
07/11/2020, 6:20 AM
fetch='all'. If I want to read 5000 rows, store them somewhere, and then read another 5000 rows, how would I proceed with Prefect?
Espen Overbye
07/11/2020, 10:51 AM