Ievgenii Martynenko
05/06/2022, 12:20 PM
token = EnvVarSecret("TOKEN")
connections = get_connections(token=token)  # this is a task that returns dict(name, connection_string)
start_task_result = start_task(connections.get('some_key'))  # I'd expect to get the value from the dict, not an AttributeError
Anna Geller
05/06/2022, 12:25 PM
Ievgenii Martynenko
05/06/2022, 12:28 PM
Anna Geller
05/06/2022, 12:32 PM
Ievgenii Martynenko
05/06/2022, 12:35 PM
get_connections = K2Vault(
    name="Get connections from vault"
)
start_task = MySQLExecute(
    name='XXX',
    query=sql_start,
    commit=True
)
with Flow(name="XXX") as flow:
    token = EnvVarSecret("TOKEN")
    connections = get_connections(token=token)
    start_task_result = start_task(**eval(connections.get('aws_connection')))
conn = get_connections_task(...)
run_result = run_task(**conn)
Anna Geller
05/06/2022, 12:39 PM
start_task_result = start_task(**eval(connections.get('aws_connection')))
In Prefect 1.0, you can only call tasks in your Flow. You should move this part to a separate @task and call it in your Flow to make it work.
Ievgenii Martynenko
05/06/2022, 12:41 PM
Anna Geller
05/06/2022, 12:46 PM
@flow decorator, which gives Prefect an entry point to orchestration, and that's all you need to turn your code into a Prefect workflow
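Editor's sketch for the Prefect 1.0 advice above (move the `eval(connections.get(...))` step into its own task). This is only an illustration under assumptions, not the code from the thread: the helper name `parse_connection`, the sample keys and the sample values are hypothetical, and it assumes the vault stores each connection as a string holding a Python dict literal. It also swaps `eval` for `ast.literal_eval`, which only accepts literals. The function is kept as plain Python so it can be run on its own; in Prefect 1.0 it would be wrapped as a task and called inside the `with Flow(...)` block, as the trailing comments suggest.

```python
import ast


def parse_connection(connections, key):
    """Return the connection parameters stored under `key` as a dict."""
    raw = connections[key]
    # literal_eval parses Python literals only, so it cannot run arbitrary code.
    params = ast.literal_eval(raw) if isinstance(raw, str) else raw
    if not isinstance(params, dict):
        raise ValueError(f"connection {key!r} is not a dict")
    return params


# Hypothetical sample data standing in for what get_connections returns at run time.
sample = {"aws_connection": "{'host': 'db.example.com', 'user': 'etl', 'port': 3306}"}
params = parse_connection(sample, "aws_connection")

# Assumed Prefect 1.0 wiring (not executed here):
# from prefect import Flow, task
# parse_connection_task = task(parse_connection)
# with Flow(name="XXX") as flow:
#     connections = get_connections(token=token)
#     params = parse_connection_task(connections, "aws_connection")
```

The point of the design is that the dictionary lookup and parsing happen at run time inside a task, when `connections` is a real dict, instead of at flow-build time, when it is still a task object.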