https://prefect.io logo
s

Seth Just

05/05/2022, 10:09 PM
We are running into an issue with an ECS agent where the logs from individual flow tasks aren't appearing in Prefect Cloud. Any advice for how to troubleshoot this issue? We're at a total loss.
k

Kevin Kho

05/05/2022, 10:19 PM
How long has it been? You said individual flow tasks meaning some tasks in a Prefect Flow? Or a ECS Task running a fLow?
s

Seth Just

05/05/2022, 10:20 PM
We've been observing this consistently for a couple weeks. I'm referring to all the logs from the ECS task running a flow. The only log statements appearing in Prefect Cloud are from code executed outside any Prefect Task.
k

Kevin Kho

05/05/2022, 10:21 PM
Can you DM me a sample flow run id?
s

Seth Just

05/05/2022, 10:21 PM
Yes, give me just one moment.
k

Kevin Kho

05/05/2022, 10:40 PM
A few questions: 1. Do you have CloudWatch logs? 2. Does the same flow have other runs with successful logging? 3. Are you by any chance modifying the logger mid run?
s

Seth Just

05/05/2022, 10:42 PM
1. Yes, the ECS flow task's logs all appear in CloudWatch 2. No, all runs seem to have the same problem 3. I doubt it, but we can check. Update: we only call `warning`/`info`/`debug`, but we do use a mix of
prefect.context.get("logger')
and
prefect.context.logger
k

Kevin Kho

05/05/2022, 10:47 PM
That’s really weird, but I guess it’s flow specific too. Could you send me the code?
a

Anna Geller

05/06/2022, 12:15 AM
that's a clever trick to avoid redeploying flow when your task definition changes!
Copy code
try:
    _task_defn_arn = prefect.backend.kv_store.get_key_value(key="prefect_flow_task_definition_arn")
except:
    _task_defn_arn = None
the issue you see is that you define the logger in your Flow block. In Prefect 1.0, you can only reliably log from tasks. If you move this logging part into the task, your log messages will appear in Cloud UI:
Copy code
with case(use_docker, False):
        logger: Logger = prefect.context.get("logger")

        # Note: because this use of context/config is not in a task it will be eagerly evaluated
        # whenever this script is _imported_, rather than when the flow is run!
        try:
            job_defn = prefect.context.config.seer.encyclopedia.batch_job_definition
        except (AttributeError, KeyError):
            job_defn = DEFAULT_BATCH_JOB_DEFINITION
        <http://logger.info|logger.info>("Using Batch job definition %s for interim searches", job_defn)
I mention Prefect 1.0 explicitly because in Prefect 2.0 you can log from tasks, flows and subflows
s

Seth Just

05/06/2022, 3:31 PM
Okay, great, thank you @Anna Geller @Kevin Kho! I will look at moving all of our logging into tasks and see what the outcome is.
👍 1
I've moved all the logger usages into tasks and I'm still not seeing logs in Prefect Cloud.
Any advice for debugging the issue @Anna Geller @Kevin Kho? I'm not familiar with how exactly they flow from the ECS flow task to the agent and/or Prefect server.
k

Kevin Kho

05/06/2022, 8:56 PM
Could you show the new code?
s

Seth Just

05/06/2022, 8:58 PM
Just changed up that one snippet; happy to share the full script if you need
Copy code
with case(use_docker, False):
        # Note: because this use of context/config is not in a task it will be eagerly evaluated
        # whenever this script is _imported_, rather than when the flow is run!
        try:
            job_defn = prefect.context.config.seer.encyclopedia.batch_job_definition
        except (AttributeError, KeyError):
            job_defn = DEFAULT_BATCH_JOB_DEFINITION

        try:
            quant_job_defn = prefect.context.config.seer.encyclopedia.quant_batch_job_definition
        except (AttributeError, KeyError):
            quant_job_defn = job_defn

        try:
            job_queue = prefect.context.config.seer.encyclopedia.batch_job_queue
        except (AttributeError, KeyError):
            job_queue = DEFAULT_BATCH_JOB_QUEUE

        try:
            quant_job_queue = prefect.context.config.seer.encyclopedia.quant_batch_job_queue
        except (AttributeError, KeyError):
            quant_job_queue = job_queue

        batch_job_tags = get_batch_job_tags(parameters_dict)

        @task
        def log_batch_settings(  # pylint: disable=redefined-outer-name
            job_defn, quant_job_defn, job_queue, quant_job_queue, batch_job_tags
        ):
            """
            Just logs the settings used for invoking Batch Jobs; this logging must
            occur inside a task. DE-56.
            """
            logger: Logger = prefect.context.get("logger")

            <http://logger.info|logger.info>("Using Batch job definition %s for interim searches", job_defn)
            <http://logger.info|logger.info>("Using Batch job definition %s for quant step", quant_job_defn)
            <http://logger.info|logger.info>("Using Batch job queue %s for interim searches", job_queue)
            <http://logger.info|logger.info>("Using Batch job queue %s for quant step", quant_job_queue)

            <http://logger.info|logger.info>("Tags for Batch jobs: %s", json.dumps(batch_job_tags))

        log_batch_settings(job_defn, quant_job_defn, job_queue, quant_job_queue, batch_job_tags)
k

Kevin Kho

05/06/2022, 8:59 PM
There are a bunch of things that will not work here I think. Is
case
the Prefect
case
?
s

Seth Just

05/06/2022, 9:00 PM
The flow runs perfectly in production, it's just that we don't see the logs in Prefect Cloud. It is
prefect.case
k

Kevin Kho

05/06/2022, 9:06 PM
These lines made it right?
Copy code
<http://logger.info|logger.info>("Using Batch job definition %s for interim searches", job_defn)
            <http://logger.info|logger.info>("Using Batch job definition %s for quant step", quant_job_defn)
            <http://logger.info|logger.info>("Using Batch job queue %s for interim searches", job_queue)
            <http://logger.info|logger.info>("Using Batch job queue %s for quant step", quant_job_queue)
and then the ones below did not?
s

Seth Just

05/06/2022, 9:06 PM
this is all I see
And this is well after all the tasks have run successfully
I do see all the expected logs in CloudWatch (from the flow's ECS task)
k

Kevin Kho

05/06/2022, 9:12 PM
Ah ok I get what the code is doing now with the try excepts and the fork, but I really can’t pinpoint what would cause this. Sorry, but I likely need a more minimal example here 😅
s

Seth Just

05/06/2022, 9:15 PM
That's understandable, I can try to put together an SSCCE for you, but could you give me any pointers on debugging what might be going wrong? I don't know where to look in terms of whether there's some missing/bad piece of configuration somewhere that's leading to the logs disappearing. I'm pretty dubious that it's this specific flow because we're seeing this happen for multiple flows in multiple accounts.
k

Kevin Kho

05/06/2022, 9:21 PM
I don’t have a good guess because it all seems valid, but if I were to try reproducing, I would just try a simple flow with a logger.
Copy code
@task
def abc():
    <http://prefect.context.logger.info|prefect.context.logger.info>("test")

with Flow(..) as flow:
    abc()
and then this would tell us if maybe something in your AWS settings is not allowing the logs to send. Then let me just list the things that caught my eye that I don’t see often, but really seem like they would work: 1. try-except in flows 2. type hint on the logger 3. defining tasks inside the flow 4. The case statement
Copy code
with case(task_name,False):
    ...
might need instantiation
Copy code
a = task_name()
with case(a, False):
    ...
👍 1
5. the pylint comments
s

Seth Just

05/06/2022, 9:33 PM
Thanks for the advice. I can see how 1,3, and 4 aren't necessarily wrong but might be better avoided (they do work though). Not too worried about type hints or comments 🤷
k

Kevin Kho

05/06/2022, 9:34 PM
Same here man. I’m 99% sure they won’t affect it…but I can’t spot the problem either
s

Seth Just

05/06/2022, 10:07 PM
Ok, I built an SSCCE and registered/ran it in our infrastructure. This flow produces all expected log output:
Copy code
from logging import Logger

import prefect
from prefect import case, task, Task, Flow
from prefect.engine.results import PrefectResult
from prefect.storage import S3


try:
    _task_defn_arn = prefect.backend.kv_store.get_key_value(key="prefect_flow_task_definition_arn")
except:
    _task_defn_arn = None

RUN_CONFIG = (
    prefect.run_configs.ECSRun(task_definition_arn=_task_defn_arn)
    if _task_defn_arn
    else prefect.run_configs.UniversalRun()
)


@task(name="condition")
def condition():
    return True


@task(name="logging_task")
def logging_task(val: Task):
    logger: Logger = prefect.context.get("logger")
    <http://logger.info|logger.info>("Invoked with %s", str(val))


with Flow(
        name="test_logs",
        result=PrefectResult(),
        storage=S3("seer-prefect-flows-dev", stored_as_script=True),
        run_config=RUN_CONFIG
) as flow:
    with case(condition, True):
        logging_task("True")

    with case(condition, False):
        logging_task("False")

    logging_task("<unconditional>")
but this one does not:
Copy code
from logging import Logger

import prefect
from prefect import case, task, Task, Flow
from prefect.engine.results import PrefectResult
from prefect.executors import LocalDaskExecutor
from prefect.storage import S3


try:
    _task_defn_arn = prefect.backend.kv_store.get_key_value(key="prefect_flow_task_definition_arn")
except:
    _task_defn_arn = None

RUN_CONFIG = (
    prefect.run_configs.ECSRun(task_definition_arn=_task_defn_arn)
    if _task_defn_arn
    else prefect.run_configs.UniversalRun()
)


@task(name="condition")
def condition():
    return True


@task(name="logging_task")
def logging_task(val: Task):
    logger: Logger = prefect.context.get("logger")
    <http://logger.info|logger.info>("Invoked with %s", str(val))


with Flow(
        name="test_logs",
        result=PrefectResult(),
        executor=LocalDaskExecutor(scheduler="processes"),
        storage=S3("seer-prefect-flows-dev", stored_as_script=True),
        run_config=RUN_CONFIG
) as flow:
    with case(condition, True):
        logging_task("True")

    with case(condition, False):
        logging_task("False")

    logging_task("<unconditional>")
The difference is just one line:
Copy code
executor=LocalDaskExecutor(scheduler="processes"),
k

Kevin Kho

05/06/2022, 10:10 PM
That’s really weird. For DaskExecutor it’s normal to have missing logs (because of Dask not supporting it), but for LocalDask it’s not normal. Thanks for the example! I will probably get to trying to reproduce on my end a bit later tonight
s

Seth Just

05/06/2022, 10:11 PM
👍 Sounds good, thanks! I'm only seeing the problem if I use
scheduler="processes"
;
LocalDaskExecutor()
seems to work normally
k

Kevin Kho

05/06/2022, 10:11 PM
Because the default is “threads” but processes should be visible
s

Seth Just

05/06/2022, 10:11 PM
💯
@Kevin Kho any follow-up on this issue?
k

Kevin Kho

05/09/2022, 6:50 PM
Oh shit my bad. Thanks for the ping. Will work on it in a bit
👍 1
I have absolutely replicated this with a simple flow
Copy code
@task
def abc(x):
    <http://prefect.context.logger.info|prefect.context.logger.info>(f"hello {x}")
    return "hello"

with Flow("ecs_test") as flow:
    abc.map(list(range(10)))
I just need to find out if it’s known and if not, I’ll make an issue
👍 1
Replicated on DockerRun too
Issue to track
👍 2
38 Views