Tomer Cagan
03/20/2022, 12:48 PM
When I use worker_client and then submit new tasks to the same cluster (code inside), I see that the printout from the first call of the function is logged in Prefect and shipped to the server, but consecutive calls are not (see log inside).
Is there some trick I can use?

import random
import time
from typing import List

from distributed import worker_client
from prefect import Flow, Parameter, task
from prefect.executors import DaskExecutor
from prefect.storage import Docker


def fib(value: int) -> int:
    slow_compute_delay = random.random() * 2
    print(
        f"calculating fib({value}). sleeping {slow_compute_delay} seconds to simulate slow processing"
    )
    time.sleep(slow_compute_delay)
    if value < 2:
        return value
    # recursively submit more work to the same Dask cluster from inside a task
    with worker_client() as client:
        a_future = client.submit(fib, value - 1)
        b_future = client.submit(fib, value - 2)
        a, b = client.gather([a_future, b_future])
    return a + b


@task(log_stdout=True)
def get_data(value1: int, value2: str) -> List[int]:
    print(f"the parameters are value1={value1}, value2={value2}")
    return list(range(value1))


@task(log_stdout=True)
def load_stuff(values: List[int]) -> List[int]:
    print(f"loading stuff - got a list of size {len(values)}")
    return values


@task(log_stdout=True)
def compute_with_dask(values: List[int]) -> int:
    max_value = max(values)
    print(
        f"got a list of size {len(values)} - starting long process to calculate fib({max_value})"
    )
    results = fib(max_value)
    print(f"got results of calculation, the value is {results}")
    return results


with Flow("fib-dask-flow") as flow:
    param = Parameter("value1", default=12)
    param2 = Parameter("value2", default="Prefect")
    values = get_data(param, param2)
    loaded = load_stuff(values)
    compute_with_dask(loaded)

flow.storage = Docker(
    registry_url="docker.k8s.nextsilicon.com/repository/container-ns/",
    image_name="research-dask-fib",
)
flow.executor = DaskExecutor()
The logs (collected with kubectl, or actually, k9s) - note how the calculating fib(11)... line is captured by Prefect but the consecutive calls are not:
[2022-03-20 10:10:39+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'fib-dask-flow'
[2022-03-20 10:10:39+0000] INFO - prefect.DaskExecutor | Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`...
[2022-03-20 10:10:41+0000] INFO - prefect.DaskExecutor | The Dask dashboard is available at http://127.0.0.1:8787/status
/usr/local/lib/python3.9/site-packages/distributed/scheduler.py:5670: UserWarning: Scheduler already contains a plugin with name worker-status; overwriting.
  warnings.warn(
[2022-03-20 10:10:41+0000] INFO - prefect.CloudTaskRunner | Task 'value1': Starting task run...
[2022-03-20 10:10:41+0000] INFO - prefect.CloudTaskRunner | Task 'value2': Starting task run...
[2022-03-20 10:10:41+0000] INFO - prefect.CloudTaskRunner | Task 'value1': Finished task run for task with final state: 'Success'
[2022-03-20 10:10:41+0000] INFO - prefect.CloudTaskRunner | Task 'value2': Finished task run for task with final state: 'Success'
[2022-03-20 10:10:41+0000] INFO - prefect.CloudTaskRunner | Task 'get_data': Starting task run...
[2022-03-20 10:10:42+0000] INFO - prefect.CloudTaskRunner | the parameters are value1=12, value2=Prefect
[2022-03-20 10:10:42+0000] INFO - prefect.CloudTaskRunner | Task 'get_data': Finished task run for task with final state: 'Success'
[2022-03-20 10:10:42+0000] INFO - prefect.CloudTaskRunner | Task 'load_stuff': Starting task run...
[2022-03-20 10:10:42+0000] INFO - prefect.CloudTaskRunner | loading stuff - got a list of size 12
[2022-03-20 10:10:42+0000] INFO - prefect.CloudTaskRunner | Task 'load_stuff': Finished task run for task with final state: 'Success'
[2022-03-20 10:10:42+0000] INFO - prefect.CloudTaskRunner | Task 'compute_with_dask': Starting task run...
[2022-03-20 10:10:42+0000] INFO - prefect.CloudTaskRunner | got a list of size 12 - starting long process to calculate fib(11)
[2022-03-20 10:10:42+0000] INFO - prefect.CloudTaskRunner | calculating fib(11). sleeping 0.5530870039737996 seconds to simulate slow processing
[2022-03-20 10:10:48+0000] INFO - prefect.CloudTaskRunner | got results of calculation, the value is 89
[2022-03-20 10:10:48+0000] INFO - prefect.CloudTaskRunner | Task 'compute_with_dask': Finished task run for task with final state: 'Success'
calculating fib(1). sleeping 1.1147617720297223 seconds to simulate slow processing
calculating fib(0). sleeping 1.6729680086778969 seconds to simulate slow processing
calculating fib(6). sleeping 0.7344708098646031 seconds to simulate slow processing
calculating fib(5). sleeping 1.185138104797178 seconds to simulate slow processing
calculating fib(3). sleeping 1.1508838185913755 seconds to simulate slow processing
calculating fib(7). sleeping 1.3828636184701388 seconds to simulate slow processing
calculating fib(8). sleeping 0.8941380123847236 seconds to simulate slow processing
calculating fib(10). sleeping 1.6048135240274324 seconds to simulate slow processing
calculating fib(4). sleeping 0.15924167691710855 seconds to simulate slow processing
calculating fib(9). sleeping 0.5762516282451282 seconds to simulate slow processing
calculating fib(2). sleeping 0.5941398434837817 seconds to simulate slow processing
[2022-03-20 10:10:48+0000] INFO - prefect.CloudFlowRunner | Flow run SUCCESS: all reference tasks succeeded
Stream closed EOF for research/prefect-job-5d1873a9--1-z78p5 (flow)
Anna Geller
03/20/2022, 1:01 PM
Tomer Cagan
03/20/2022, 1:32 PM
When using flow.executor = DaskExecutor(cluster_class=KubeCluster()), is there a way for me to access that KubeCluster instance? If I can somehow get it from within the flow I can save the content of the log somehow (the goal is to have it easily accessible from within the UI). I guess that with a resource manager it would somehow be possible, but currently I am hoping to run on the same cluster that is spawned by Prefect...
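For reference, a minimal sketch of how a KubeCluster is typically wired into the executor in this setup - the image tag and worker count below are placeholders, not values from this thread:

# Sketch only: attach a KubeCluster to the flow's executor. The executor
# instantiates cluster_class with cluster_kwargs at flow-run time.
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect.executors import DaskExecutor

pod_spec = make_pod_spec(image="research-dask-fib:latest")  # hypothetical image tag

flow.executor = DaskExecutor(
    cluster_class=KubeCluster,  # a class or callable, not a pre-built instance
    cluster_kwargs={"pod_template": pod_spec, "n_workers": 2},
)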
Anna Geller
03/20/2022, 2:07 PM
distributed repo
Tomer Cagan
03/21/2022, 9:14 AM
If I can get hold of the KubeCluster instance, I can possibly call get_logs() in a consecutive task and write it to stdout so it is picked up by Prefect... thus, I was asking about it so that I can possibly find some way to do it 🙂
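A sketch of that idea, purely hypothetical - it assumes the KubeCluster object would be reachable from a task, which is exactly the open question (and the next reply explains why it isn't when using DaskExecutor):

from prefect import task

# Hypothetical task illustrating the idea above: print the cluster's logs so
# that Prefect's log_stdout capture ships them to the server.
@task(log_stdout=True)
def dump_cluster_logs(cluster):
    for name, log in cluster.get_logs().items():
        print(f"--- {name} ---")
        print(log)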
Anna Geller
03/21/2022, 9:48 AM
Calling KubeCluster().get_logs() from a consecutive task won't work since the consecutive task would also run on a Dask worker. It's a known issue and I think there is no way to do it directly when using DaskExecutor.
But if you don't use DaskExecutor and you instead operate with Dask via a resource manager and a Dask client, calling KubeCluster().get_logs() in a resource manager cleanup task could potentially work. I'm mentioning this option because of our previous threads about it. If you try that, I would appreciate it if you could share whether it worked.
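A rough sketch of the suggested pattern - the pod spec, worker count, and task body are placeholders, and whether get_logs() behaves well in cleanup is exactly what remains to be tested:

# Sketch: manage the Dask cluster with a Prefect resource manager so the
# KubeCluster object is still in scope during cleanup, where its logs can be
# grabbed before teardown.
from dask_kubernetes import KubeCluster
from distributed import Client
from prefect import Flow, resource_manager, task


@resource_manager
class DaskK8sCluster:
    def __init__(self, n_workers: int = 2):
        self.n_workers = n_workers

    def setup(self) -> Client:
        # pod template / image details omitted in this sketch
        self.cluster = KubeCluster(n_workers=self.n_workers)
        return Client(self.cluster)

    def cleanup(self, client: Client):
        # the cluster object is reachable here, so its logs can be printed
        # (or persisted elsewhere) before everything is torn down
        print(self.cluster.get_logs())
        client.close()
        self.cluster.close()


@task(log_stdout=True)
def heavy_computation(client: Client) -> int:
    return client.submit(sum, range(10)).result()


with Flow("resource-manager-logs") as flow:
    with DaskK8sCluster(n_workers=2) as client:
        heavy_computation(client)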
Tomer Cagan
03/21/2022, 10:10 AM
My understanding was that everything runs on the Dask cluster when using DaskExecutor.
Hence, I assume, the compute_with_dask task from my code is running on one of the workers, and when it calls fib, print statements from both are captured by Prefect (lines with a date are those captured by Prefect).
Is that wrong?
If not, why does it work for the futures (calls) that Prefect submits directly and not the others?
I am still exploring how to use Prefect + Dask and as soon as I do - I hope to share how we used it...
Anna Geller
03/21/2022, 10:21 AM
Tomer Cagan
03/28/2022, 8:19 AM
An update: I now have code that gets hold of the KubeCluster and calls its get_logs function.
I implemented this as standalone code, but a similar thing can probably be done by getting the cluster from a client retrieved using worker_client.
While this works, it does not get logs written to STDOUT; it just gets the logs written to the worker logger.
It is probably possible to write to the same log by calling logging.getLogger("distributed.worker") (sketched below) - but for a cluster deployment, it might be better to use proper infrastructure (ELK / Loki+Promtail / Syslog) to collect and ship the logs.
I will report more as I progress with this - maybe we can define a flow to enable getting the logs - like setting it up properly and then collecting the logs inside a task at the end of the flow. This will be a dask-centric solution though...
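A minimal sketch of that logger idea - assuming the goal is just to have messages from code running inside Dask tasks land in the worker's own log stream (which, per the observation above, is what get_logs() returns) rather than in bare stdout:

import logging

# Messages sent to the "distributed.worker" logger go through the worker's own
# logging setup, so they show up in the worker logs that get_logs() reads,
# unlike plain print() calls to stdout in this setup.
worker_logger = logging.getLogger("distributed.worker")


# simplified fib, without the worker_client recursion, just to show the call
def fib(value: int) -> int:
    worker_logger.info("calculating fib(%s)", value)
    if value < 2:
        return value
    return fib(value - 1) + fib(value - 2)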
Anna Geller
03/28/2022, 9:31 AM
"This will be a dask-centric solution though." - That's totally valid if you use Dask with many flows. I agree that using something like the ELK stack is better for a cluster deployment, as it can more reliably get logs directly on the infrastructure level rather than on the flow level. I would be curious to test it out if you can share later how you did it 🙌
Tomer Cagan
03/28/2022, 10:15 AM
Anna Geller
03/28/2022, 10:23 AM
Tomer Cagan
03/28/2022, 2:12 PM
Still, the printout from the fib function is somehow caught even though, supposedly, it runs inside a dask task...
Anna Geller
03/28/2022, 2:29 PM
Kevin Kho
03/28/2022, 2:33 PM
Do you think the fib task might be running on the Client instead of the Workers?
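One way to check that (a sketch, not something from the thread) is to report from inside fib whether the code is executing on a Dask worker or in the client/flow-runner process:

from distributed import get_worker


def where_am_i() -> str:
    # get_worker() raises ValueError when called outside a Dask worker thread,
    # which distinguishes "running on a worker" from "running in the client /
    # flow-runner process".
    try:
        return f"on dask worker {get_worker().address}"
    except ValueError:
        return "not on a dask worker (client / flow-runner process)"


# e.g. inside fib():
#     print(f"calculating fib({value}) - {where_am_i()}")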
Tomer Cagan
03/29/2022, 6:53 AM