alex
05/10/2022, 7:00 PM
Kevin Kho
alex
05/10/2022, 7:52 PM
Kevin Kho
alex
05/10/2022, 7:57 PM
Kevin Kho
alex
05/11/2022, 2:13 PM
Kevin Kho
Anna Geller
1. The output of prefect diagnostics
2. The flow code of the flow that fails
3. What agent type do you use and how did you start it?
4. Did you set the schedule via code or UI?
alex
05/11/2022, 4:59 PM
Kevin Kho
alex
05/11/2022, 7:24 PM
Anna Geller
alex
05/12/2022, 2:39 PM
{
  "config_overrides": {
    "cloud": {
      "agent": {
        "auth_token": true
      }
    },
    "context": {
      "secrets": false
    }
  },
  "env_vars": [
    "PREFECT__LOGGING__EXTRA_LOGGERS"
  ],
  "system_information": {
    "platform": "Linux-5.4.0-1030-aws-x86_64-with-Ubuntu-20.04-focal",
    "prefect_backend": "cloud",
    "prefect_version": "0.15.4",
    "python_version": "3.6.12"
  }
}
2. Some examples below
3. Local agent, started with prefect agent local start -l invenloado and running in a tmux session (I know this isn't recommended)
4. Via code
# Method of MetaCronSchedule (the rest of the class is not shown)
def to_prefect_schedule(self):
    start_date = pendulum.today(tz=self.timezone)
    return CronSchedule(self.cron, start_date=start_date)
def time_to_schedule(time_str):
    """
    converts time in HH:MM format to a daily prefect schedule
    :param time_str: time in HH:MM, where HH is in 24h format
    """
    hour, min = time_str.split(":")
    time = f"{min} {hour} * * *"
    sch = MetaCronSchedule(cron_str=time, timezone="Canada/Eastern")
    return sch.to_prefect_schedule()
def get_flow(
    general_config: Conf,
    coll_name: str,
    shard_base: int,
    shard_id: int,
):
    client_name = general_config.prefect["project_name"]
    es_host = general_config.host
    index_name = general_config.index_name
    with prefect.Flow(f"invenloado-{client_name}") as f:
        store_id = prefect.Parameter("store_id", default=None)
        shard_base = prefect.Parameter("shard_base", default=shard_base)
        shard_id = prefect.Parameter("shard_id", default=shard_id)
        InjectInventory(
            client_name=client_name,
            es_host=es_host,
            es_index=index_name,
            coll_name=coll_name,
            shard_base=shard_base,
            shard_id=shard_id,
            store_id=store_id,
            task_args=dict(tags=["inv-augmentation"]),
        )
    return f
def deploy(ctx, shard_base, shard_id, project, time):
    if shard_id >= shard_base:
        raise click.BadOptionUsage("shard-id", "shard-id must be < shard-base")
    prefect_schedule = time_to_schedule(time)
    general_config: Conf = ctx.obj["general_config"]
    flow = get_flow(
        general_config,
        coll_name=coll_name,
        shard_base=shard_base,
        shard_id=shard_id,
    )
    flow.schedule = prefect_schedule
    flow.register(project_name=project, labels=["invenloado"])
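For reference, the HH:MM-to-cron step inside time_to_schedule can be checked on its own, without Prefect. This is only a minimal sketch: hhmm_to_cron is a hypothetical helper name, not part of the code above, and the range validation is an addition the original does not perform.

```python
def hhmm_to_cron(time_str: str) -> str:
    """Convert "HH:MM" (24h) into a daily cron expression "M H * * *".

    Hypothetical helper for illustration; not part of the original code.
    """
    hour_str, minute_str = time_str.split(":")
    hour, minute = int(hour_str), int(minute_str)
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError(f"time out of range: {time_str!r}")
    # Cron field order is: minute hour day-of-month month day-of-week.
    return f"{minute} {hour} * * *"


print(hhmm_to_cron("07:05"))  # 5 7 * * *
```

Unlike the original, which passes the split strings through unchanged, this version converts the fields to integers, so leading zeros are dropped.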
I have a bash script that iterates over ~10 projects and deploys a flow for each. The flow structure does not change with the client, just some configs. I have a concurrency limit of 1 on the invenloado label, so only one runs at a time. I've been using this workflow for a month without any issues, and I didn't re-register any flows around the time I saw this issue (this Tuesday). Only 2/10 of my flows got queued that day. All flows queued and ran as expected yesterday and today, with no changes made on our end.
Anna Geller
You could use the prefect register CLI instead of using this custom class; this would automatically register all flows in a given directory without you having to do any looping etc.
similarly, I see no storage definition. If you don't define it explicitly, flow.register() will pickle your flow object, which can be hard to unpickle and can cause issues when registering from a different place than the actual flow. Again, moving to the register CLI would make things easier, as this would ensure you are using script storage rather than pickle storage: https://docs.prefect.io/orchestration/flow_config/storage.html#pickle-vs-script-based-storage
@alex let us know if this is resolved already
alex
05/18/2022, 2:37 PM
def get_flow(
    general_config: Conf,
    coll_name: str,
    shard_base: int,
    shard_id: int,
):
function that I call multiple times with different general_configs.
I've run into the same issue today and a few times in the previous week. Today, only 3/10 flows in the project actually ran. They're scheduled to run very close together, but we've set the concurrency limit to 1, so some of them enter a Late state before they run. Today only the first 3 flows in schedule order were queued and ran successfully. There's no trace of the other 7.
I'm concerned that they are just not being queued after being scheduled. We have 1,000+ flows running every day (with different structures and in different environments), so could it be a load issue?
Kevin Kho
alex
05/18/2022, 2:41 PM
Kevin Kho
alex
05/18/2022, 2:44 PM
Kevin Kho
alex
05/18/2022, 3:18 PM
Kevin Kho
alex
05/18/2022, 3:23 PM
Kevin Kho
alex
05/18/2022, 3:26 PM
Kevin Kho
alex
05/18/2022, 3:29 PM
Kevin Kho
alex
05/18/2022, 3:36 PM
Kevin Kho
alex
05/18/2022, 3:47 PM
Kevin Kho
alex
05/18/2022, 3:52 PM
Kevin Kho
alex
05/18/2022, 4:00 PM
Kevin Kho
alex
05/18/2022, 4:07 PM
query flowruninfo {
  flow_run(where: {name: {_eq: "wise-flounder"}}) {
    name
    flow {
      name
    }
    logs {
      message
      timestamp
    }
  }
}
{
  "errors": [
    {
      "message": "Unknown argument \"limit\" on field \"flow_concurrency\" of type \"Query\".",
      "extensions": {
        "code": "GRAPHQL_VALIDATION_FAILED"
      }
    }
  ]
}
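The flow-run lookup query above can be parameterised so the same query is reused for other run names. This is only a minimal sketch: build_flow_run_query is a hypothetical helper, the field names are copied from the query in this thread, and the commented-out call assumes the Prefect 1.x Client.graphql method is available and authenticated in your environment.

```python
import json


def build_flow_run_query(run_name: str) -> str:
    """Return the flow-run lookup query from the thread for a given run name.

    Hypothetical helper for illustration; field names are copied from the
    query posted in the thread.
    """
    # json.dumps yields a double-quoted, escaped string literal, which also
    # works as a GraphQL string for plain run names such as this one.
    quoted = json.dumps(run_name)
    return (
        "query flowruninfo {\n"
        f"  flow_run(where: {{name: {{_eq: {quoted}}}}}) {{\n"
        "    name\n"
        "    flow {\n"
        "      name\n"
        "    }\n"
        "    logs {\n"
        "      message\n"
        "      timestamp\n"
        "    }\n"
        "  }\n"
        "}"
    )


query = build_flow_run_query("wise-flounder")
print(query)

# Assumption: with Prefect 1.x installed and authenticated, the query could
# be sent with something like:
#   from prefect import Client
#   Client().graphql(query)
```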
Kevin Kho
alex
05/18/2022, 4:13 PM
Kevin Kho
alex
05/18/2022, 4:14 PM
query Test {
  flow_concurrency {
    label
  }
}
Kevin Kho
Anna Geller
alex
05/18/2022, 4:20 PM
Anna Geller
Kevin Kho
Anna Geller
alex
05/18/2022, 4:57 PM
Anna Geller
from prefect import task, Flow

@task(log_stdout=True)
def hello_world():
    print("hello world")

with Flow("hello") as flow:
    hw = hello_world()
generally speaking, in situations such as this, it's always good to approach it step by step: start with a simple flow, then add storage, then add a run config, then add a schedule, then add concurrency limits, then test with multiple runs, then extend the workflow to reflect your custom code, etc.
this makes it easier to identify the root cause
alex
05/26/2022, 2:29 PM