https://prefect.io logo
Title
p

Pierre LIBAULT

09/07/2022, 2:22 AM
Hi everyone, I am running a flow on several computer using prefect_dask and prefect cloud, however every computers that receives tasks send me this exeption :
2022-09-07 11:17:55,962 - distributed.worker - WARNING - Compute Failed
Key:       6b8ff000-ad9a-4de0-ac5a-0b9c35c93fe0
Function:  begin_task_run
args:      ()
kwargs:    {'task': <prefect.tasks.Task object at 0x7ff4aaaff760>, 'task_run': TaskRun(id=UUID('44d28b18-a961-43a8-9f5f-e02134767737'), name='say_goodbye-261e56a8-0', flow_run_id=UUID('16b3db98-966b-44b6-b73b-c4ff4682c878'), task_key='__main__.say_goodbye', dynamic_key='0', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0, retries=0, retry_delay=0), tags=[], state_id=UUID('4543c5be-881c-47e5-9464-4ef76eef5130'), task_inputs={'name': []}, state_type=StateType.PENDING, state_name='Pending', run_count=0, expected_start_time=DateTime(2022, 9, 7, 2, 17, 54, 691980, tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=56332), state=Pending(message=None, type=PENDING, result=None)), 'parameters': {'name': 'arthur'}, 'wait_for': None, 'result_filesyste':
Exception: "PermissionError(13, 'Permission denied')"
Does everyone know how I can remove this PermissionError ? I feel that it tries to modify a file or something but I am not sure..
c

Christopher Boyd

09/07/2022, 12:51 PM
What is the task doing?
'result_filesyste':
Exception: "PermissionError(13, 'Permission denied')"
looks like it’s getting a filesystem error - perhaps it’s trying to access a file local to your system instead of in memory through an object/variable?
What file is being used? What are the permissions on it? Is the dask cluster a local one, or remote?
p

Pierre LIBAULT

09/08/2022, 1:40 AM
I am trying to run this simple code I found on a prefect example :
from dask.distributed import Client
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def say_hello(name):
    print(f"hello {name}")

@task
def say_goodbye(name):
    print(f"goodbye {name}")

@flow(task_runner=DaskTaskRunner(address="<tcp://X.X.X.X>:X"))
def greetings(names):
    for name in names:
        say_hello.submit(name)
        say_goodbye.submit(name)
    return name

if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin", "John", "William"])
So it shouldn't get acces to something, however it might try to get access to
PREFECT_LOCAL_STORAGE_PATH='${PREFECT_HOME}/storage'
But I have changed the permissions and it still doesn't work
The cluster is made of several different computer that you can connect via ssh
I have created a common storage path for every workers I have by modifying
PREFECT_LOCAL_STORAGE_PATH
and it worked now