Lukas
04/29/2020, 4:41 PM
pd.DataFrame objects. In the end I have one task "gathering" those DataFrame results to write them all into a Postgres DB (the reason to have it all in one task is that I want either all results to be uploaded or none of them in case something fails). The function that is supposed to write these results into the DB takes a dict as argument, with the key tablename and the value dataFrame. So in the function I loop over the dict and write each DataFrame into the corresponding table. Here is where something goes wrong: my keys and values get mixed up and the function ends up trying to write the DataFrames into the wrong tables, which obviously causes errors.
My flow looks somewhat like this:
task_result_1 = task1()
task_result_2 = task2()
task_result_3 = task3()
upload_dfs(
    {
        "table1": task_result_1,
        "table2": task_result_2,
        "table3": task_result_3,
    },
)
In the flow, Prefect automatically creates a List and a Dict task. Is the confusion in my keys / values somehow related to the sorting of items in the List class? https://github.com/PrefectHQ/prefect/blob/eb59918d98b15ba6e14c0a406cee885c0e44ea8b/src/prefect/tasks/core/collections.py#L73
Thanks a lot! 🙂
Jeremiah
from prefect import task, Flow

@task
def test(x):
    return x

with Flow("test") as flow:
    x = test(1)
    y = test(2)
    z = test(3)
    a = test(dict(x=x, y=y, z=z))

state = flow.run()
assert state.result[a].result == dict(x=1, y=2, z=3)
Lukas
04/30/2020, 7:41 AM
Lukas
04/30/2020, 10:12 AM
from prefect import Flow, task
import pandas as pd

@task
def test(x):
    return x

@task
def return_df(no_of_columns):
    df = pd.DataFrame(columns=range(no_of_columns))
    return df

with Flow("Flow") as flow:
    col_1 = return_df(1)
    col_2 = return_df(2)
    col_3 = return_df(3)
    col_7 = return_df(7)
    col_6 = return_df(6)
    col_9 = return_df(9)
    col_10 = return_df(10)
    col_4 = return_df(4)
    col_12 = return_df(12)
    col_13 = return_df(13)
    col_11 = return_df(11)
    col_5 = return_df(5)
    col_8 = return_df(8)
    a = test(
        {
            "col_1": col_1,
            "col_2": col_2,
            "col_3": col_3,
            "col_7": col_7,
            "col_6": col_6,
            "col_9": col_9,
            "col_10": col_10,
            "col_4": col_4,
            "col_13": col_13,
            "col_8": col_8,
        }
    )

state = flow.run()
So if you run print(state.result[a]._result) afterwards, you'll see that the result is
<Result: {'col_1': Empty DataFrame
Columns: [0]
Index: [], 'col_2': Empty DataFrame
Columns: [0, 1, 2, 3, 4, 5, 6, 7]
Index: [], 'col_3': Empty DataFrame
Columns: [0, 1]
Index: [], 'col_7': Empty DataFrame
Columns: [0, 1, 2]
Index: [], 'col_6': Empty DataFrame
Columns: [0, 1, 2, 3, 4, 5, 6]
Index: [], 'col_9': Empty DataFrame
Columns: [0, 1, 2, 3, 4, 5]
Index: [], 'col_10': Empty DataFrame
Columns: [0, 1, 2, 3, 4, 5, 6, 7, 8]
Index: [], 'col_4': Empty DataFrame
Columns: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Index: [], 'col_13': Empty DataFrame
Columns: [0, 1, 2, 3]
Index: [], 'col_8': Empty DataFrame
Columns: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
Index: []}>
So col_2, for example, has 8 columns in the output instead of 2.
But if you run print(state.result[col_2]._result) it actually returns the desired
<Result: Empty DataFrame
Columns: [0, 1]
Index: []>
Lukas
04/30/2020, 10:24 AM
from prefect import task, Flow

@task
def test(x):
    return x

with Flow("test") as flow:
    a = test(1)
    b = test(2)
    c = test(3)
    d = test(4)
    e = test(5)
    f = test(6)
    g = test(7)
    h = test(8)
    i = test(9)
    j = test(10)
    a = test(dict(a=a, b=b, c=c, d=d, e=e, f=f, g=g, h=h, i=i, j=j))

state = flow.run()
assert state.result[a].result == dict(a=1, b=2, c=3, d=4, e=5, f=6, g=7, h=8, i=9, j=10)
I get an assertion error.
Jeremiah
Lukas
04/30/2020, 12:20 PM
Jeremiah
Marvin
04/30/2020, 12:21 PM
Jeremiah
Jeremiah
Jeremiah
In [7]: state.result[a].result
Out[7]:
{'a': 1,
'b': 10,
'c': 2,
'd': 3,
'e': 4,
'f': 5,
'g': 6,
'h': 7,
'i': 8,
'j': 9}
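The permutation above is what one would expect if the dict's values were collected as keyword arguments named arg_1, arg_2, ... (the naming mentioned later in this thread) and then reordered by sorting those names as strings. A minimal plain-Python sketch of that effect follows. The helper gather_sorted_by_name is hypothetical and only mimics the described behavior; it is not Prefect's actual code.

```python
def gather_sorted_by_name(values):
    """Hypothetical stand-in: name items arg_1..arg_N, then rebuild the
    list by sorting those names as plain strings."""
    named = {f"arg_{i}": v for i, v in enumerate(values, start=1)}
    # String sorting puts "arg_10" directly after "arg_1".
    return [v for _, v in sorted(named.items())]


keys = list("abcdefghij")
values = list(range(1, 11))

# Pair the untouched keys with the reordered values.
result = dict(zip(keys, gather_sorted_by_name(values)))
print(result)
```

With nine or fewer items every name has a single-digit suffix, so the string order equals the numeric order. That would be consistent with the three-item test earlier in the thread passing while the ten-item one fails.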
Lukas
04/30/2020, 12:25 PM
List task that somehow calls the sorting here: https://github.com/PrefectHQ/prefect/blob/eb59918d98b15ba6e14c0a406cee885c0e44ea8b/src/prefect/tasks/core/collections.py#L73
Jeremiah
arg_1, arg_2, etc. in order to preserve the list order, but plainly it needs another look
Jeremiah
Jeremiah
Lukas
04/30/2020, 12:26 PM
Jeremiah
arg_10 sorts right after arg_1
Major facepalm moment.
Jeremiah
Lukas
04/30/2020, 1:04 PM
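The root cause identified in the thread, arg_10 sorting right after arg_1, is ordinary string (lexicographic) ordering. The sketch below shows the problem and two common ways to get numeric ordering back: sorting on the integer suffix, or zero-padding the names. The helper arg_index and the padding width are illustrative assumptions, not necessarily the fix Prefect adopted.

```python
import re


def arg_index(name):
    """Return the integer suffix of a name such as 'arg_10' (hypothetical helper)."""
    match = re.fullmatch(r"arg_(\d+)", name)
    if match is None:
        raise ValueError(f"unexpected argument name: {name!r}")
    return int(match.group(1))


names = [f"arg_{i}" for i in range(1, 12)]

# Plain string sort: 'arg_10' and 'arg_11' land before 'arg_2'.
lexicographic = sorted(names)

# Option 1: sort on the numeric suffix instead of the whole string.
numeric = sorted(names, key=arg_index)

# Option 2: zero-pad the suffix so string order and numeric order agree.
padded = sorted(f"arg_{i:03d}" for i in range(1, 12))

print(lexicographic[:4])
print(numeric[:4])
print(padded[:4])
```

Sorting on the parsed integer works for any number of arguments, while zero-padding only holds up to the chosen width (here 999 items).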