
kushagra kumar

05/10/2022, 8:01 AM
Newbie ALERT: Trying to run a basic regression model using prefect 2.0. Facing the error below:
Traceback (most recent call last):
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/prefect/engine.py", line 467, in orchestrate_flow_run
    result = await run_sync_in_worker_thread(flow_call)
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 52, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(context.run, call, cancellable=True)
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/anyio/to_thread.py", line 28, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
    return await future
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 754, in run
    result = context.run(func, *args)
  File "car_linearregression.py", line 104, in do_regression
    X,y = get_feat_and_target(df_car,target)
TypeError: cannot unpack non-iterable PrefectFuture object
It's a simple serial execution where a flow function calls different task functions one after another, very similar to the tutorial below from the official website.
import requests
from prefect import flow, task

@task
def call_api(url):
    response = requests.get(url)
    print(response.status_code)
    return response.json()

@task
def parse_fact(response):
    print(response["fact"])
    return 

@flow
def api_flow(url):
    fact_json = call_api(url)
    parse_fact(fact_json)
    return
So far I have tried creating a new virtual env and installing only the minimal packages required to run the ML model, but had no luck. Could you please help me with this?

Anna Geller

05/10/2022, 11:08 AM
Can you share the output of prefect version?
I couldn't reproduce it. Can you try this example?
import requests
from prefect import flow, task


@task
def call_api(url):
    response = requests.get(url)
    print(response.status_code)
    return response.json()


@task
def parse_fact(response):
    print(response["BTC"])
    return


@flow
def api_flow(url):
    fact_json = call_api(url)
    parse_fact(fact_json)


if __name__ == "__main__":
    api_flow("https://min-api.cryptocompare.com/data/pricemulti?fsyms=BTC&tsyms=USD")
It gives me:
13:13:57.430 | INFO    | prefect.engine - Created flow run 'jasper-groundhog' for flow 'api-flow'
13:13:57.430 | INFO    | Flow run 'jasper-groundhog' - Using task runner 'ConcurrentTaskRunner'
13:13:57.435 | WARNING | Flow run 'jasper-groundhog' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
13:13:57.457 | INFO    | Flow run 'jasper-groundhog' - Created task run 'call_api-190c7484-0' for task 'call_api'
13:13:57.475 | INFO    | Flow run 'jasper-groundhog' - Created task run 'parse_fact-b0346046-0' for task 'parse_fact'
200
13:13:57.709 | INFO    | Task run 'call_api-190c7484-0' - Finished in state Completed()
13:13:57.725 | INFO    | Task run 'parse_fact-b0346046-0' - Finished in state Completed()
{'USD': 31412.32}
13:13:57.732 | INFO    | Flow run 'jasper-groundhog' - Finished in state Completed('All states completed.')

kushagra kumar

05/10/2022, 11:17 AM
prefect --version
2.0b3
The tutorial example works well on my machine. It seems that when I try to return multiple variables from the function, Prefect throws an error.
import requests
from prefect import flow, task


@task
def call_api(url):
    response = requests.get(url)
    print(response.status_code)
    return response.json(), response.status_code


@task
def parse_fact(response):
    print(response["BTC"])
    return


@flow
def api_flow(url):
    fact_json, code  = call_api(url)
    parse_fact(fact_json)


if __name__ == "__main__":
    api_flow("https://min-api.cryptocompare.com/data/pricemulti?fsyms=BTC&tsyms=USD")
Error:
13:19:38.116 | INFO    | prefect.engine - Created flow run 'daffy-wildcat' for flow 'api-flow'
13:19:38.116 | INFO    | Flow run 'daffy-wildcat' - Using task runner 'ConcurrentTaskRunner'
13:19:38.142 | WARNING | Flow run 'daffy-wildcat' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
13:19:38.265 | INFO    | Flow run 'daffy-wildcat' - Created task run 'call_api-190c7484-0' for task 'call_api'
13:19:38.286 | ERROR   | Flow run 'daffy-wildcat' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/prefect/engine.py", line 467, in orchestrate_flow_run
    result = await run_sync_in_worker_thread(flow_call)
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 52, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(context.run, call, cancellable=True)
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/anyio/to_thread.py", line 28, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
    return await future
  File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 754, in run
    result = context.run(func, *args)
  File "test_slackeg.py", line 20, in api_flow
    fact_json, code  = call_api(url)
TypeError: cannot unpack non-iterable PrefectFuture object
200
13:19:38.650 | INFO    | Task run 'call_api-190c7484-0' - Finished in state Completed()
13:19:38.650 | ERROR   | Flow run 'daffy-wildcat' - Finished in state Failed('Flow run encountered an exception.')
In my regression POC example, I am also trying to return two variables from a function.

Anna Geller

05/10/2022, 11:36 AM
Try:
fact_json, code  = call_api(url).result()
This will work. The problem is that your task returns multiple objects, but calling the task inside the flow returns a PrefectFuture, not those objects. Only by calling .result() on that future do you get the actual returned data (here, the tuple (response.json(), response.status_code)). To assign that data to the two variables fact_json and code, you need the actual data, which you get by adding .result(). Otherwise, you are trying to unpack a PrefectFuture into those two variables, which won't work because:
• call_api(url) returns a single object, a PrefectFuture
• call_api(url).result() returns the actual data
Here is a working example:
import requests
from prefect import flow, task


@task
def call_api(url):
    response = requests.get(url)
    return response.json(), response.status_code


@task
def parse_json(response):
    print(response["BTC"])


@task
def parse_code(code):
    print(code)


@flow
def api_flow(url):
    resp_json, resp_code = call_api(url).result()
    parse_json(resp_json)
    parse_code(resp_code)


if __name__ == "__main__":
    api_flow("https://min-api.cryptocompare.com/data/pricemulti?fsyms=BTC&tsyms=USD")
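The same "future vs. result" distinction can be reproduced without Prefect or network access. The sketch below is only an analogy: it uses Python's standard concurrent.futures.Future as a stand-in for PrefectFuture (an assumption; it does not call any Prefect API), and fake_call_api, unpack_without_result and unpack_with_result are hypothetical helpers. Unpacking the future object itself fails with the same kind of TypeError as in the traceback, while unpacking future.result() works.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def fake_call_api(url):
    # Hypothetical stand-in for call_api: returns two values as a tuple,
    # like `return response.json(), response.status_code`, but makes no
    # HTTP request. The url argument is ignored.
    return {"BTC": {"USD": 0.0}}, 200


def unpack_without_result(executor, url):
    # Submitting work returns a Future, not the function's return value.
    future: Future = executor.submit(fake_call_api, url)
    try:
        resp_json, resp_code = future  # same mistake as in the question
    except TypeError as exc:
        return str(exc)  # the "cannot unpack non-iterable ..." message
    return None


def unpack_with_result(executor, url):
    future: Future = executor.submit(fake_call_api, url)
    # .result() blocks until the call finishes and returns the tuple,
    # which can then be unpacked into two variables.
    resp_json, resp_code = future.result()
    return resp_json, resp_code


if __name__ == "__main__":
    with ThreadPoolExecutor() as executor:
        print(unpack_without_result(executor, "https://example.com"))
        print(unpack_with_result(executor, "https://example.com"))
```

Running it prints the TypeError message first and then the unpacked tuple, mirroring the failing and working versions of api_flow above.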
šŸ‘ 1
k

kushagra kumar

05/10/2022, 11:57 AM
Many thanks for the detailed explanation, @Anna Geller! 🙂 It worked.
šŸ‘ 1