Elie Hamouche
08/16/2020, 10:38 AM
from datetime import timedelta

import prefect
from prefect import Flow, task
from prefect.schedules import IntervalSchedule

prefect.config.debug = True
prefect.config.logging.level = "DEBUG"

@task(name="orchestrate", log_stdout=True)
def orchestrate():
    logger = prefect.context["logger"]
    logger.info("Orchestrating")
    for i in range(1, 60):
        logger.error(f"Counting {i}")
    return True

flow = Flow("Cloud Flow", schedule=IntervalSchedule(
    interval=timedelta(minutes=1),
))
with flow:
    orchestrate()
flow.register(project_name="dev")
flow.run_agent(token="XXXX")
And I start the agent with:
python example.py
Agent seems to come up fine but just hangs:
(venv) ibraflow (master|✚5…) python example.py
Result check: OK
Flow: https://cloud.prefect.io/ysft/flow/b7f56496-54df-4eeb-a0b6-2e274c39ed80
 ____            __           _        _                    _
|  _ \ _ __ ___ / _| ___  ___| |_     / \   __ _  ___ _ __ | |_
| |_) | '__/ _ \ |_ / _ \/ __| __|   / _ \ / _` |/ _ \ '_ \| __|
|  __/| | |  __/  _|  __/ (__| |_   / ___ \ (_| |  __/ | | | |_
|_|   |_|  \___|_|  \___|\___|\__| /_/   \_\__, |\___|_| |_|\__|
                                           |___/
[2020-08-16 10:34:38,874] INFO - agent | Starting LocalAgent with labels ['MacBook-Pro-2.local', 'azure-flow-storage', 'gcs-flow-storage', 's3-flow-storage', 'github-flow-storage', 'webhook-flow-storage']
[2020-08-16 10:34:38,874] INFO - agent | Agent documentation can be found at https://docs.prefect.io/orchestration/
[2020-08-16 10:34:38,875] INFO - agent | Agent connecting to the Prefect API at https://api.prefect.io
[2020-08-16 10:34:39,009] INFO - agent | Waiting for flow runs...
In the Cloud UI I see the correct version and task, so I assume that rules out authentication and network issues?
But the tasks never run and are always marked "late" - attachment:
nicholas
08/16/2020, 11:56 AM
Elie Hamouche
08/16/2020, 1:32 PM
Jeremiah
08/16/2020, 6:54 PM
SUBMITTED (or RUNNING) runs. This is to solve a situation we encountered in the early days of Cloud, where an Agent would run into trouble (for example, being unable to launch K8s jobs) and would end up spamming the cluster with unrunnable jobs. Therefore, if agents have high submitted counts, we don’t deliver work until they free up. In your case, Cloud was consistently behaving as if you had multiple SUBMITTED runs, when of course you don’t. This value is cached, but only for a short while, after which it should refresh from the database.
Therefore I’m still unable to explain why it persisted for so long, but I’ve manually cleared it, and if you see the problem again, please DM me so I can try to identify it directly.
Elie Hamouche
08/16/2020, 8:53 PM
Jeremiah
08/16/2020, 9:00 PM
Elie Hamouche
08/16/2020, 9:01 PM
prefect.config.debug = True
prefect.config.logging.level = "DEBUG"

@task(name="orchestrate", log_stdout=True)
def orchestrate():
    logger = prefect.context["logger"]
    logger.info("Orchestrating")
    for i in range(1, 60):
        logger.error(f"Counting {i}")
    return True
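To make the throttling described earlier in the thread concrete (work is withheld from an agent that appears to have too many SUBMITTED runs, based on a count that is cached for a short while), here is a minimal sketch. It is not Prefect Cloud's implementation: `SubmittedRunGate`, `count_submitted`, `max_submitted`, and `ttl_seconds` are hypothetical names and values chosen purely for illustration.

```python
import time
from typing import Callable, Dict, Tuple


class SubmittedRunGate:
    """Hypothetical gate that withholds work from agents with many SUBMITTED runs."""

    def __init__(
        self,
        count_submitted: Callable[[str], int],  # stand-in for a database query
        max_submitted: int = 5,                 # assumed cap, not a real Cloud value
        ttl_seconds: float = 30.0,              # assumed cache lifetime
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._count_submitted = count_submitted
        self._max_submitted = max_submitted
        self._ttl_seconds = ttl_seconds
        self._clock = clock
        # agent id -> (time the count was fetched, cached count)
        self._cache: Dict[str, Tuple[float, int]] = {}

    def submitted_count(self, agent_id: str) -> int:
        """Return the cached count, refreshing it once the cache entry expires."""
        now = self._clock()
        cached = self._cache.get(agent_id)
        if cached is not None and now - cached[0] < self._ttl_seconds:
            return cached[1]
        count = self._count_submitted(agent_id)
        self._cache[agent_id] = (now, count)
        return count

    def should_deliver(self, agent_id: str) -> bool:
        """Deliver new work only while the agent is below the cap."""
        return self.submitted_count(agent_id) < self._max_submitted


# Usage: a stale cached count keeps blocking work until the entry expires.
fake_db = {"local-agent": 10}
fake_now = [0.0]
gate = SubmittedRunGate(fake_db.__getitem__, clock=lambda: fake_now[0])

blocked_at_first = gate.should_deliver("local-agent")   # count 10 is over the cap
fake_db["local-agent"] = 0                               # runs finish in the "database"
still_blocked = gate.should_deliver("local-agent")       # cache has not expired yet
fake_now[0] = 31.0                                       # move past the TTL
delivered_after_refresh = gate.should_deliver("local-agent")
```

In this toy model, a stale count only blocks work until the cache entry expires, which is why a value that persists for a long time (as reported in the thread) is surprising.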
Chris White
08/16/2020, 9:08 PM
--show-flow-logs or the kwarg show_flow_logs=True in run_agent to see the flow run logs in STDOUT.
Note that this only works for the local agent, because all other agents submit jobs that can’t pipe STDOUT back and forth.
Elie Hamouche
08/16/2020, 9:19 PM
Jeremiah
08/16/2020, 9:26 PM
Elie Hamouche
08/16/2020, 10:29 PM
Chris White
08/19/2020, 8:52 PM
Elie Hamouche
08/20/2020, 6:41 PM
Jeremiah
08/20/2020, 6:52 PM
Elie Hamouche
08/20/2020, 6:53 PM
Chris White
08/20/2020, 7:18 PM
date from the terminal or import pendulum; pendulum.now("utc") and compare that against the scheduled timestamps for your flow runs?
- is the delay in your flow runs consistent or random?
Elie Hamouche
08/20/2020, 7:22 PM
(data-server) data-server (master|✚29…) date
Thu 20 Aug 2020 20:20:58 BST
(data-server) data-server (master|✚29…) python
>>> import pendulum; pendulum.now("utc")
DateTime(2020, 8, 20, 19, 21, 25, 382367, tzinfo=Timezone('UTC'))
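The clock check above can also be scripted. This is a minimal sketch using only the standard library. It assumes the two readings pasted above and a hypothetical scheduled time of 19:20:00 UTC (the thread does not give the real scheduled timestamps). `clocks_agree` and `lateness` are illustrative helper names, not Prefect functions.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def clocks_agree(
    local_reading: datetime,
    utc_reading: datetime,
    tolerance: timedelta = timedelta(minutes=1),
) -> bool:
    """Check that two timezone-aware readings refer to roughly the same instant."""
    # Aware datetimes are compared by absolute instant, so offsets cancel out.
    return abs(local_reading - utc_reading) <= tolerance


def lateness(scheduled_utc: datetime, now_utc: Optional[datetime] = None) -> timedelta:
    """Return how far past its scheduled time a run is (negative if still early)."""
    if scheduled_utc.tzinfo is None:
        raise ValueError("scheduled_utc must be timezone-aware")
    now = now_utc if now_utc is not None else datetime.now(timezone.utc)
    return now - scheduled_utc


# Readings copied from the terminal output in the thread (BST is UTC+1).
bst = timezone(timedelta(hours=1), "BST")
local_reading = datetime(2020, 8, 20, 20, 20, 58, tzinfo=bst)
utc_reading = datetime(2020, 8, 20, 19, 21, 25, 382367, tzinfo=timezone.utc)

# The readings are about 27 seconds apart, consistent with the time
# it takes to type the second command, so the machine clock looks sane.
skew = utc_reading - local_reading
agree = clocks_agree(local_reading, utc_reading)

# Hypothetical scheduled timestamp, for illustration only.
scheduled = datetime(2020, 8, 20, 19, 20, 0, tzinfo=timezone.utc)
delay = lateness(scheduled, now_utc=utc_reading)
```

If the machine clock were off by minutes or more, `clocks_agree` would return False against a trusted UTC source, which would by itself explain runs showing up as late.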
Chris White
08/20/2020, 7:23 PM
Elie Hamouche
08/20/2020, 7:26 PM
Chris White
08/20/2020, 7:47 PM
Elie Hamouche
08/20/2020, 8:02 PM