# ask-community
a
Normally the edges are created for you behind the scenes. You shouldn’t have to worry about this, as long as you set dependencies either by passing data between tasks or setting explicit state dependencies. LMK if you need some examples.
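For example - a minimal sketch with placeholder task names - passing one task's result into another is enough to create the edge:
Copy code
from prefect import task, Flow


@task
def extract():
    return [1, 2, 3]


@task
def load(data):
    print(data)


with Flow("data-dependency") as flow:
    data = extract()
    load(data)  # passing the result creates the extract -> load edge automatically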
j
Could you go into the explicit state dependencies a little more?
I'm looking for a way to make the flow diagram not so convoluted
a
Sure! Method 1:
Copy code
with Flow(...) as flow:
    a = first_task()
    b = second_task()

    c = third_task(c_inputs, upstream_tasks=[a,b])
Method 2:
Copy code
with Flow(...) as flow:
    a = first_task()
    b = second_task()

    c = third_task()
    c.set_upstream(b)
    c.set_upstream(a)
j
for method 1, would the flow diagram show A => B => C?
Let me show my current example (simplified) and make sure I have it right
👍 1
Copy code
extract_1 = task1(stuff)
load_1(connection=conn, data=extract_1)

extract_2 = task2(stuff)
transformed = task3(stuff)
delete_stuff()  # This works
load_2(connection=conn, data=transformed)

# rewrite as:
extract_1 = task1(stuff)
load_1(connection=conn, data=extract_1, upstream_tasks=[extract_1])

extract_2 = task2(stuff, upstream_tasks=[extract_1, load_1])

# etc.
i.e. do I keep adding the tasks inside upstream_tasks = [a, b, c] as I go? or do I write it all at the end?
a
this would be a->b->c with method 1:
Copy code
from prefect import task, Flow


@task
def first_task():
    pass

@task
def second_task():
    pass

@task
def third_task():
    pass

with Flow("ex") as flow:
    a = first_task()
    b = second_task(upstream_tasks=[a])
    c = third_task(upstream_tasks=[b])

flow.visualize()
j
ahh
so pass each task into upstream_tasks one at a time
a
correct, if the sequential order is important
j
and is it the result of each task that goes in the []?
E.x.
a
compare those two:
j
right, I want left
that made no sense sorry -- The one on the left is preferable.
a
it only depends on what dependencies must be met before the third task can run: if the order of task 1 and 2 doesn’t matter, then the picture on the right makes more sense
j
If I put first_task and second_task (or their results) into the upstream_tasks, that would give me the LEFT visual?
a
sure! and one more thing: I see in your inputs that the load task expects a database connection as input - this won’t work, since data that you pass between tasks must be serializable with pickle.
the left visual:
Copy code
from prefect import task, Flow


@task
def first_task():
    pass

@task
def second_task():
    pass

@task
def third_task():
    pass

with Flow("ex") as flow:
    a = first_task()
    b = second_task(upstream_tasks=[a])
    c = third_task(upstream_tasks=[b])

flow.visualize()
j
Great, i'll check it out, thank you!
Is there a readme on the pickle error?
a
the right visual:
Copy code
from prefect import task, Flow


@task
def first_task():
    pass

@task
def second_task():
    pass

@task
def third_task():
    pass

with Flow("ex") as flow:
    a = first_task()
    b = second_task()
    c = third_task(upstream_tasks=[a,b])

flow.visualize()
if you want to pass a database connection between various tasks, you can use Resource manager, as discussed here: https://prefect-community.slack.com/archives/CL09KU1K7/p1637249773390000?thread_ts=1637249695.389900&cid=CL09KU1K7 Regarding serialization, this can be helpful: https://docs.prefect.io/orchestration/flow_config/storage.html#pickle-vs-script-based-storage
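As a minimal resource manager sketch (assuming SQLAlchemy; the class name and connection string are just placeholders):
Copy code
from prefect import task, Flow, resource_manager
from sqlalchemy import create_engine


@resource_manager
class DBConnection:
    def __init__(self, connection_string):
        self.connection_string = connection_string

    def setup(self):
        # the connection is opened inside the flow run, so it never needs to be pickled
        return create_engine(self.connection_string).connect()

    def cleanup(self, connection):
        connection.close()


@task
def load(connection, data):
    # use the live connection here, e.g. to run inserts
    pass


with Flow("resource-manager-example") as flow:
    with DBConnection("postgresql://user:password@host/db") as conn:
        load(conn, data=[1, 2, 3])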
j
Phenomenal, thank you!
🙌 1
@Anna Geller I got an error where adding upstream_tasks to a task in the flow seems to stop its other arguments, such as the connection, from being read
Here is load in the task flow:
Copy code
load(connection=connection_w, if_exists='append',  ## Load new table
    db_table='DataTable', 
    dataframe=new_transformed,
    upstream_tasks=[previous_task])
Error:
r("load() missing 4 required positional arguments: 'connection', 'if_exists', 'db_table', and 'dataframe'")
a
@Jason Motley within the with Flow() constructor you can only have tasks - are you sure that the load function is decorated with @task? It’s described here: https://docs.prefect.io/core/concepts/tasks.html
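For reference, once load is decorated with @task, calling it inside the flow with keyword arguments plus upstream_tasks works - a minimal sketch with placeholder argument values:
Copy code
from prefect import task, Flow


@task
def previous_task():
    pass


@task
def load(connection, if_exists, db_table, dataframe):
    pass


with Flow("ex") as flow:
    prev = previous_task()
    load(
        connection="placeholder-connection",
        if_exists="append",
        db_table="DataTable",
        dataframe="placeholder-dataframe",
        upstream_tasks=[prev],
    )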
j
yeah it is a @task
Do you know why I would get a credentials error in production if it works using my local agent?
Failed to load and execute Flow's environment: NoCredentialsError('Unable to locate credentials')
a
I could say more by looking at the flow definition - can you share it?
j
i.e. the whole flow section?
a
ideally the entire flow file, yes. If you don’t wanna share publicly, sending it via DM works for me too
resolved via DM by moving the db connection into the load task and passing the secret as a data dependency:
Copy code
import pandas as pd
from sqlalchemy import create_engine
from prefect import task, Flow
from prefect.tasks.secrets import PrefectSecret


@task
def load(df, connection_string, db_table="table", schema="schema"):
    engine = create_engine(connection_string)
    df.to_sql(db_table, schema=schema, con=engine, index=False)


@task
def get_df():
    return pd.DataFrame()


with Flow("ex") as flow:
    db_conn = PrefectSecret("DB_CONNECTION_STRING")
    df = get_df()
    load(df, db_conn)