Slackbot
11/24/2021, 5:22 PMAnna Geller
Jason Motley
11/24/2021, 5:25 PMJason Motley
11/24/2021, 5:26 PMAnna Geller
# Declare dependencies at call time with the ``upstream_tasks`` keyword.
with Flow(...) as flow:
    a = first_task()
    b = second_task()
    # c takes c_inputs as its argument and lists a and b as upstream tasks.
    c = third_task(c_inputs, upstream_tasks=[a, b])
Method 2:
# Declare dependencies after the calls with ``set_upstream``.
with Flow(...) as flow:
    a = first_task()
    b = second_task()
    c = third_task()
    # Kept inside the ``with`` block so the edges are registered on this flow.
    c.set_upstream(b)
    c.set_upstream(a)
Jason Motley
11/24/2021, 5:27 PMJason Motley
11/24/2021, 5:28 PMJason Motley
11/24/2021, 5:30 PM
extract_1 = task1(stuff)
load_1(connection=conn, data=extract_1)
extract_2 = task2(stuff)
transformed = task3(stuff)
delete_stuff() # This works
load_2(connection=stuconnff, data=transformed)
rewrite as:
extract_1 = task1(stuff)
load_1(connection=conn, data=extract_1, upstream_tasks=[extract_1])
extract_2 = task2(stuff, upstream_tasks = [extract_1, load_1])
etc.
Jason Motley
11/24/2021, 5:30 PMAnna Geller
from prefect import task, Flow
@task
def first_task():
    """Placeholder task; does nothing."""


@task
def second_task():
    """Placeholder task; does nothing."""


@task
def third_task():
    """Placeholder task; does nothing."""


# Linear chain: first_task -> second_task -> third_task.
# No data is passed between the tasks; ``upstream_tasks`` only orders them.
with Flow("ex") as flow:
    a = first_task()
    b = second_task(upstream_tasks=[a])
    c = third_task(upstream_tasks=[b])

flow.visualize()
Jason Motley
11/24/2021, 5:31 PMJason Motley
11/24/2021, 5:31 PMAnna Geller
Jason Motley
11/24/2021, 5:33 PM[]
?Jason Motley
11/24/2021, 5:33 PMAnna Geller
Jason Motley
11/24/2021, 5:34 PMJason Motley
11/24/2021, 5:34 PMAnna Geller
Jason Motley
11/24/2021, 5:34 PMAnna Geller
Anna Geller
from prefect import task, Flow
@task
def first_task():
    """Placeholder task; does nothing."""


@task
def second_task():
    """Placeholder task; does nothing."""


@task
def third_task():
    """Placeholder task; does nothing."""


# Linear chain: first_task -> second_task -> third_task.
# No data is passed between the tasks; ``upstream_tasks`` only orders them.
with Flow("ex") as flow:
    a = first_task()
    b = second_task(upstream_tasks=[a])
    c = third_task(upstream_tasks=[b])

flow.visualize()
Jason Motley
11/24/2021, 5:36 PMJason Motley
11/24/2021, 5:36 PMAnna Geller
import pendulum
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
from prefect import task, Flow
@task
def first_task():
    """Placeholder task; does nothing."""


@task
def second_task():
    """Placeholder task; does nothing."""


@task
def third_task():
    """Placeholder task; does nothing."""


# second_task depends on first_task; third_task lists both a and b as
# upstream tasks. No data is passed between the tasks.
with Flow("ex") as flow:
    a = first_task()
    b = second_task(upstream_tasks=[a])
    c = third_task(upstream_tasks=[a, b])

flow.visualize()
Anna Geller
Jason Motley
11/24/2021, 5:38 PMJason Motley
11/24/2021, 5:49 PMJason Motley
11/24/2021, 5:49 PM
load(connection=connection_w, if_exists='append', ## Load new table
db_table='DataTable',
dataframe=new_transformed,
upstream_tasks=[previous_task])
Jason Motley
11/24/2021, 5:49 PM
TypeError("load() missing 4 required positional arguments: 'connection', 'if_exists', 'db_table', and 'dataframe'")
Anna Geller
Inside the with Flow() constructor you can only have tasks - are you sure that the load function is decorated with @task? It’s described here: https://docs.prefect.io/core/concepts/tasks.html
Jason Motley
11/24/2021, 6:57 PMJason Motley
11/24/2021, 6:57 PMJason Motley
11/24/2021, 6:57 PM
Failed to load and execute Flow's environment: NoCredentialsError('Unable to locate credentials')
Anna Geller
Jason Motley
11/24/2021, 6:59 PMAnna Geller
Anna Geller
import pandas as pd
from prefect import task, Flow
from prefect.tasks.secrets import PrefectSecret
@task
def load(df, connection_string, db_table="table", schema="schema"):
    """Write ``df`` to the table ``schema.db_table``.

    Args:
        df: Object exposing ``to_sql`` (the flow passes the result of
            ``get_df``, a pandas DataFrame).
        connection_string: Value handed to ``create_engine``.
        db_table: Name of the target table.
        schema: Name of the target schema.
    """
    # NOTE(review): ``create_engine`` is not imported anywhere in this
    # snippet; presumably ``from sqlalchemy import create_engine`` - confirm
    # and add the import before running.
    engine = create_engine(connection_string)
    # index=False keeps the DataFrame index out of the written columns.
    df.to_sql(db_table, schema=schema, con=engine, index=False)
@task
def get_df() -> pd.DataFrame:
    """Return an empty DataFrame."""
    return pd.DataFrame()
with Flow("ex") as flow:
    # The connection string comes from the Prefect Secret named
    # "DB_CONNECTION_STRING" instead of being hard-coded in the flow.
    db_conn = PrefectSecret("DB_CONNECTION_STRING")
    df = get_df()
    load(df, db_conn)