itay livni
11/27/2021, 9:37 PM
itay livni
11/27/2021, 9:38 PM
@task
def get_oup_deinitions(node_input_dict):
    oupDefs = oup.OUPDefinitions("en-us", creds.OUP)
    oup_df = oupDefs.run(node_input=node_input_dict)
    return oup_df

@flow
def get_node_resources(node_input_dict):
    oup_df = get_oup_deinitions(node_input_dict=node_input_dict)
    return oup_df
This is the traceback:
16:03:19.983 | Flow run 'pumpkin-pony' encountered exception:
Traceback (most recent call last):
  File "py39lmap/lib/python3.9/site-packages/prefect/engine.py", line 377, in orchestrate_flow_run
    result = await run_sync_in_worker_thread(flow_call)
  File "py39lmap/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 48, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(context.run, call, cancellable=True)
  File "py39lmap/lib/python3.9/site-packages/anyio/to_thread.py", line 28, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
  File "py39lmap/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 805, in run_sync_in_worker_thread
    return await future
  File "py39lmap/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 743, in run
    result = func(*args)
  File "/egm_make_lmap/extract_resources/get_resources.py", line 83, in get_node_resources
    oup_df = get_oup_deinitions(
  File "py39lmap/lib/python3.9/site-packages/prefect/tasks.py", line 271, in __call__
    return enter_task_run_engine(
  File "py39lmap/lib/python3.9/site-packages/prefect/engine.py", line 450, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "py39lmap/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 59, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "py39lmap/lib/python3.9/site-packages/anyio/from_thread.py", line 35, in run
    return asynclib.run_async_from_thread(func, *args)
  File "py39lmap/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 830, in run_async_from_thread
    return f.result()
  File "py39lmap/lib/python3.9/concurrent/futures/_base.py", line 445, in result
    return self.__get_result()
  File "py39lmap/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
  File "py39lmap/lib/python3.9/site-packages/prefect/engine.py", line 510, in create_and_submit_task_run
    future = await flow_run_context.executor.submit(
  File "py39lmap/lib/python3.9/site-packages/prefect/executors.py", line 180, in submit
    self._results[task_run.id] = await run_fn(**run_kwargs)
  File "py39lmap/lib/python3.9/site-packages/prefect/client.py", line 59, in wrapper
    return await fn(*args, **kwargs)
  File "py39lmap/lib/python3.9/site-packages/prefect/engine.py", line 620, in orchestrate_task_run
    terminal_state = await user_return_value_to_state(
  File "py39lmap/lib/python3.9/site-packages/prefect/engine.py", line 734, in user_return_value_to_state
    if is_state(result) or is_state_iterable(result):
  File "py39lmap/lib/python3.9/site-packages/prefect/orion/states.py", line 17, in is_state_iterable
    if isinstance(obj, IterableABC) and obj:
  File "py39lmap/lib/python3.9/site-packages/pandas/core/generic.py", line 1537, in __nonzero__
    raise ValueError(
ValueError: The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
16:03:19.989 | Shutting down executor `SequentialExecutor`...
16:03:20.066 | Flow run 'pumpkin-pony' finished in state Failed(message='Flow run encountered an exception.', type=FAILED)
Failed(message='Flow run encountered an exception.', type=FAILED)
Anna Geller
Anurag Bajpai
11/28/2021, 11:45 AM
  File "py39lmap/lib/python3.9/site-packages/prefect/orion/states.py", line 17, in is_state_iterable
    if isinstance(obj, IterableABC) and obj:
Casting obj, which in this case is a pandas DataFrame, to bool causes that error.
Anna Geller
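A minimal reproduction of that failure outside of any flow, assuming pandas is installed. The helper name `naive_is_nonempty_iterable` is made up for illustration; it only mirrors the shape of the `isinstance(obj, IterableABC) and obj` check from the traceback and is not Prefect's code.

```python
from collections.abc import Iterable

import pandas as pd


def naive_is_nonempty_iterable(obj) -> bool:
    # Hypothetical helper with the same shape as the check in the traceback.
    # bool(obj) is where pandas raises for a DataFrame.
    return isinstance(obj, Iterable) and bool(obj)


df = pd.DataFrame({"A": [1, 2], "B": [3, 4]})

try:
    naive_is_nonempty_iterable(df)
    outcome = "no error"
except ValueError as exc:
    # pandas refuses to reduce a whole DataFrame to a single True/False
    outcome = f"ValueError: {exc}"

print(outcome)

# The explicit emptiness check that the error message points to
print(df.empty)
```

Plain containers such as lists pass through the same check without trouble, which is probably why it only surfaces when a task returns a DataFrame.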
itay livni
11/28/2021, 2:52 PM
from prefect import flow, task
import numpy as np
import pandas as pd

@task
def pandas_task(config: dict):
    df = pd.DataFrame(np.random.randint(0, 100, size=(100, 4)), columns=list('ABCD'))
    return df

@flow
def main_flow():
    # do some things
    # then call another flow function
    df = pandas_task(config={})
    # do more things

# run the flow
flow_state = main_flow()
Anna Geller
df.to_dict(), this should work.
itay livni
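A rough sketch of the `df.to_dict()` workaround, assuming pandas is installed. The function names `build_frame` and `frame_as_dict` are hypothetical and the Prefect decorators are left out so the snippet runs on its own; in the thread's setup, `frame_as_dict` would be the function decorated with `@task`.

```python
import pandas as pd


def build_frame() -> pd.DataFrame:
    # Stand-in for whatever produces the DataFrame inside the task
    return pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})


def frame_as_dict() -> dict:
    # Return a plain dict instead of the DataFrame, so that anything
    # truth-testing the return value gets a well-defined answer
    return build_frame().to_dict()


payload = frame_as_dict()

# bool() on a dict is well defined, unlike bool() on a DataFrame
print(bool(payload))

# Rebuild the DataFrame on the receiving side when it is needed again
restored = pd.DataFrame(payload)
print(restored)
```

Column dtypes are re-inferred when the DataFrame is rebuilt, so this is a stopgap for simple frames rather than a general serialization scheme.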
11/28/2021, 3:05 PM
Anna Geller
itay livni
11/28/2021, 3:54 PM
Anna Geller
from prefect import flow, task

class SomeDataProcessor:
    def __init__(self):
        pass

    def run_etl(self):
        pass

@task
def some_task():
    SomeDataProcessor().run_etl()

@flow
def main():
    some_task()

if __name__ == "__main__":
    main()
itay livni
11/28/2021, 4:22 PM
@task. I was hoping to get rid of those.
Anna Geller
@flow
def main():
    SomeDataProcessor().run_etl()
The trade-off here is that the more tasks you have, the finer-grained visibility you get when something fails.
Zanie
Zanie
itay livni
11/28/2021, 11:34 PM
SomeClass(Task) without making a helper function (I could be missing something)
2. I enjoy not having to register a flow to get to a UI.
3. Overall less template code, e.g. flow.visualize, flow.register ...
4. Radar is cool. I cannot wait to see how the subflows and recursive nature of my flow are represented.
5. The new graph showing flow runs is less intuitive to understand than the current simple bar chart. But I imagine as I get used to it things will change.
Anna Geller
nicholas
itay livni
11/30/2021, 12:10 AM
nicholas