Seth Just
05/05/2022, 10:09 PMKevin Kho
Seth Just
05/05/2022, 10:20 PMKevin Kho
Seth Just
05/05/2022, 10:21 PMKevin Kho
Seth Just
05/05/2022, 10:42 PMprefect.context.get("logger")
and prefect.context.logger
Kevin Kho
Anna Geller
# Best-effort lookup of the task definition ARN from the Prefect KV store;
# any failure leaves the value as None. `except Exception` (rather than a
# bare `except:`) lets KeyboardInterrupt/SystemExit propagate.
try:
    _task_defn_arn = prefect.backend.kv_store.get_key_value(key="prefect_flow_task_definition_arn")
except Exception:  # pylint: disable=broad-except
    _task_defn_arn = None
The issue you see is that you define the logger in your Flow block. In Prefect 1.0, you can only reliably log from tasks. If you move this logging part into a task, your log messages will appear in the Cloud UI:
# Snippet as originally written: the logger is fetched and used directly in
# the Flow block (not inside a task).
with case(use_docker, False):
    logger: Logger = prefect.context.get("logger")
    # Note: because this use of context/config is not in a task it will be eagerly evaluated
    # whenever this script is _imported_, rather than when the flow is run!
    try:
        job_defn = prefect.context.config.seer.encyclopedia.batch_job_definition
    except (AttributeError, KeyError):
        job_defn = DEFAULT_BATCH_JOB_DEFINITION
    logger.info("Using Batch job definition %s for interim searches", job_defn)
Seth Just
05/06/2022, 3:31 PMKevin Kho
Seth Just
05/06/2022, 8:58 PMwith case(use_docker, False):
# Note: because this use of context/config is not in a task it will be eagerly evaluated
# whenever this script is _imported_, rather than when the flow is run!

# Job definition for interim searches; falls back to the module default when
# the config attribute/key is missing.
try:
    job_defn = prefect.context.config.seer.encyclopedia.batch_job_definition
except (AttributeError, KeyError):
    job_defn = DEFAULT_BATCH_JOB_DEFINITION

# Job definition for the quant step; falls back to the interim-search one.
try:
    quant_job_defn = prefect.context.config.seer.encyclopedia.quant_batch_job_definition
except (AttributeError, KeyError):
    quant_job_defn = job_defn

# Job queue for interim searches; falls back to the module default.
try:
    job_queue = prefect.context.config.seer.encyclopedia.batch_job_queue
except (AttributeError, KeyError):
    job_queue = DEFAULT_BATCH_JOB_QUEUE

# Job queue for the quant step; falls back to the interim-search queue.
try:
    quant_job_queue = prefect.context.config.seer.encyclopedia.quant_batch_job_queue
except (AttributeError, KeyError):
    quant_job_queue = job_queue

batch_job_tags = get_batch_job_tags(parameters_dict)
@task
def log_batch_settings(  # pylint: disable=redefined-outer-name
    job_defn, quant_job_defn, job_queue, quant_job_queue, batch_job_tags
):
    """
    Just logs the settings used for invoking Batch Jobs; this logging must
    occur inside a task. DE-56.

    Args:
        job_defn: Batch job definition used for interim searches.
        quant_job_defn: Batch job definition used for the quant step.
        job_queue: Batch job queue used for interim searches.
        quant_job_queue: Batch job queue used for the quant step.
        batch_job_tags: Tags for the Batch jobs; logged via ``json.dumps``,
            so it must be JSON-serialisable.
    """
    # The logger is taken from the Prefect context inside the task body.
    logger: Logger = prefect.context.get("logger")
    logger.info("Using Batch job definition %s for interim searches", job_defn)
    logger.info("Using Batch job definition %s for quant step", quant_job_defn)
    logger.info("Using Batch job queue %s for interim searches", job_queue)
    logger.info("Using Batch job queue %s for quant step", quant_job_queue)
    logger.info("Tags for Batch jobs: %s", json.dumps(batch_job_tags))


log_batch_settings(job_defn, quant_job_defn, job_queue, quant_job_queue, batch_job_tags)
Kevin Kho
case
the Prefect case
?Seth Just
05/06/2022, 9:00 PMprefect.case
Kevin Kho
logger.info("Using Batch job definition %s for interim searches", job_defn)
logger.info("Using Batch job definition %s for quant step", quant_job_defn)
logger.info("Using Batch job queue %s for interim searches", job_queue)
logger.info("Using Batch job queue %s for quant step", quant_job_queue)
Seth Just
05/06/2022, 9:06 PMKevin Kho
Seth Just
05/06/2022, 9:15 PMKevin Kho
@task
def abc():
    """Minimal task that emits a single "test" log line via the context logger."""
    prefect.context.logger.info("test")


# `...` stands in for the flow's real constructor arguments.
with Flow(...) as flow:
    abc()
