
Guilherme Petris

12/07/2021, 11:18 AM
Hi! I was troubleshooting some things I'm trying to run on Prefect, and even with a simple flow I'm getting errors. I sat down with a consultant who wrote a test script, and we figured out that it fails only on my end. I already tried reinstalling everything, but I'm still getting the same issue. Here is the test script. I'm just running it from my IDE:
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor
import time


@task
def extract_reference_data():
    time.sleep(10)
    return 'hej'


@task
def extract_live_data(input):
    time.sleep(10)
    return f'{input}hejdå'


@task
def separate_task():
    time.sleep(10)
    return 'hoppsan'

    
with Flow("Aircraft-ETL",
          executor=LocalDaskExecutor()) as flow:
    reference_data = extract_reference_data()
    live_data = extract_live_data(reference_data)
    separate_task()

flow.run()
# flow.visualize()
>>> from prefect.executors import LocalDaskExecutor
>>> import time
>>> @task
... 
  File "<stdin>", line 2
    
    ^
SyntaxError: invalid syntax
>>> def extract_reference_data():
...     time.sleep(10)
...     return 'hej'
... @task
  File "<stdin>", line 4
    @task
    ^
SyntaxError: invalid syntax
>>> 
>>> def extract_live_data(input):
...     time.sleep(10)
...     return f'{input}hejdå'
... @task
  File "<stdin>", line 4
    @task
    ^
SyntaxError: invalid syntax
>>> 
>>> def separate_task():
...     time.sleep(10)
...     return 'hoppsan'
... 
>>> with Flow("Aircraft-ETL",
...           executor=LocalDaskExecutor()) as flow:
...     reference_data = extract_reference_data()
...     live_data = extract_live_data(reference_data)
...     separate_task()
... 
Traceback (most recent call last):
  File "<stdin>", line 3, in <module>
NameError: name 'extract_reference_data' is not defined
>>> flow.run()
[2021-12-07 12:08:44+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'Aircraft-ETL'
[2021-12-07 12:08:44+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
<Success: "All reference tasks succeeded.">
>>> # flow.visualize()

Anna Geller

12/07/2021, 11:25 AM
@Guilherme Petris you don't need to set
executor=LocalDaskExecutor()
unless you use mapping and want to run things in parallel. Can you try running this script all at once? 🙂 The problem is that it somehow ran out of order in your IPython session.
Also, you can define the last task as:
separate_task(upstream_tasks=[live_data])
to set dependencies.
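For readers unsure what setting dependencies buys you here: the sketch below is not Prefect's scheduler, only a standard-library illustration of the run order that results once separate_task is declared downstream of live_data. The task names come from the script above; the dictionary layout is an assumption made for this example.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each key is a task; its value is the set of tasks that must finish first.
# The last entry mirrors separate_task(upstream_tasks=[live_data]).
dependencies = {
    "extract_reference_data": set(),
    "extract_live_data": {"extract_reference_data"},
    "separate_task": {"extract_live_data"},
}

# A linear chain has exactly one valid ordering.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Without the upstream_tasks argument, separate_task would have no predecessors in this graph, so it could be scheduled at any point relative to the other two tasks.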

Guilherme Petris

12/07/2021, 11:29 AM
I'm getting the same error:
NameError: name 'extract_reference_data' is not defined

Anna Geller

12/07/2021, 11:30 AM
How about this then? 🙂 Store this as flow.py
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor


@task
def extract_reference_data():
    return "hej"


@task
def extract_live_data(input):
    return f"{input}hejdå"


@task
def separate_task():
    return "hoppsan"


with Flow("Aircraft-ETL") as flow:
    reference_data = extract_reference_data()
    live_data = extract_live_data(reference_data)
    separate_task(upstream_tasks=[live_data])

if __name__ == "__main__":
    flow.run()
and run:
python flow.py

Guilherme Petris

12/07/2021, 11:33 AM
OK, apparently running it from the terminal works.
That's weird, maybe it's something with my IDE?
I basically select the code and run it with a shortcut.
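A possible explanation, sketched under assumptions: the console output earlier in the thread suggests the selection reached the interpreter in pieces, with each @task separated from the function it decorates. The snippet below imitates that with plain exec calls in a shared namespace. The chunk boundaries are a guess at what the IDE sent, and the task decorator here is a hypothetical stand-in, not prefect.task.

```python
def run_chunks(chunks):
    """Execute each chunk in one shared namespace, like successive console inputs."""
    # Hypothetical stand-in for prefect.task: returns the function unchanged.
    namespace = {"task": lambda fn: fn}
    outcomes = []
    for chunk in chunks:
        try:
            exec(compile(chunk, "<stdin>", "exec"), namespace)
            outcomes.append("ok")
        except (SyntaxError, NameError) as exc:
            outcomes.append(type(exc).__name__)
    return outcomes


# Assumed split: the decorator arrives alone, and the function body arrives
# with the next task's decorator dangling at the end.
broken = [
    "@task\n",
    "def extract_reference_data():\n    return 'hej'\n@task\n",
    "reference_data = extract_reference_data()\n",
]

# The same code delivered as complete statements.
whole = [
    "@task\ndef extract_reference_data():\n    return 'hej'\n",
    "reference_data = extract_reference_data()\n",
]

print(run_chunks(broken))
print(run_chunks(whole))
```

A chunk that fails to compile never runs, so the function name is never bound and the later call raises NameError. Running the file with python flow.py avoids this because the whole file is compiled at once.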

Anna Geller

12/07/2021, 11:39 AM
🤷

Guilherme Petris

12/07/2021, 1:06 PM
Well, thank you again! 😁