Ievgenii Martynenko
03/30/2022, 9:19 PMfetch_task = TestMySQLFetchOneValue(connection_name='...', query="select now(6) as time_column", name="Fetch Time")
execute_task = TestMySQLExecute(connection_name='...', name="Write Time")
with Flow("Test Flow") as flow:
get_time = fetch_task() #fetch_task().run() works
execute = execute_task(query="update test_run set run_time = '" + str(get_time.get('time_column')) + "'")
flow.run()
Kevin Kho
TestMySQLFetchOneValue
?Kevin Kho
query="update test_run set run_time = '" + str(get_time.get('time_column')) + "'")
this looks off btw because the string will be constructred during DAG creationKevin Kho
Kevin Kho
get_time.get()
because it won’t be the task class after executionIevgenii Martynenko
03/31/2022, 7:53 AMfetch_task = OakMySQLFetchOneValue(connection_name='...', query="select now(6) as time_value", name="Fetch time task")
execute_task = OakMySQLExecute(connection_name='...', name="Execute query")
@task
def prepare_expression(value):
return "update test_run set run_time = '" + str(value.get('time_value')) + "'"
with Flow("Test Flow") as flow:
time_value = fetch_task()
query = prepare_expression(value=time_value)
execute = execute_task(query=query)
flow.run()
Kevin Kho