
John Kang

09/23/2022, 1:47 PM
I have a task that fails when it runs within a flow on an agent. The task takes the current working directory and saves it as the string value of a String block. Any idea why this could be? This started when I updated to 2.4.1.
Error is:
09:30:33.788 | ERROR   | Task run 'Register Current Prefect flow run temporary directory-2e345798-0' - Finished in state Failed('Task run encountered an exception.')
09:30:33.788 | ERROR   | Flow run 'nice-cockatoo' - Encountered exception during execution:
Traceback (most recent call last):
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\prefect\engine.py", line 596, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\prefect\utilities\asyncutils.py", line 57, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\anyio\to_thread.py", line 28, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\anyio\_backends\_asyncio.py", line 818, in run_sync_in_worker_thread
    return await future
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\anyio\_backends\_asyncio.py", line 754, in run
    result = context.run(func, *args)
  File "main_python_files\w_wrapper_update_data.py", line 342, in wrapper_data_update_function
    change_previous_temporary_directory(current_working_directory)
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\prefect\tasks.py", line 295, in __call__
    return enter_task_run_engine(
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\prefect\engine.py", line 736, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\prefect\utilities\asyncutils.py", line 137, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\anyio\from_thread.py", line 35, in run
    return asynclib.run_async_from_thread(func, *args)
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\anyio\_backends\_asyncio.py", line 847, in run_async_from_thread
    return f.result()
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\concurrent\futures\_base.py", line 444, in result
    return self.__get_result()
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\concurrent\futures\_base.py", line 389, in __get_result
    raise self._exception
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\prefect\engine.py", line 874, in get_task_call_return_value
    return await future._result()
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\prefect\futures.py", line 237, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\prefect\orion\schemas\states.py", line 145, in result
    raise data
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\prefect\engine.py", line 1221, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\prefect\utilities\asyncutils.py", line 57, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\anyio\to_thread.py", line 28, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\anyio\_backends\_asyncio.py", line 818, in run_sync_in_worker_thread
    return await future
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\anyio\_backends\_asyncio.py", line 754, in run
    result = context.run(func, *args)
  File "C:\Users\jkang1\AppData\Local\Temp\tmpn0w8m9xiprefect\main_python_files\gcs\register_prefect_temp_folder.py", line 24, in change_previous_temporary_directory
    change_previous_temporary_directory_block.save(
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\prefect\utilities\asyncutils.py", line 208, in wrapper
    return run_async_from_worker_thread(async_fn, *args, **kwargs)
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\prefect\utilities\asyncutils.py", line 137, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\anyio\from_thread.py", line 35, in run
    return asynclib.run_async_from_thread(func, *args)
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\anyio\_backends\_asyncio.py", line 847, in run_async_from_thread
    return f.result()
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\concurrent\futures\_base.py", line 444, in result
    return self.__get_result()
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\concurrent\futures\_base.py", line 389, in __get_result
    raise self._exception
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\prefect\blocks\core.py", line 813, in save
    document_id = await self._save(name=name, overwrite=overwrite, client=client)
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\prefect\client\orion.py", line 80, in with_injected_client
    return await fn(*args, **kwargs)
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\prefect\blocks\core.py", line 761, in _save
    await self.register_type_and_schema(client=client)
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\prefect\client\orion.py", line 80, in with_injected_client
    return await fn(*args, **kwargs)
  File "C:\Users\jkang1\.conda\envs\Capacity_venv\lib\site-packages\prefect\blocks\core.py", line 685, in register_type_and_schema
    client = client or client_from_context
NameError: name 'client_from_context' is not defined
When I run this task separately outside of a flow and not in an agent it works fine
from prefect import task
from prefect.blocks.system import String


@task(
    name="Register Current Prefect flow run temporary directory",
    description="Registers the directory that is housing the current prefect flow run temporary directory",
    retries=3,
    retry_delay_seconds=10,
)
def change_previous_temporary_directory(working_directory: str) -> None:
    """Change the temporary directory of the last prefect flow run"""

    # Store the directory path as the value of a String block
    change_previous_temporary_directory_block = String(value=working_directory)
    change_previous_temporary_directory_block.save(
        name="previous-prefect-flow-directory", overwrite=True
    )
    # Load the block back and print the value of the temporary directory
    last_temporary_directory = String.load("previous-prefect-flow-directory").value
    print(f"Prefect last temporary directory {last_temporary_directory}")
The code for the task is really simple... Not sure what could be going on
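For anyone reading the traceback later: the failing line has the shape `client = client or client_from_context`. In plain Python, `or` short-circuits, so an undefined name on the right-hand side only raises `NameError` when the left operand is falsy. The sketch below shows just that language behavior. `resolve_client` and `try_resolve` are hypothetical names, this is not Prefect's code, and nothing in this thread confirms that this is why the standalone run worked.

```python
def resolve_client(client=None):
    # Same shape as the failing line in the traceback. The name
    # `client_from_context` is deliberately left undefined here.
    return client or client_from_context  # noqa: F821


def try_resolve(client=None):
    """Report whether resolving the client raised a NameError."""
    try:
        resolve_client(client)
        return "ok"
    except NameError as exc:
        return f"NameError: {exc}"


if __name__ == "__main__":
    # Truthy left operand: `or` never evaluates the undefined name.
    print(try_resolve(client=object()))
    # Falsy left operand: the undefined name is looked up and fails.
    print(try_resolve(client=None))
```

A bug of this shape can stay hidden on every code path that passes a client explicitly and only surface on the path that relies on the fallback.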

alex

09/23/2022, 2:29 PM
Hey @John Kang! This looks like it could be a bug. Can you open an issue in GitHub so that we can investigate this further?

John Kang

09/23/2022, 2:29 PM
@alex Sure thing! Thanks for the heads up.

alex

09/23/2022, 4:04 PM
Hey @John Kang, we’ve released version 2.4.2 of Prefect to fix this bug. Let us know if you have any other issues!

John Kang

09/23/2022, 4:06 PM
@alex Great! Thanks so much for the quick response!!! I'll install once my flow runs are complete.
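To check whether an environment is still on the release discussed in this thread, a small version check might help. This is a minimal sketch: it assumes only 2.4.1 is affected, because that is all the thread states (the error appeared after updating to 2.4.1 and 2.4.2 fixes it). `parse_version` and `is_reported_affected` are hypothetical helper names.

```python
import re
from importlib import metadata

# Assumption taken from the thread: the error showed up on 2.4.1.
REPORTED_AFFECTED = (2, 4, 1)


def parse_version(text: str) -> tuple:
    """Extract (major, minor, patch) from a version string."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", text)
    if match is None:
        raise ValueError(f"Unrecognized version string: {text!r}")
    return tuple(int(part) for part in match.groups())


def is_reported_affected(installed: str) -> bool:
    """True if the version matches the release reported in the thread."""
    return parse_version(installed) == REPORTED_AFFECTED


if __name__ == "__main__":
    try:
        installed = metadata.version("prefect")
    except metadata.PackageNotFoundError:
        print("prefect is not installed in this environment")
    else:
        if is_reported_affected(installed):
            print(f"prefect {installed}: upgrade to 2.4.2 or later")
        else:
            print(f"prefect {installed}: not the release reported here")
```

Run it inside the same environment the agent uses, since that is where the flow actually executes.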