vholmer

11/15/2022, 5:00 PM
Hello! I have a flow that runs 10 tasks using a ray task runner. After these tasks all individually finish, the flow crashes with the following error:
Crash detected! Request to https://api.prefect.cloud/api/accounts/CENSORED/workspaces/CENSORED/flow_runs/CENSORED/set_state failed: Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/httpcore/backends/asyncio.py", line 33, in read
    return await self._stream.receive(max_bytes=max_bytes)
  File "/usr/local/lib/python3.8/site-packages/anyio/streams/tls.py", line 195, in receive
    data = await self._call_sslobject_method(self._ssl_object.read, max_bytes)
  File "/usr/local/lib/python3.8/site-packages/anyio/streams/tls.py", line 137, in _call_sslobject_method
    data = await self.transport_stream.receive()
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 1265, in receive
    await self._protocol.read_event.wait()
  File "/usr/local/lib/python3.8/asyncio/locks.py", line 309, in wait
    await fut
asyncio.exceptions.CancelledError
The URL points to the flow's own id. What's going on here? Did anyone else experience this?

Kalise Richmond

11/15/2022, 9:50 PM
Hey @vholmer are you able to share a snippet of code that you are calling here?

vholmer

11/16/2022, 8:20 AM
Absolutely!
@flow(
    name=FLOW_NAME,
    task_runner=RayTaskRunner(address="ray://<IP ADDRESS>:10001"),
    result_storage=f"azure/azure-{os.environ['ENV']}"
)
def default(loglevel: str = "DEBUG", entity: str = None, batches: List[Tuple[int, int]] = None, metadata_dict: dict = None):
    logconfig = setup_logging.submit(loglevel=loglevel)
    logger = get_run_logger()
    logger.info("Starting!")
    task_runs = []
    for batch in batches:
        batch_start = batch[0]
        batch_end = batch[1]
        task_runs.append(my_task.submit(entity, batch_start, batch_end, metadata_dict, wait_for=[logconfig]))

    print_done(wait_for=task_runs)
The final print_done line just runs a dummy task to force the flow to wait for everything in the task_runs list; it doesn't do anything else. I tried running the flow both with and without it, and it crashes either way. :(

Kalise Richmond

11/17/2022, 9:55 PM
Oh interesting! 🧐 Thanks for opening the issue. It looks like our team is taking a look, and they would love some debug-level logs if you have them.