Qwame
08/09/2020, 9:06 AM
F1 >> [f2, f3, f4, f5, f6]
What's the best way to set these dependencies in Prefect? I notice that set_downstream doesn't accept a list of tasks. Is there an efficient way to do this in Prefect?
Also, does the new Prefect UI mean I don't need Docker to run it?
Thanks
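For reference, would something like this be the way? (An untested sketch - I'm assuming Flow.set_dependencies accepts a list of downstream tasks; task names are illustrative.)
from prefect import Flow, task

@task
def f1(): pass

@task
def f2(): pass

@task
def f3(): pass

with Flow("fan-out") as flow:
    t1, t2, t3 = f1(), f2(), f3()

# set_downstream takes a single task, but set_dependencies takes lists
flow.set_dependencies(task=t1, downstream_tasks=[t2, t3])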
Vikram Iyer
08/10/2020, 6:05 AM

Vikram Iyer
08/10/2020, 6:05 AM

emmanuel
08/10/2020, 7:20 AM

Lewis Bails
08/10/2020, 8:26 AM
flow.environment = DaskKubernetesEnvironment(min_workers=1, max_workers=5)
flow.storage = Docker(dockerfile='./dockerfile', env_vars={'known_hosts': #known_hosts, 'ssh_prv_key': #prv_key})
flow.register(project_name='Automated training')
[2020-08-10 08:07:11] INFO - prefect.Docker | Building the flow's Docker storage...
Traceback (most recent call last):
File "/home/leb/anaconda3/envs/ai-pipelines/lib/python3.7/site-packages/docker/api/client.py", line 261, in _raise_for_status
response.raise_for_status()
File "/home/leb/.local/lib/python3.7/site-packages/requests/models.py", line 941, in raise_for_status
raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: http+docker://localhost/v1.40/build?t=auto-dask-kube-cloud%3A2020-08-10t08-07-11-222981-00-00&q=False&nocache=False&rm=False&forcerm=True&pull=False&dockerfile=.%2Ftmpez2venih%2FDockerfile
Can anyone help me out?

Roy
08/10/2020, 11:00 AM
When I merge two task results together into a single one, the merge is supposed to return the first "real" result. What is considered a "real" result? It seems that only None is considered a non-real result. I would have expected that NoResult results would also be skipped, but this is not the case. Is this intended? Example in thread.
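Roughly, the shape of what I'm doing (a minimal sketch of the pattern, assuming Prefect 0.x's control-flow tasks; branch names are illustrative):
from prefect import Flow, task
from prefect.tasks.control_flow import ifelse, merge

@task
def check():
    return True

@task
def branch_a():
    return 1

@task
def branch_b():
    return 2

with Flow("merge-demo") as flow:
    res_a = branch_a()
    res_b = branch_b()
    ifelse(check(), res_a, res_b)  # one branch gets skipped at runtime
    result = merge(res_a, res_b)   # supposed to return the first "real" result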
Amit
08/10/2020, 11:24 AM
logger = logging.getLogger(__name__)
I have this in code everywhere and I don't want to replace it with from prefect.utilities.logging import get_logger everywhere. Is there a way to send the default logger's messages to Cloud?
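One possibility, assuming the Prefect version in use supports the extra_loggers logging config (worth verifying against your version): registering the logger name lets Prefect attach its own handlers to it, so the application code stays untouched. Module name below is illustrative.
# config: register the logger name with Prefect, e.g. via
#   export PREFECT__LOGGING__EXTRA_LOGGERS="['my_package']"
import logging

logger = logging.getLogger("my_package")  # unchanged application code
logger.info("this record should now ship along with the Prefect logs")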
Jim Klassen
08/10/2020, 2:23 PM

Jim Klassen
08/10/2020, 2:25 PM

Marwan Sarieddine
08/10/2020, 3:44 PM
$ kubectl logs pod/prefect-job-d45c5eda-cxn9x -n prefect
Usage: prefect execute [OPTIONS] COMMAND [ARGS]...
Try 'prefect execute -h' for help.
Error: No such command 'flow-run'.
bral
08/10/2020, 3:47 PM

Philip MacMenamin
08/10/2020, 4:23 PM
Is it possible to pass a list as a Parameter? (As opposed to a str - or do you just pass a string and split?)
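Something like this is what I have in mind (an untested sketch; as far as I know Parameter values just need to be JSON-serializable, so a real list should be fine - task name illustrative):
from prefect import Flow, Parameter, task

@task
def count_files(files):
    print(len(files))

with Flow("param-demo") as flow:
    files = Parameter("files", default=["a.txt", "b.txt"])
    count_files(files)

# pass an actual list at run time - no string splitting needed
flow.run(parameters={"files": ["x.txt", "y.txt", "z.txt"]})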
Marwan Sarieddine
08/10/2020, 5:27 PM

Marwan Sarieddine
08/10/2020, 5:49 PM

Philip MacMenamin
08/10/2020, 6:36 PM
from prefect import Flow, Parameter

with Flow("STL Flow") as f:
files = Parameter("files")
f.run(files='fname.txt')
Riley Hun
08/10/2020, 8:57 PM

Riley Hun
08/11/2020, 4:49 AM

Lewis Bails
08/11/2020, 6:05 AM

emre
08/11/2020, 7:52 AM
On 0.13.1, I can't get the slack_notifier to work. This used to work in 0.12.1. Here is a minimal example of a flow:
from prefect import Flow, task, context
from prefect.utilities.notifications import slack_notifier

@task
def log_run_id():
    context.get("logger").warning(context.get("flow_run_id"))

@task(state_handlers=[slack_notifier])
def get_1():
    return 1

with Flow("slack_example") as flow:
    log = log_run_id()
    result = get_1(upstream_tasks=[log])

flow.run()
Apparently slack_notifier attempts to communicate with a backend server, but I am not going to use one. More info in thread.
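Would pre-configuring the handler be the way around this? A sketch of what I mean - I'm assuming slack_notifier is curried and that this version has a backend_info kwarg controlling the backend lookup:
from prefect import Flow, task
from prefect.utilities.notifications import slack_notifier

# backend_info=False (if supported) should skip the call to the backend
handler = slack_notifier(backend_info=False)

@task(state_handlers=[handler])
def get_1():
    return 1

with Flow("slack_example") as flow:
    result = get_1()

flow.run()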
bral
08/11/2020, 9:35 AM

emmanuel
08/11/2020, 11:31 AM

emmanuel
08/11/2020, 2:29 PM
Is PREFECT_SERVER__GRAPHQL_URL the right env variable for the UI docker image? It seems to be the one used in the docker-compose file, but when run outside of docker (so not on localhost), it doesn't seem to be working 😕
Richard Hughes
08/11/2020, 2:55 PM

Chris Martin
08/11/2020, 2:59 PM
I'm trying to pass upstream_tasks to task.map, but I can't get it to work and wonder if this is possible. Here's some example code:
from prefect import task, Flow

@task
def task1():
    pass

@task
def task2():
    return ["a", "b", "c"]

@task
def task3(x):
    pass

with Flow("test") as flow:
    t1 = task1()
    params = task2()
    task3.map(params, upstream_tasks=[t1])
I can register this flow fine and the DAG looks correct (see attachment), but when I run, task3 fails with "No upstream states can be mapped over". Am I doing something wrong here?
Akshay Verma
08/11/2020, 3:12 PM

emmanuel
08/11/2020, 3:53 PM
apollo OSS as well?

emmanuel
08/11/2020, 4:05 PM

Hannah Amundson
08/11/2020, 4:54 PM
prefect.context.get("logger")
How do we get that logger inside each of our classes/functions that are being called within tasks? Is the best practice to just pass it in as a parameter?
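One option that would avoid passing the logger around, assuming the helper always runs inside a task (a sketch, names illustrative): prefect.context is populated for the duration of a run, so plain functions called from a task body can look the logger up themselves.
import prefect
from prefect import Flow, task

def helper():
    # works because context is active while the task is executing
    logger = prefect.context.get("logger")
    logger.info("hello from a helper")

@task
def my_task():
    helper()

with Flow("logger-demo") as flow:
    my_task()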
Riley Hun
08/11/2020, 6:48 PM

Marwan Sarieddine
08/11/2020, 9:36 PM
Unexpected error: TimeoutError()
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 410, in get_flow_run_state
with executor.start():
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 239, in start
with Client(self.address, **self.client_kwargs) as client:
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 744, in __init__
self.start(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 949, in start
sync(self.loop, self._start, **kwargs)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
result[0] = yield future
File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1046, in _start
await self._ensure_connected(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1108, in _ensure_connected
await asyncio.wait_for(self._update_scheduler_info(), timeout)
File "/usr/local/lib/python3.7/asyncio/tasks.py", line 449, in wait_for
raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
it seems to me the solution to avoid this timeout is to add a task that waits for the workers to be ready - has anyone else run into this?
(I am using a DaskKubernetesEnvironment for execution on AWS EKS)
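Alternatively, would raising the connection timeout help? The traceback shows the executor forwarding client_kwargs to distributed.Client, so something like this sketch might work where the executor can be configured directly (address is illustrative; whether DaskKubernetesEnvironment exposes this is an assumption):
from prefect.engine.executors import DaskExecutor

executor = DaskExecutor(
    address="tcp://dask-scheduler:8786",
    client_kwargs={"timeout": 120},  # seconds to wait for the scheduler
)
flow.run(executor=executor)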