# ask-community
Hi - I have a task in Orion that returns a pandas DataFrame, and I am trying to run it in a flow. The function runs correctly without the `@task` decorator, but inside a flow it raises an "ambiguous truth value" error. Code in thread:
```python
from prefect import flow, task

@task
def get_oup_deinitions(node_input_dict):
    # `oup` and `creds` come from the poster's own code (not shown in the thread)
    oupDefs = oup.OUPDefinitions("en-us", creds.OUP)
    oup_df = oupDefs.run(node_input=node_input_dict)  # returns a pandas DataFrame
    return oup_df

@flow
def get_node_resources(node_input_dict):
    oup_df = get_oup_deinitions(node_input_dict=node_input_dict)
    return oup_df
```
This is the traceback:
```
16:03:19.983 | Flow run 'pumpkin-pony' encountered exception:
Traceback (most recent call last):
  File "py39lmap/lib/python3.9/site-packages/prefect/engine.py", line 377, in orchestrate_flow_run
    result = await run_sync_in_worker_thread(flow_call)
  File "py39lmap/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 48, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(context.run, call, cancellable=True)
  File "py39lmap/lib/python3.9/site-packages/anyio/to_thread.py", line 28, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
  File "py39lmap/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 805, in run_sync_in_worker_thread
    return await future
  File "py39lmap/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 743, in run
    result = func(*args)
  File "/egm_make_lmap/extract_resources/get_resources.py", line 83, in get_node_resources
    oup_df = get_oup_deinitions(
  File "py39lmap/lib/python3.9/site-packages/prefect/tasks.py", line 271, in __call__
    return enter_task_run_engine(
  File "py39lmap/lib/python3.9/site-packages/prefect/engine.py", line 450, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "py39lmap/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 59, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "py39lmap/lib/python3.9/site-packages/anyio/from_thread.py", line 35, in run
    return asynclib.run_async_from_thread(func, *args)
  File "py39lmap/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 830, in run_async_from_thread
    return f.result()
  File "py39lmap/lib/python3.9/concurrent/futures/_base.py", line 445, in result
    return self.__get_result()
  File "py39lmap/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
  File "py39lmap/lib/python3.9/site-packages/prefect/engine.py", line 510, in create_and_submit_task_run
    future = await flow_run_context.executor.submit(
  File "py39lmap/lib/python3.9/site-packages/prefect/executors.py", line 180, in submit
    self._results[task_run.id] = await run_fn(**run_kwargs)
  File "py39lmap/lib/python3.9/site-packages/prefect/client.py", line 59, in wrapper
    return await fn(*args, **kwargs)
  File "py39lmap/lib/python3.9/site-packages/prefect/engine.py", line 620, in orchestrate_task_run
    terminal_state = await user_return_value_to_state(
  File "py39lmap/lib/python3.9/site-packages/prefect/engine.py", line 734, in user_return_value_to_state
    if is_state(result) or is_state_iterable(result):
  File "py39lmap/lib/python3.9/site-packages/prefect/orion/states.py", line 17, in is_state_iterable
    if isinstance(obj, IterableABC) and obj:
  File "py39lmap/lib/python3.9/site-packages/pandas/core/generic.py", line 1537, in __nonzero__
    raise ValueError(
ValueError: The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
16:03:19.989 | Shutting down executor `SequentialExecutor`...
16:03:20.066 | Flow run 'pumpkin-pony' finished in state Failed(message='Flow run encountered an exception.', type=FAILED)
Failed(message='Flow run encountered an exception.', type=FAILED)
```
@Anna Geller This actually looks like an issue in Orion:
```
File "py39lmap/lib/python3.9/site-packages/prefect/orion/states.py", line 17, in is_state_iterable
    if isinstance(obj, IterableABC) and obj:
```
Casting `obj`, which in this case is a pandas DataFrame, to bool is what raises that error.
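A minimal sketch, using only pandas and the standard library (no Prefect), of why that line fails: a DataFrame counts as an `Iterable` because it defines `__iter__`, but pandas deliberately raises `ValueError` whenever a DataFrame is coerced to `bool`, which is exactly what `... and obj` does inside an `if`. The helper name `truthiness_raises` is hypothetical.

```python
from collections.abc import Iterable

import pandas as pd

df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})


def truthiness_raises(obj) -> bool:
    """Return True if bool(obj) raises ValueError (hypothetical helper)."""
    try:
        bool(obj)
    except ValueError:
        return True
    return False


# A DataFrame passes the isinstance check, since it defines __iter__ ...
print(isinstance(df, Iterable))  # True
# ... so `isinstance(obj, IterableABC) and obj` goes on to call bool(df), which raises
print(truthiness_raises(df))  # True
# pandas suggests explicit checks instead, e.g. the .empty property
print(df.empty)  # False
```

pandas raises for empty DataFrames too, so the failure does not depend on the data the task returns.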
I see. In that case, it would be good to have a look at the flow code. @itay livni could you share your flow file so that I can replicate the issue?
Hi @Anna Geller - Below is a reproducible flow that breaks with pandas
```python
from prefect import flow, task

import numpy as np
import pandas as pd

@task
def pandas_task(config: dict):
    df = pd.DataFrame(np.random.randint(0, 100, size=(100, 4)), columns=list('ABCD'))
    return df

@flow
def main_flow():
    # do some things
    # then call a task that returns a DataFrame
    df = pandas_task(config={})
    # do more things

# run the flow
flow_state = main_flow()
```
Thanks for the example and for reporting this. It looks like a problem with serialization of task results; I'll open an issue for it. As a workaround for now, if you return `df.to_dict()` instead of the DataFrame itself, this should work.
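A minimal sketch of the data side of that workaround, without Prefect: the task body returns `df.to_dict()` (a plain dict, whose truth value is unambiguous) and the caller rebuilds the DataFrame with `pd.DataFrame(...)`. The function name `pandas_task_body` is hypothetical; in the flow above it would carry the `@task` decorator.

```python
import pandas as pd


def pandas_task_body() -> dict:
    """Hypothetical task body: build a DataFrame but return it as a plain dict."""
    df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
    # Default orient="dict" gives {column: {index: value}}
    return df.to_dict()


payload = pandas_task_body()
# bool() on a non-empty dict is simply True, so a truthiness check no longer raises
print(bool(payload))  # True
# Downstream, rebuild the DataFrame from the dict
rebuilt = pd.DataFrame(payload)
print(rebuilt.shape)  # (3, 2)
```

The default orientation keeps the index, but some dtypes (for example categoricals) may not survive the round trip unchanged, so it is worth checking dtypes after rebuilding.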
@Anna Geller Thanks. FYI - I am basically porting my Prefect Core code to Orion.
Gotcha. If you are doing it to test out Orion, that's great. Later on, we will also provide migration guides, which will make porting from Core to Orion easier. Thanks again! 👍
Hi @Anna Geller - sorry to be a pain here, but could you share a short example of how you would run a method of a class with Orion? I am getting all sorts of errors. FWIW, I don't mind being a guinea pig for testing Orion.
@itay livni can you share those errors? You can call arbitrary code in Orion. If you are looking for a class scaffold, here is one:
```python
from prefect import flow, task


class SomeDataProcessor:
    def __init__(self):
        pass

    def run_etl(self):
        pass


@task
def some_task():
    SomeDataProcessor().run_etl()


@flow
def main():
    some_task()


if __name__ == "__main__":
    main()
```
Ah - so you need a helper function decorated with `@task`. I was hoping to get rid of those.
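A minimal sketch of one way to avoid writing a separate helper for every class: a single generic factory that turns "instantiate this class and call this method" into a plain function. The name `method_as_function` is hypothetical (not a Prefect API), and the `SomeDataProcessor` here is a toy variant of the scaffold above with a made-up `scale` argument.

```python
from typing import Any, Callable


def method_as_function(cls: type, method_name: str, **init_kwargs: Any) -> Callable[..., Any]:
    """Return a plain function that instantiates `cls` and calls one method on it."""

    def runner(*args: Any, **kwargs: Any) -> Any:
        # A fresh instance per call, mirroring SomeDataProcessor().run_etl()
        instance = cls(**init_kwargs)
        return getattr(instance, method_name)(*args, **kwargs)

    # Give the wrapper a readable name, useful if a decorator derives a name from it
    runner.__name__ = f"{cls.__name__}_{method_name}"
    runner.__qualname__ = runner.__name__
    return runner


class SomeDataProcessor:
    def __init__(self, scale: int = 1):
        self.scale = scale

    def run_etl(self, values):
        # Stand-in ETL step: scale every value
        return [v * self.scale for v in values]


run_etl = method_as_function(SomeDataProcessor, "run_etl", scale=10)
print(run_etl([1, 2, 3]))  # [10, 20, 30]
print(run_etl.__name__)  # SomeDataProcessor_run_etl
```

Since `task` is a decorator, the resulting function could presumably be wrapped as `task(run_etl)` inside a flow module; that step is an assumption and has not been checked against the Orion alpha.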
You could call it directly in the flow if you prefer:
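```python
# Reuses the imports and SomeDataProcessor from the scaffold above
@flow
def main():
    # Call the class method directly in the flow, without a wrapping task
    SomeDataProcessor().run_etl()
```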
The trade-off is that the more tasks you have, the more fine-grained visibility you get when something fails.
This is definitely an issue with the way we check whether the return value is a state.
We can fix this by changing that evaluation. I do hate that pandas raises an exception for truth checks, though -.-
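A minimal sketch of what changing the evaluation could look like, assuming the check only needs to recognise built-in collections of states: test the container type first, so objects with custom truth semantics such as DataFrames are never coerced to `bool`. `State`, `is_state` and `is_state_iterable` below are simplified stand-ins, not Orion's actual classes or its eventual fix.

```python
from typing import Any

import pandas as pd


class State:
    """Simplified stand-in for Orion's State type (hypothetical)."""


def is_state(obj: Any) -> bool:
    return isinstance(obj, State)


def is_state_iterable(obj: Any) -> bool:
    # Only list/tuple/set are treated as possible collections of states;
    # anything else (DataFrame, ndarray, dict, ...) short-circuits before any bool() call
    if isinstance(obj, (list, tuple, set)) and len(obj) > 0:
        return all(is_state(item) for item in obj)
    return False


print(is_state_iterable([State(), State()]))  # True
print(is_state_iterable(pd.DataFrame({"A": [1, 2]})))  # False, and no ValueError
```

The design choice here trades generality for safety: custom iterables of states would no longer be recognised.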
Hi @Anna Geller - is there a place to follow Orion updates? I want to pick up migrating my flow again once the pandas serialization bug is fixed. Here is my unsolicited feedback (so far) on migrating a hobby project from Prefect Core to Orion:
1. Tasks that were previously classes require more code, because there is no way to run `SomeClass(Task)` without writing a helper function (I could be missing something).
2. I enjoy not having to register a flow to get to a UI.
3. Overall there is less boilerplate code (e.g. flow.visualize, flow.register, ...).
4. Radar is cool. I can't wait to see how the subflows and the recursive nature of my flow are represented.
5. The new graph showing flow runs is less intuitive than the current simple bar chart, but I imagine that will change as I get used to it.
Thank you so much for your feedback! I will direct it to the team. Regarding updates, we usually post updates about new releases in the announcement channel.
Hi @itay livni - on your flow run graph point, can you expand on that? I'd be curious to hear how you currently interact with that chart and what functionality you expect or would like to see there. There are some notable differences, including grouping flow runs into time buckets for a more consistent experience: if you're looking at last month's runs vs. this month's runs, you should see the same number of bars, whereas the current chart doesn't let you choose a time interval because it's based on a fixed number of runs (100 or so).
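A minimal sketch of the difference described above, assuming runs are identified by their start times: splitting a fixed time window into a fixed number of equal buckets always yields the same number of bars however many runs there are, whereas a "last N runs" chart covers a span that varies with run frequency. `bucket_runs` is a hypothetical helper, not the Orion UI's implementation.

```python
from datetime import datetime, timedelta
from typing import List, Sequence


def bucket_runs(
    run_starts: Sequence[datetime],
    window_start: datetime,
    window_end: datetime,
    n_buckets: int,
) -> List[int]:
    """Count runs per equal-width time bucket in [window_start, window_end)."""
    span = window_end - window_start
    counts = [0] * n_buckets
    for start in run_starts:
        if window_start <= start < window_end:
            # Integer timedelta arithmetic: (offset * n) // span is in [0, n_buckets)
            index = (start - window_start) * n_buckets // span
            counts[index] += 1
    return counts


month_start = datetime(2022, 1, 1)
month_end = month_start + timedelta(days=31)
runs = [month_start + timedelta(hours=h) for h in (1, 2, 30)]
daily = bucket_runs(runs, month_start, month_end, n_buckets=31)
print(len(daily), daily[:2])  # 31 [2, 1]
```

A quiet month simply produces zero-height bars, so two months stay directly comparable bar for bar.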
Hi @nicholas - first, my use case with Prefect is limited to a "couple" of flows for personal use, with an eye to production, so take it with a heavy grain of salt. My workflow is: quickly glance at the Flow Run Graph (and I like that the runs are not grouped; IMO grouping could be an optional view?) -> check whether a flow took longer than average to run -> click on any bar/flow that did not run or is of interest because of its duration -> analyze -> fix/refactor if needed. I think you hit the nail on the head: I'm not used to looking at grouped flow runs. Hope this helps. Let me know if this needs more clarification or if you want more feedback on the UI, e.g. Radar takes one too many clicks to get to the view 🙂
Thanks @itay livni - this is helpful 🙂