and then this would tell us whether something in your AWS settings is preventing the logs from being sent.
Then let me just list the things that caught my eye that I don’t see often, but really seem like they would work:
1. try-except in flows
2. type hint on the logger
3. defining tasks inside the flow
4. The case statement
# Passes the task object itself (not a call result) as the case condition.
with case(task_name, False):
    ...
might need instantiation
# Calls the task first and uses the result of that call as the case condition.
a = task_name()
with case(a, False):
    ...
Seth Just
05/06/2022, 9:33 PMKevin Kho
Seth Just
05/06/2022, 10:07 PMfrom logging import Logger
import prefect
from prefect import case, task, Task, Flow
from prefect.engine.results import PrefectResult
from prefect.storage import S3
# Best-effort lookup of the task definition ARN from the Prefect KV store;
# any failure leaves the value as None. `except Exception` (rather than a
# bare `except:`) lets KeyboardInterrupt/SystemExit propagate.
try:
    _task_defn_arn = prefect.backend.kv_store.get_key_value(key="prefect_flow_task_definition_arn")
except Exception:  # pylint: disable=broad-except
    _task_defn_arn = None

# Use an ECS run config when an ARN was found, otherwise a UniversalRun.
RUN_CONFIG = (
    prefect.run_configs.ECSRun(task_definition_arn=_task_defn_arn)
    if _task_defn_arn
    else prefect.run_configs.UniversalRun()
)
@task(name="condition")
def condition():
    """Return a constant ``True``; used as the condition of the `case` blocks."""
    return True
@task(name="logging_task")
def logging_task(val: Task):
    """Log ``str(val)`` at INFO level using the logger from the Prefect context.

    Args:
        val: Value to include in the log message (the flow passes plain
            strings such as "True" and "False").
    """
    logger: Logger = prefect.context.get("logger")
    logger.info("Invoked with %s", str(val))
# Test flow without an explicit executor: one logging task per `case` branch
# on the `condition` task, plus one logging task outside any `case`.
with Flow(
    name="test_logs",
    result=PrefectResult(),
    storage=S3("seer-prefect-flows-dev", stored_as_script=True),
    run_config=RUN_CONFIG,
) as flow:
    with case(condition, True):
        logging_task("True")
    with case(condition, False):
        logging_task("False")
    logging_task("<unconditional>")
but this one does not:
from logging import Logger
import prefect
from prefect import case, task, Task, Flow
from prefect.engine.results import PrefectResult
from prefect.executors import LocalDaskExecutor
from prefect.storage import S3
# Best-effort lookup of the task definition ARN from the Prefect KV store;
# any failure leaves the value as None. `except Exception` (rather than a
# bare `except:`) lets KeyboardInterrupt/SystemExit propagate.
try:
    _task_defn_arn = prefect.backend.kv_store.get_key_value(key="prefect_flow_task_definition_arn")
except Exception:  # pylint: disable=broad-except
    _task_defn_arn = None

# Use an ECS run config when an ARN was found, otherwise a UniversalRun.
RUN_CONFIG = (
    prefect.run_configs.ECSRun(task_definition_arn=_task_defn_arn)
    if _task_defn_arn
    else prefect.run_configs.UniversalRun()
)
@task(name="condition")
def condition():
    """Return a constant ``True``; used as the condition of the `case` blocks."""
    return True
@task(name="logging_task")
def logging_task(val: Task):
    """Log ``str(val)`` at INFO level using the logger from the Prefect context.

    Args:
        val: Value to include in the log message (the flow passes plain
            strings such as "True" and "False").
    """
    logger: Logger = prefect.context.get("logger")
    logger.info("Invoked with %s", str(val))
# Same test flow, but run with a process-based LocalDaskExecutor: one logging
# task per `case` branch on the `condition` task, plus one outside any `case`.
with Flow(
    name="test_logs",
    result=PrefectResult(),
    executor=LocalDaskExecutor(scheduler="processes"),
    storage=S3("seer-prefect-flows-dev", stored_as_script=True),
    run_config=RUN_CONFIG,
) as flow:
    with case(condition, True):
        logging_task("True")
    with case(condition, False):
        logging_task("False")
    logging_task("<unconditional>")
executor=LocalDaskExecutor(scheduler="processes"),
Kevin Kho
Seth Just
05/06/2022, 10:11 PMscheduler="processes"
; LocalDaskExecutor()
seems to work normallyKevin Kho
Seth Just
05/06/2022, 10:11 PMKevin Kho
@task
def abc(x):
    """Log ``hello {x}`` via the context logger and return the string "hello"."""
    prefect.context.logger.info(f"hello {x}")
    return "hello"


# Map the task over the integers 0..9.
with Flow("ecs_test") as flow:
    abc.map(list(range(10)))
I just need to find out if it’s known and if not, I’ll make an issue