Peter Peter
07/21/2021, 1:54 PM@task(log_stdout=True)
def print_msg():
print("Will it work?")
Sample task will print message to the log:
@task(log_stdout=True)
def print_msg():
print("Will it work? Yup!")
time.sleep(5)
Any help appreciated,
PKevin Kho
Peter Peter
07/21/2021, 2:12 PMKevin Kho
Peter Peter
07/21/2021, 2:26 PMKevin Kho
Peter Peter
07/21/2021, 2:36 PMKevin Kho
Peter Peter
07/22/2021, 3:49 PMKevin Kho
Peter Peter
08/16/2021, 10:21 AMKevin Kho
Zanie
Peter Peter
08/16/2021, 5:31 PMZanie
Peter Peter
08/16/2021, 5:32 PMZanie
Peter Peter
08/16/2021, 5:59 PMZanie
logger
instance instead of logging stdout, do you see the same behavior?Zanie
Peter Peter
08/16/2021, 6:24 PMZanie
Peter Peter
08/16/2021, 7:03 PMZanie
Peter Peter
08/16/2021, 7:04 PMZanie
Zanie
KubeCluster
dask cluster?Peter Peter
08/16/2021, 7:14 PMZanie
Zanie
Peter Peter
08/16/2021, 7:21 PMZanie
Zanie
from prefect import Flow, task
from prefect.executors import DaskExecutor
@task(log_stdout=True)
def print_msg():
print("Will it work?")
with Flow("test", executor=DaskExecutor()) as flow:
print_msg()
do you see the same behavior?Zanie
prefect register -p flow.py --project test
and prefect run --name test --execute
but I also cannot reproduce this when using an agent.Peter Peter
08/16/2021, 7:42 PMPeter Peter
08/16/2021, 7:43 PMZanie
Peter Peter
08/16/2021, 7:51 PMZanie
Zanie
Peter Peter
08/17/2021, 10:18 AMPeter Peter
08/17/2021, 10:18 AMfrom prefect import task
from prefect.storage import Docker
from prefect.run_configs import KubernetesRun
from prefect.executors import DaskExecutor
from prefect import Flow
import prefect
@task(log_stdout=True)
def print_first():
print("_First Message!")
logger = prefect.context.get("logger")
<http://logger.info|logger.info>("1234!")
logger.error("5678!")
with Flow("log_flow_delay_after_3rd") as flow:
a1 = print_first()
env_vars = {
"PREFECT__LOGGING__LEVEL": "DEBUG",
}
storage = Docker(registry_url="FILLIN",
image_name="IMAGENAME",
image_tag="0.1.36",
local_image=False,
env_vars=env_vars)
executor = DaskExecutor(adapt_kwargs={"minimum": 1, "maximum": 1})
flow.executor = executor
flow.run_config = KubernetesRun(
labels=["DASK"],
cpu_request=1,
memory_request=1,
)
storage.add_flow(flow)
flow.storage = storage
flow.register(project_name="log_test", build=False)
storage.build(push=True)
Zanie
Zanie
Peter Peter
08/18/2021, 10:47 AMZanie
KubernetesRun
?Peter Peter
08/18/2021, 4:16 PMZanie
Peter Peter
08/30/2021, 6:24 PMZanie
Zanie
Zanie
Tyler Wanner
08/31/2021, 5:26 PMbase_image = "prefecthq/prefect:0.14.6-python3.8"
but I don't believe this to be the problem. Anything interesting about the cluster it's running in?Peter Peter
09/03/2021, 1:22 PM