Richard
04/06/2020, 8:52 PM
Brad
04/06/2020, 9:12 PM
"Flows with required parameters can not be scheduled automatically" errors when trying to create one with parameter_defaults
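For reference, a minimal sketch of the simplest way around that error: give the Parameter itself a default so scheduled runs can be created without an explicit value. The task and flow names below are placeholders.

from prefect import Flow, Parameter, task

@task
def greet(name):
    print("Hello, {}!".format(name))

with Flow("scheduled-example") as flow:
    # a default makes the parameter optional, so the scheduler can
    # create runs without being handed a value for it
    name = Parameter("name", default="world")
    greet(name)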
Brad
04/06/2020, 9:13 PM
Ziyao Wei
04/06/2020, 9:56 PM
flow.register()? So that if multiple identical tasks (either autodetected through hashing the flow object (?) or using a client-provided ID) are submitted through different agents, all of the agents can run the same flow
Ben Fogelson
04/06/2020, 10:36 PM
Jackson Maxfield Brown
04/06/2020, 11:55 PM
The workflow uses a DaskExecutor which in turn connects to a SLURMCluster from dask_jobqueue. There are about 14,000 items to process in a mapped task and each result in that map returns None, so no worries on memory management there. However, the workflow runs until about 2,200 tasks are completed, then errors and restarts the whole workflow. The errors that I see on the main workflow / scheduler thread all follow something like:
distributed.core - ERROR - 'process_fov_row-e7df27e7-ca99-433d-b1e1-c4a3f6b2a683-61d201c4-4edb-4243-9e76-4b90c3aa7835-1474'
Traceback (most recent call last):
File "/home/jacksonb/miniconda3/envs/cb-tools/lib/python3.7/site-packages/distributed/core.py", line 411, in handle_comm
result = await result
File "/home/jacksonb/miniconda3/envs/cb-tools/lib/python3.7/site-packages/distributed/scheduler.py", line 1758, in add_worker
await self.handle_worker(comm=comm, worker=address)
File "/home/jacksonb/miniconda3/envs/cb-tools/lib/python3.7/site-packages/distributed/scheduler.py", line 2714, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/home/jacksonb/miniconda3/envs/cb-tools/lib/python3.7/site-packages/distributed/core.py", line 472, in handle_stream
handler(**merge(extra, msg))
File "/home/jacksonb/miniconda3/envs/cb-tools/lib/python3.7/site-packages/distributed/scheduler.py", line 2675, in handle_long_running
ts = self.tasks[key]
KeyError: 'process_fov_row-e7df27e7-ca99-433d-b1e1-c4a3f6b2a683-61d201c4-4edb-4243-9e76-4b90c3aa7835-1474'
Any ideas what could possibly be causing this issue? (This seems like a dask question and not so much a prefect one, but thought someone here may know)
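For reference, a rough sketch of the kind of setup described above; the SLURM resource values, worker count, and task body are placeholders rather than the actual configuration.

from dask_jobqueue import SLURMCluster
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

@task
def process_fov_row(row):
    # stand-in for the real per-item work; returns nothing by design
    return None

with Flow("slurm-mapped-example") as flow:
    process_fov_row.map(list(range(100)))

cluster = SLURMCluster(cores=4, memory="8GB", walltime="04:00:00")  # hypothetical resources
cluster.scale(20)  # ask SLURM for 20 worker jobs

flow.run(executor=DaskExecutor(address=cluster.scheduler_address))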
Argenis Leon
04/07/2020, 3:58 AM
(rapids_013) argenisleon@rapids:~$ prefect server start
Pulling postgres ... done
Pulling hasura ... done
Pulling graphql ... done
Pulling scheduler ... done
Pulling apollo ... done
Pulling ui ... done
Creating network "prefect-server" with the default driver
Creating cli_postgres_1 ... done
Creating cli_hasura_1 ... done
Creating cli_graphql_1 ... done
Creating cli_scheduler_1 ... done
Creating cli_apollo_1 ... done
Creating cli_ui_1 ...
Creating cli_ui_1 ... error
ERROR: for cli_ui_1 Cannot start service ui: driver failed programming external connectivity on endpoint cli_ui_1 (ab05e1d530a187151aa2ca7d03d0cdc964c4ed96f6ddb41ca0173850d2719ae8): Error starting userland proxy: listen tcp 0.0.0.0:8080: bind: address already in use
ERROR: for ui Cannot start service ui: driver failed programming external connectivity on endpoint cli_ui_1 (ab05e1d530a187151aa2ca7d03d0cdc964c4ed96f6ddb41ca0173850d2719ae8): Error starting userland proxy: listen tcp 0.0.0.0:8080: bind: address already in use
ERROR: Encountered errors while bringing up the project.
Any help?
Ogaday
04/07/2020, 10:43 AM
Brett Naul
04/07/2020, 9:09 PM
Any way to use today from https://docs.prefect.io/api/latest/utilities/context.html as parameter defaults? Right now I have a separate task that wraps each date param
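The wrapper-task pattern mentioned above looks roughly like this; a sketch only, with made-up names, falling back to prefect.context.get("today").

import prefect
from prefect import Flow, Parameter, task

@task
def run_for_date(date_string):
    # fall back to the run date provided by Prefect's context
    date_string = date_string or prefect.context.get("today")
    print("running for {}".format(date_string))

with Flow("date-param-example") as flow:
    date_string = Parameter("date_string", default=None)
    run_for_date(date_string)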
Manuel Aristarán
04/07/2020, 9:09 PM
prefect.client.Client? My use case is triggering a Flow from an app.
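A minimal sketch of triggering a run from application code with the Client; the flow id and parameters are placeholders to fill in from your own project.

from prefect import Client

client = Client()
flow_run_id = client.create_flow_run(
    flow_id="<flow-id>",           # placeholder: look this up in the UI or via GraphQL
    parameters={"name": "world"},  # placeholder parameters
)
print(flow_run_id)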
Brad
04/07/2020, 11:13 PM
[2020-04-07 22:52:35,213] ERROR - agent | Error while deploying flow: FileNotFoundError(2, "No such file or directory: 'prefect'")
Brad
04/07/2020, 11:18 PM
parameter_defaults?
Ben Fogelson
04/08/2020, 4:35 PM
Arsenii
04/09/2020, 2:54 AM
Jayakar
04/09/2020, 8:37 AM
Toby Coleman
04/09/2020, 11:01 AM
David Ojeda
04/09/2020, 1:50 PM
shazard
04/09/2020, 2:28 PM
Chris Hart
04/09/2020, 5:26 PM
Viv Ian
04/09/2020, 11:18 PM
• Is there a way to use Django inside tasks (e.g. by calling django.setup())?
• Is there a way to manually trigger a Flow? For example, I design a flow to do some calculations, and I only want it to run if a user clicks a button. Is there an API endpoint I can use to fire it off?
THANKS!
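On the manual-trigger question, one option is to call the GraphQL API directly. A sketch, assuming a local Prefect Server whose Apollo endpoint is on the default port 4200, with a placeholder flow id:

import requests

mutation = """
mutation {
  create_flow_run(input: { flow_id: "<flow-id>" }) {
    id
  }
}
"""
resp = requests.post("http://localhost:4200/graphql", json={"query": mutation})
print(resp.json())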
shazard
04/10/2020, 7:26 AM
from prefect import Flow, task

@task
def dummy_task(param):
    pass

with Flow("test") as flow:
    dummy_task.map(list(range(100)))

flow.run()
flow.register()
This code runs correctly locally, but gets stuck on the 64th task when run manually from the Core Server UI.
Since it always gets stuck on the same task I suppose this has to do with some resource or process limit, but I can't seem to pinpoint exactly what it is.
Any idea?
Jie Lou
04/10/2020, 3:20 PM
@task(log_stdout=True, max_retries=1, retry_delay=datetime.timedelta(seconds=10))
def cal(x):
    print("starting sleep {} seconds".format(x))
    time.sleep(x)
    return x

with Flow("test", result_handler=s3_result_handler) as flow:
    time = Parameter("time", default=[60])
    results = cal.map(time)
When the cal task was running, I manually killed the worker and kept observing the logs in the Cloud UI, but found this:
starting sleep 60 seconds  # then I killed the worker...
Task 'cal[0]': Starting task run...
Task 'cal[0]': task is already running.
Task 'cal[0]': finished task run for task with final state: 'Running'
1 mapped tasks submitted for execution.
Task 'cal': Handling state change from Mapped to Mapped
Task 'cal': task has been mapped; ending run.
Task 'cal': finished task run for task with final state: 'Mapped'
Flow run RUNNING: terminal tasks are incomplete.
Marked "Failed" by a Zombie Killer process.
It seems that the state of the task does not change from Running after I killed the worker, and that's why the flow was finally tagged as a zombie.
However, if cal is a regular task without mapping, the retry mechanism works as expected. I just tweaked the flow above a little bit:
@task(log_stdout=True, max_retries=1, retry_delay=datetime.timedelta(seconds=10))
def cal(x):
    print("starting sleep {} seconds".format(x[0]))
    time.sleep(x[0])
    return x[0]

with Flow("test", result_handler=s3_result_handler) as flow:
    time = Parameter("time", default=[60])
    results = cal(time)
Still, when the task was running, I killed the worker. And this time retries worked. Here are the logs:
starting sleep 60 seconds #then I killed the worker...
Task 'time': Starting task run...
Task 'time': Handling state change from Pending to Running
Task 'time': Calling task.run() method...
Task 'time': Handling state change from Running to Success
Task 'time': finished task run for task with final state: 'Success'
Task 'cal': Starting task run...
Task 'cal': Handling state change from Pending to Running
Task 'cal': Calling task.run() method...
starting sleep 60 seconds
and then the flow finished successfully. I would expect mapped tasks and regular tasks to handle this in a consistent way, but I'm not sure why mapped tasks did not survive a killed worker. Sorry for the long message; any thoughts are welcome. Thanks!!
Jacob (he/him)
04/10/2020, 4:28 PM
John Ramirez
04/10/2020, 8:30 PM
Mitchell Bregman
04/10/2020, 10:28 PM
flow.register but nothing seems to be showing up
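For comparison, a minimal registration sketch against a Server/Cloud backend; the project name is a placeholder, and with those backends a flow has to be registered into an existing project before it shows up in the UI.

from prefect import Client, Flow, task

@task
def noop():
    pass

with Flow("registration-check") as flow:
    noop()

Client().create_project(project_name="my-project")  # placeholder project
flow.register(project_name="my-project")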
Leo Meyerovich (Graphistry)
04/11/2020, 12:03 AM
Is there a healthcheck for the agent / executor docker service? I saw a Feb 5 PR around some storage healthchecks, but not seeing docs for instrumented monitoring here. Ideally something curl-able, like:
healthcheck:
  test: ["CMD-SHELL", "curl -sSf http://prefect/health | jq .code | grep 200 || exit 1"]
Leo Meyerovich (Graphistry)
04/11/2020, 12:50 AM
Ziyao Wei
04/11/2020, 3:17 AM
Ziyao Wei
04/11/2020, 3:18 AM
PREFECT_API_URL, but it doesn't seem to work (seems obvious in retrospect)
Leo Meyerovich (Graphistry)
04/11/2020, 4:56 AM
seq([task_1(file_1), task_2(file_2), ... task_n(file_n)]). As soon as the meta-task is submitted, the notebook (and its local agent) can stop. However, for the next 3 days, we want those tasks to run one at a time, and we see status in the UI (incl. fails/retries). If we ever want to, we can rerun the flow to add/swap tasks.
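One way to get that shape with plain Prefect primitives is sketched below: chain the per-file tasks so they run strictly one at a time, register once, and let an agent pick the runs up later. File names, retry settings, and the project name are placeholders.

from datetime import timedelta
from prefect import Flow, task

@task(max_retries=3, retry_delay=timedelta(minutes=5))
def process(path):
    print("processing {}".format(path))

files = ["file_1", "file_2", "file_n"]  # placeholders

with Flow("long-running-batch") as flow:
    previous = None
    for path in files:
        current = process(path)
        if previous is not None:
            current.set_upstream(previous)  # force strictly sequential execution
        previous = current

flow.register(project_name="my-project")  # placeholder project; an agent runs it later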