Guilherme Petris
12/07/2021, 11:18 AM
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor
import time

@task
def extract_reference_data():
    time.sleep(10)
    return 'hej'

@task
def extract_live_data(input):
    time.sleep(10)
    return f'{input}hejdå'

@task
def separate_task():
    time.sleep(10)
    return 'hoppsan'

with Flow("Aircraft-ETL",
          executor=LocalDaskExecutor()) as flow:
    reference_data = extract_reference_data()
    live_data = extract_live_data(reference_data)
    separate_task()

flow.run()
# flow.visualize()
Guilherme Petris
12/07/2021, 11:18 AM
>>> from prefect.executors import LocalDaskExecutor
>>> import time
>>> @task
...
  File "<stdin>", line 2
    ^
SyntaxError: invalid syntax
>>> def extract_reference_data():
...     time.sleep(10)
...     return 'hej'
... @task
  File "<stdin>", line 4
    @task
    ^
SyntaxError: invalid syntax
>>>
>>> def extract_live_data(input):
...     time.sleep(10)
...     return f'{input}hejdå'
... @task
  File "<stdin>", line 4
    @task
    ^
SyntaxError: invalid syntax
>>>
>>> def separate_task():
...     time.sleep(10)
...     return 'hoppsan'
...
>>> with Flow("Aircraft-ETL",
...           executor=LocalDaskExecutor()) as flow:
...     reference_data = extract_reference_data()
...     live_data = extract_live_data(reference_data)
...     separate_task()
...
Traceback (most recent call last):
  File "<stdin>", line 3, in <module>
NameError: name 'extract_reference_data' is not defined
>>> flow.run()
[2021-12-07 12:08:44+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'Aircraft-ETL'
[2021-12-07 12:08:44+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
<Success: "All reference tasks succeeded.">
>>> # flow.visualize()
Anna Geller
You don't need
executor=LocalDaskExecutor()
unless you use mapping and you want to run things in parallel.
Can you try running this script all at once? 🙂 The problem is that you somehow ran it out of order in your IPython session.
Anna Geller
You can use
separate_task(upstream_tasks=[live_data])
to set dependencies.
Guilherme Petris
12/07/2021, 11:29 AM
NameError: name 'extract_reference_data' is not defined
Anna Geller
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor

@task
def extract_reference_data():
    return "hej"

@task
def extract_live_data(input):
    return f"{input}hejdå"

@task
def separate_task():
    return "hoppsan"

with Flow("Aircraft-ETL") as flow:
    reference_data = extract_reference_data()
    live_data = extract_live_data(reference_data)
    separate_task(upstream_tasks=[live_data])

if __name__ == "__main__":
    flow.run()
and run:
python flow.py
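To make the effect of upstream_tasks=[live_data] concrete, here is a minimal sketch that uses only Python's standard-library graphlib module (Python 3.9+), not Prefect itself. It reuses the three task names from the flow above; the dependencies dict and the run_order variable are illustrative names, and this is only a model of the ordering, not how Prefect builds or runs its graph internally.

```python
from graphlib import TopologicalSorter

# Hypothetical model of the flow: each key is a task name, each value is
# the set of tasks that must finish before that task may start.
dependencies = {
    "extract_reference_data": set(),
    # Data dependency: extract_live_data receives reference_data as input.
    "extract_live_data": {"extract_reference_data"},
    # State dependency: what upstream_tasks=[live_data] expresses.
    "separate_task": {"extract_live_data"},
}

# static_order() yields the tasks in an order that respects every edge.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
```

Because the three tasks form a single chain, only one order is valid here. Without the last edge, separate_task would have no predecessors, so a scheduler would be free to start it at any point relative to the other two.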