Edmondo Porcu
05/07/2022, 1:51 AM
with Flow(''') as flow:
in the body of the script. However, since there are parameters that are set from environment variables in the main block, like so, that was causing an exception:
if __name__ == '__main__':
    flow = build_flow()
    flow.executor = LocalDaskExecutor()
    project_name = os.environ['PROJECT_NAME']
    spark_version = os.environ['SPARK_VERSION']
    github_repo = os.environ['GITHUB_REPO']
    git_ref = os.environ['GIT_REF']
    flow.run(
        project_name=project_name,
        spark_version=spark_version,
        github_repo=github_repo,
        git_ref=git_ref
    )
and now I have wrapped my flow definition in a function. Is that a reasonable thing to do?
Kevin Kho
Edmondo Porcu
05/07/2022, 2:06 AM
Kevin Kho
Edmondo Porcu
05/07/2022, 2:19 AM
Kevin Kho
testing state
Edmondo Porcu
05/07/2022, 2:23 AM
Kevin Kho
Edmondo Porcu
05/07/2022, 3:12 AM
Kevin Kho
Edmondo Porcu
05/07/2022, 3:21 AM
Kevin Kho
task.run()
task.run()
yet. It might be task.fn()
Edmondo Porcu
05/07/2022, 4:20 PM
Kevin Kho
Edmondo Porcu
05/07/2022, 4:26 PM
Kevin Kho
Anna Geller
Edmondo Porcu
05/07/2022, 4:36 PM
def func1():
    # perform complex sql query on google big table
    ...
def func2(input):
    ...  # whatever
def func3(input):
    ...  # whatever
def func4(input):
    ...  # whatever
def flow():
    res1 = func1()
    res2 = [func2(res) for res in res1]
    # assuming the intent is to branch on the number of results
    if len(res2) > 10:
        func3(res2)
    else:
        func4(res2)
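One way to exercise the branching in a sketch like the one above without running the expensive query is to pass the steps in as callables and swap in `unittest.mock.MagicMock` stand-ins during tests. The following is a minimal plain-Python sketch, not Prefect code; `run_pipeline`, `make_mocks`, the parameter names, and the threshold default are all hypothetical, and it assumes the branch is meant to depend on the number of results.

```python
from unittest.mock import MagicMock


def run_pipeline(query, transform, on_many, on_few, threshold=10):
    """Plain-Python stand-in for the flow: query, map, then branch on result count."""
    rows = query()
    transformed = [transform(row) for row in rows]
    if len(transformed) > threshold:
        return on_many(transformed)
    return on_few(transformed)


def make_mocks(n_rows):
    """Build stand-ins; the fake query returns n_rows integers instead of hitting a database."""
    query = MagicMock(return_value=list(range(n_rows)))
    on_many = MagicMock(return_value="many")
    on_few = MagicMock(return_value="few")
    return query, on_many, on_few


# More than 10 rows: only the "many" branch should run.
query, on_many, on_few = make_mocks(11)
assert run_pipeline(query, lambda x: x + 1, on_many, on_few) == "many"
on_many.assert_called_once_with(list(range(1, 12)))
on_few.assert_not_called()

# 10 rows or fewer: only the "few" branch should run.
query, on_many, on_few = make_mocks(3)
assert run_pipeline(query, lambda x: x + 1, on_many, on_few) == "few"
on_few.assert_called_once_with([1, 2, 3])
on_many.assert_not_called()
```

Injecting the callables keeps the branching logic testable on its own; whether the same structure maps cleanly onto tasks in a real flow depends on how the flow is built.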
Anna Geller
Edmondo Porcu
05/07/2022, 4:38 PM
Anna Geller
Edmondo Porcu
05/07/2022, 4:39 PM
Anna Geller
@pytest.mark.parametrize("branch", ["a", "b", "c"])
def test_case_execution(self, branch):
    with Flow("test") as flow:
        cond = identity(branch)
        with case(cond, "a"):
            a = identity(1)
            b = inc(a)
        with case(cond, "b"):
            c = identity(3)
            d = inc(c)
        e = merge(b, d)
    state = flow.run()
    if branch == "a":
        assert state.result[a].result == 1
        assert state.result[b].result == 2
        assert state.result[c].is_skipped()
        assert state.result[d].is_skipped()
        assert state.result[e].result == 2
    elif branch == "b":
        assert state.result[a].is_skipped()
        assert state.result[b].is_skipped()
        assert state.result[c].result == 3
        assert state.result[d].result == 4
        assert state.result[e].result == 4
    elif branch == "c":
        for t in [a, b, c, d, e]:
            assert state.result[t].is_skipped()
Edmondo Porcu
05/07/2022, 5:04 PM
Anna Geller
flow.visualize()
Edmondo Porcu
05/07/2022, 5:09 PM
Anna Geller
Edmondo Porcu
05/07/2022, 5:18 PM
Anna Geller
Zach Schumacher
05/07/2022, 7:59 PM
Anna Geller
Zach Schumacher
05/08/2022, 2:16 PM
flow.visualize
in the PR
Anna Geller
Edmondo Porcu
05/08/2022, 8:04 PM
Zach Schumacher
05/08/2022, 8:14 PM
Edmondo Porcu
05/08/2022, 8:14 PM
Alvaro Durán Tovar
05/08/2022, 8:17 PM
Edmondo Porcu
05/08/2022, 8:18 PM
Alvaro Durán Tovar
05/08/2022, 8:20 PM
Edmondo Porcu
05/08/2022, 8:20 PM
Anna Geller
Edmondo Porcu
05/08/2022, 8:23 PM
Anna Geller
Edmondo Porcu
05/08/2022, 8:24 PM
def build_flow(p1, p2, p3) -> Flow: ...
def test_flow_1():
    flow = build_flow(2, 3, 4)
    assert flow.n_tasks == 72
Alvaro Durán Tovar
05/08/2022, 8:25 PM
Cole Murray
05/08/2022, 8:26 PM
Edmondo Porcu
05/08/2022, 8:26 PM
unittest.mock.MagicMock
to replace real tasks with mocked tasks?
Alvaro Durán Tovar
05/08/2022, 8:27 PM
Cole Murray
05/08/2022, 8:27 PM
@task
def myDataDownloadTask(data_url):
    ...
and passing in a parameterized data URL that is different when running tests
Alvaro Durán Tovar
05/08/2022, 8:30 PM
flow.run()
to run the whole flow
But then the intermediate code probably will get pretty complex
Edmondo Porcu
05/08/2022, 8:31 PM
Cole Murray
05/08/2022, 8:35 PM
@task
def my_missing_dependency(a):
    import pandas as pd
    df = pd.read_csv(a)
    ...
Your flow linkage may be correct, your unit tests on the tasks may be correct (depending on your test env, this may pass)
but this will still fail at runtime and take the $15k with it
Edmondo Porcu
05/09/2022, 1:27 AM
Alvaro Durán Tovar
05/09/2022, 7:19 AM
def flow_name(name: Text) -> Text:
    return f"{name}-{get_environment()}"
with Flow(flow_name(FLOW)) as flow:
    flow code...
That is, the flow name is changed to reflect that it is configured for a specific environment (prod, qa, etc.).
Then configure the flow appropriately (qa DB, qa buckets, etc.), with less data to crunch when executing the flow, as mentioned by @Cole Murray.
Then trigger the flow and make assertions after it finishes.
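The naming scheme described above can be sketched in plain Python as follows. This is an illustration under assumptions rather than the setup actually used: the `FLOW_ENV` variable, its `qa` default, the `SETTINGS` table and every value in it, the `settings_for` helper, and the optional `env` argument are hypothetical. Only the names `flow_name` and `get_environment` come from the snippet.

```python
import os

# Hypothetical per-environment settings; the keys and values are placeholders.
SETTINGS = {
    "prod": {"database": "prod-db", "bucket": "prod-bucket"},
    "qa": {"database": "qa-db", "bucket": "qa-bucket"},
}


def get_environment(env=None):
    """Return the target environment, read from the hypothetical FLOW_ENV variable."""
    env = os.environ if env is None else env
    name = env.get("FLOW_ENV", "qa")
    if name not in SETTINGS:
        raise ValueError(f"unknown environment: {name!r}")
    return name


def flow_name(name, env=None):
    """Suffix the flow name with the environment it is configured for."""
    return f"{name}-{get_environment(env)}"


def settings_for(env=None):
    """Look up the settings that match the current environment."""
    return SETTINGS[get_environment(env)]


print(flow_name("etl", {"FLOW_ENV": "qa"}))        # etl-qa
print(settings_for({"FLOW_ENV": "qa"})["bucket"])  # qa-bucket
```

Passing the environment mapping explicitly keeps the helpers easy to test; in a real run they would fall back to `os.environ`.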