https://prefect.io logo
#prefect-community
Title
# prefect-community
o

Omar Sultan

04/19/2022, 8:13 AM
Hello Everyone, We currently have a setup of Prefect Core Server running on our on-prem environment using Kubernetes. We are running our flows using a LocalDaskExecutor and we noticed that the logs for these flows do not show up on the portal. Only logs for flows that do not use a LocalDaskExecutor show logs. Is there a special configuration required for Dask to send the logs?
a

Anna Geller

04/19/2022, 9:43 AM
There might be an issue with sending logs from all workers when you use DaskExecutor. But LocalDaskExecutor doesn't use Dask so this shouldn't be a problem - can you share your example flow that fails to send logs to your Server backend?
o

Omar Sultan

04/20/2022, 12:14 AM
Hi Anna, Thanks
Copy code
class RunMeFirst(Task):
    def run(self):
        print("I'm running first!")


class PlusOneTask(Task):
    def run(self, x):
        print(x + 1)
        return x + 1


flow = Flow('My Imperative Flow')
plus_one = PlusOneTask()
run_me = RunMeFirst()
plus_one.log_stdout = True
run_me.log_stdout = True
flow.set_dependencies(
    task=plus_one,
    upstream_tasks=[run_me],
    keyword_tasks=dict(x=10))

flow.storage = get_jigsaw_api_storage(1, f"Test Flow")
flow.storage.build()
flow.run_config = get_jigsaw_k8_config_run()
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=2)
flow.register("Utilities")
Note that when i remove the flow.executor line before the last line , the prints appear normally in the logs on the UI
a

Anna Geller

04/20/2022, 9:49 AM
Thanks for sharing your code. Could you try running a simple hello world flow using the functional API and task decorator? This way you can step by step test what is failing in your configuration
e.g. you may try:
Copy code
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor


@task
def generate_random_numbers():
    return list(range(1, 200))


@task
def add_one(x):
    return x + 1


@task(log_stdout=True)
def print_results(res):
    print(res)


with Flow("mapping", executor=LocalDaskExecutor(), storage=get_jigsaw_api_storage(1, f"Test Flow")) as flow:
    numbers = generate_random_numbers()
    result = add_one.map(numbers)
    print_results(result)
The issue you have is that you build the storage before you attach the executor. Since the executor information is retrieved from storage at runtime, your executor information won't be found. Switching the order may already help solve your issue:
Copy code
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=2)
flow.run_config = get_jigsaw_k8_config_run()
flow.storage = get_jigsaw_api_storage(1, f"Test Flow")
flow.storage.build()
flow.register("Utilities")
o

Omar Sultan

04/21/2022, 10:52 PM
Hi Anna, I tried to change the order as you suggested and I'm still getting the same 😞
will try to investigate more to understand what is happening
a

Anna Geller

04/21/2022, 11:00 PM
but do you understand the issue? the problem is that you are attaching the executor too late
o

Omar Sultan

04/24/2022, 11:08 PM
Yes I think I got the problem, I'm trying to see how I can change the flow a little bit so I can capture the logs correctly
👍 1
3 Views