kushagra kumar
05/10/2022, 8:01 AM
prefect 2.0. Facing the error below:
Traceback (most recent call last):
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/prefect/engine.py", line 467, in orchestrate_flow_run
    result = await run_sync_in_worker_thread(flow_call)
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 52, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(context.run, call, cancellable=True)
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/anyio/to_thread.py", line 28, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
    return await future
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 754, in run
    result = context.run(func, *args)
  File "car_linearregression.py", line 104, in do_regression
    X,y = get_feat_and_target(df_car,target)
TypeError: cannot unpack non-iterable PrefectFuture object
It's a simple serial execution where a flow function calls different task functions one after another, very similar to the tutorial below from the official website.
import requests
from prefect import flow, task

@task
def call_api(url):
    response = requests.get(url)
    print(response.status_code)
    return response.json()

@task
def parse_fact(response):
    print(response["fact"])
    return

@flow
def api_flow(url):
    fact_json = call_api(url)
    parse_fact(fact_json)
    return
So far I have tried creating a new virtual env and installing only the minimal packages required to run the ML model, but had no luck. Could you please help me with this?
Anna Geller
05/10/2022, 11:08 AM
prefect version output?
import requests
from prefect import flow, task

@task
def call_api(url):
    response = requests.get(url)
    print(response.status_code)
    return response.json()

@task
def parse_fact(response):
    print(response["BTC"])
    return

@flow
def api_flow(url):
    fact_json = call_api(url)
    parse_fact(fact_json)

if __name__ == "__main__":
    api_flow("https://min-api.cryptocompare.com/data/pricemulti?fsyms=BTC&tsyms=USD")
it gives me:
13:13:57.430 | INFO | prefect.engine - Created flow run 'jasper-groundhog' for flow 'api-flow'
13:13:57.430 | INFO | Flow run 'jasper-groundhog' - Using task runner 'ConcurrentTaskRunner'
13:13:57.435 | WARNING | Flow run 'jasper-groundhog' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
13:13:57.457 | INFO | Flow run 'jasper-groundhog' - Created task run 'call_api-190c7484-0' for task 'call_api'
13:13:57.475 | INFO | Flow run 'jasper-groundhog' - Created task run 'parse_fact-b0346046-0' for task 'parse_fact'
200
13:13:57.709 | INFO | Task run 'call_api-190c7484-0' - Finished in state Completed()
13:13:57.725 | INFO | Task run 'parse_fact-b0346046-0' - Finished in state Completed()
{'USD': 31412.32}
13:13:57.732 | INFO | Flow run 'jasper-groundhog' - Finished in state Completed('All states completed.')
kushagra kumar
05/10/2022, 11:17 AM
prefect --version
2.0b3
import requests
from prefect import flow, task

@task
def call_api(url):
    response = requests.get(url)
    print(response.status_code)
    return response.json(), response.status_code

@task
def parse_fact(response):
    print(response["BTC"])
    return

@flow
def api_flow(url):
    fact_json, code = call_api(url)
    parse_fact(fact_json)

if __name__ == "__main__":
    api_flow("https://min-api.cryptocompare.com/data/pricemulti?fsyms=BTC&tsyms=USD")
Error:
13:19:38.116 | INFO | prefect.engine - Created flow run 'daffy-wildcat' for flow 'api-flow'
13:19:38.116 | INFO | Flow run 'daffy-wildcat' - Using task runner 'ConcurrentTaskRunner'
13:19:38.142 | WARNING | Flow run 'daffy-wildcat' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
13:19:38.265 | INFO | Flow run 'daffy-wildcat' - Created task run 'call_api-190c7484-0' for task 'call_api'
13:19:38.286 | ERROR | Flow run 'daffy-wildcat' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/prefect/engine.py", line 467, in orchestrate_flow_run
    result = await run_sync_in_worker_thread(flow_call)
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 52, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(context.run, call, cancellable=True)
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/anyio/to_thread.py", line 28, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
    return await future
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 754, in run
    result = context.run(func, *args)
  File "test_slackeg.py", line 20, in api_flow
    fact_json, code = call_api(url)
TypeError: cannot unpack non-iterable PrefectFuture object
200
13:19:38.650 | INFO | Task run 'call_api-190c7484-0' - Finished in state Completed()
13:19:38.650 | ERROR | Flow run 'daffy-wildcat' - Finished in state Failed('Flow run encountered an exception.')
In my regression POC example, too, I am trying to return two variables from a function.
Anna Geller
05/10/2022, 11:36 AM
fact_json, code = call_api(url).result()
this will work.
The problem is that your task function returns multiple objects, but calling the task inside a flow returns a single PrefectFuture. Only by calling .result() on it do you get the actual returned data.
To assign to the two names fact_json and code, the right-hand side needs to be the actual data, which you get by adding .result().
Otherwise, you try to unpack a PrefectFuture into those two objects, which won't work because:
• call_api(url) returns a single object, a PrefectFuture
• call_api(url).result() returns the actual data
Here is a working example:
import requests
from prefect import flow, task

@task
def call_api(url):
    response = requests.get(url)
    return response.json(), response.status_code

@task
def parse_json(response):
    print(response["BTC"])

@task
def parse_code(code):
    print(code)

@flow
def api_flow(url):
    resp_json, resp_code = call_api(url).result()
    parse_json(resp_json)
    parse_code(resp_code)

if __name__ == "__main__":
    api_flow("https://min-api.cryptocompare.com/data/pricemulti?fsyms=BTC&tsyms=USD")
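The same unpacking behaviour can be reproduced without Prefect or a network call. The sketch below is only an analogy: it uses the standard library's concurrent.futures.Future as a stand-in for PrefectFuture (they are different classes), and call_api_stub with its made-up return value is a hypothetical replacement for the real task body.

```python
from concurrent.futures import ThreadPoolExecutor


def call_api_stub(url):
    # Hypothetical stand-in for the task body: returns two values as one tuple.
    return {"BTC": {"USD": 1.0}}, 200


def unpack_future_directly(url):
    """Try to unpack the future itself; return the error text, or None if it worked."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(call_api_stub, url)
        try:
            body, code = future  # the future is a single, non-iterable object
        except TypeError as exc:
            return str(exc)
    return None


def unpack_future_result(url):
    """Unpack the value the future resolves to, which is the 2-tuple."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(call_api_stub, url)
        body, code = future.result()  # blocks until the function has returned
    return body, code


if __name__ == "__main__":
    print(unpack_future_directly("placeholder-url"))
    print(unpack_future_result("placeholder-url"))
```

The first function fails for the same reason as the flow above: tuple unpacking is applied to the wrapper object rather than to the returned data. The second succeeds because .result() hands back the tuple itself.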
kushagra kumar
05/10/2022, 11:57 AM