kushagra kumar
05/10/2022, 8:01 AM
I am using Prefect 2.0 and facing the error below:
Traceback (most recent call last):
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/prefect/engine.py", line 467, in orchestrate_flow_run
    result = await run_sync_in_worker_thread(flow_call)
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 52, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(context.run, call, cancellable=True)
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/anyio/to_thread.py", line 28, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
    return await future
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 754, in run
    result = context.run(func, *args)
  File "car_linearregression.py", line 104, in do_regression
    X,y = get_feat_and_target(df_car,target)
TypeError: cannot unpack non-iterable PrefectFuture object
It's a simple serial execution where a flow function calls different task functions one after another, very similar to the tutorial below from the official website.
import requests
from prefect import flow, task

@task
def call_api(url):
    response = requests.get(url)
    print(response.status_code)
    return response.json()

@task
def parse_fact(response):
    print(response["fact"])
    return

@flow
def api_flow(url):
    fact_json = call_api(url)
    parse_fact(fact_json)
    return
So far I have tried creating a new virtual env and installing only the minimal packages required to run the ML model, but had no luck. Could you please help me with this?
Anna Geller
Can you share your prefect version output? When I run this:
import requests
from prefect import flow, task

@task
def call_api(url):
    response = requests.get(url)
    print(response.status_code)
    return response.json()

@task
def parse_fact(response):
    print(response["BTC"])
    return

@flow
def api_flow(url):
    fact_json = call_api(url)
    parse_fact(fact_json)

if __name__ == "__main__":
    api_flow("https://min-api.cryptocompare.com/data/pricemulti?fsyms=BTC&tsyms=USD")
it gives me:
13:13:57.430 | INFO | prefect.engine - Created flow run 'jasper-groundhog' for flow 'api-flow'
13:13:57.430 | INFO | Flow run 'jasper-groundhog' - Using task runner 'ConcurrentTaskRunner'
13:13:57.435 | WARNING | Flow run 'jasper-groundhog' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
13:13:57.457 | INFO | Flow run 'jasper-groundhog' - Created task run 'call_api-190c7484-0' for task 'call_api'
13:13:57.475 | INFO | Flow run 'jasper-groundhog' - Created task run 'parse_fact-b0346046-0' for task 'parse_fact'
200
13:13:57.709 | INFO | Task run 'call_api-190c7484-0' - Finished in state Completed()
13:13:57.725 | INFO | Task run 'parse_fact-b0346046-0' - Finished in state Completed()
{'USD': 31412.32}
13:13:57.732 | INFO | Flow run 'jasper-groundhog' - Finished in state Completed('All states completed.')
kushagra kumar
05/10/2022, 11:17 AM
prefect --version
2.0b3
import requests
from prefect import flow, task

@task
def call_api(url):
    response = requests.get(url)
    print(response.status_code)
    return response.json(), response.status_code

@task
def parse_fact(response):
    print(response["BTC"])
    return

@flow
def api_flow(url):
    fact_json, code = call_api(url)
    parse_fact(fact_json)

if __name__ == "__main__":
    api_flow("https://min-api.cryptocompare.com/data/pricemulti?fsyms=BTC&tsyms=USD")
Error:
13:19:38.116 | INFO | prefect.engine - Created flow run 'daffy-wildcat' for flow 'api-flow'
13:19:38.116 | INFO | Flow run 'daffy-wildcat' - Using task runner 'ConcurrentTaskRunner'
13:19:38.142 | WARNING | Flow run 'daffy-wildcat' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
13:19:38.265 | INFO | Flow run 'daffy-wildcat' - Created task run 'call_api-190c7484-0' for task 'call_api'
13:19:38.286 | ERROR | Flow run 'daffy-wildcat' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/prefect/engine.py", line 467, in orchestrate_flow_run
    result = await run_sync_in_worker_thread(flow_call)
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 52, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(context.run, call, cancellable=True)
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/anyio/to_thread.py", line 28, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
    return await future
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 754, in run
    result = context.run(func, *args)
  File "test_slackeg.py", line 20, in api_flow
    fact_json, code = call_api(url)
TypeError: cannot unpack non-iterable PrefectFuture object
200
13:19:38.650 | INFO | Task run 'call_api-190c7484-0' - Finished in state Completed()
13:19:38.650 | ERROR | Flow run 'daffy-wildcat' - Finished in state Failed('Flow run encountered an exception.')
In my regression POC example, too, I am trying to return two variables from a function.
Anna Geller
fact_json, code = call_api(url).result()
this will work.
The problem is that your task function returns multiple objects, but calling the task inside the flow returns a single PrefectFuture. Only by calling .result() on it do you get the actual returned data.
To assign to the two names fact_json and code, the right-hand side needs to be the actual data, which you get by adding .result().
Otherwise, you try to unpack a PrefectFuture into those two names, which won't work because:
• call_api(url) returns a single object, a PrefectFuture
• call_api(url).result() returns the actual data
Here is a working example:
import requests
from prefect import flow, task

@task
def call_api(url):
    response = requests.get(url)
    return response.json(), response.status_code

@task
def parse_json(response):
    print(response["BTC"])

@task
def parse_code(code):
    print(code)

@flow
def api_flow(url):
    resp_json, resp_code = call_api(url).result()
    parse_json(resp_json)
    parse_code(resp_code)

if __name__ == "__main__":
    api_flow("https://min-api.cryptocompare.com/data/pricemulti?fsyms=BTC&tsyms=USD")
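The difference between a future and its result can also be reproduced without Prefect. The sketch below is only an analogy using the standard library's concurrent.futures, not Prefect's implementation; call_api_stub and both helper functions are hypothetical names made up for illustration, and the stub returns canned data instead of calling the API.

```python
from concurrent.futures import ThreadPoolExecutor


def call_api_stub(url):
    # Hypothetical stand-in for the call_api task: returns two values as one tuple.
    return {"BTC": {"USD": 31412.32}}, 200


def unpack_future_directly(url):
    # Mirrors `fact_json, code = call_api(url)`: the right-hand side is a single
    # future object, so tuple unpacking raises TypeError.
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(call_api_stub, url)
        try:
            body, code = future
        except TypeError as exc:
            return str(exc)
    return None


def unpack_result(url):
    # Mirrors `fact_json, code = call_api(url).result()`: .result() waits for the
    # work to finish and hands back the returned tuple, which unpacks normally.
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(call_api_stub, url)
        body, code = future.result()
    return body, code


if __name__ == "__main__":
    print(unpack_future_directly("https://example.invalid"))
    print(unpack_result("https://example.invalid"))
```

The first helper fails for the same reason as the flow above: the future is one object, not the tuple the function returned. The second helper succeeds because the tuple is retrieved before unpacking.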
kushagra kumar
05/10/2022, 11:57 AM