Hi, I'm extending MySQL Execute and Fetch tasks to...
# prefect-community
i
Hi, I'm extending MySQL Execute and Fetch tasks to override init method, while 'run' remains the same. I'm initializing tasks outsize of flow() and then just call them expecting that second task is dependent on first one. Definitely doing something wrong. get_time (line 5) is just initializing a task, not getting actual output from select query.
Copy code
fetch_task = TestMySQLFetchOneValue(connection_name='...', query="select now(6) as time_column", name="Fetch Time")
execute_task = TestMySQLExecute(connection_name='...', name="Write Time")

with Flow("Test Flow") as flow:
    get_time = fetch_task()  #fetch_task().run() works
    execute = execute_task(query="update test_run set run_time = '" + str(get_time.get('time_column')) + "'")

flow.run()
k
Can you show me the definition of
TestMySQLFetchOneValue
?
query="update test_run set run_time = '" + str(get_time.get('time_column')) + "'")
this looks off btw because the string will be constructred during DAG creation
You want the string formatting to happen in a task to dfer execution
Also I assume the run would return a Python object so you won’t be able to do
get_time.get()
because it won’t be the task class after execution
i
Right, I got your point. It should be like this (let's ignore bad Python for now), I'm focusing on idea.
Copy code
fetch_task = OakMySQLFetchOneValue(connection_name='...', query="select now(6) as time_value", name="Fetch time task")
execute_task = OakMySQLExecute(connection_name='...', name="Execute query")


@task
def prepare_expression(value):
    return "update test_run set run_time = '" + str(value.get('time_value')) + "'"


with Flow("Test Flow") as flow:
    time_value = fetch_task()
    query = prepare_expression(value=time_value)
    execute = execute_task(query=query)

flow.run()
k
yeah assuming the tasks are written right