Klemen Strojan
07/14/2020, 11:59 AMtask = ShellTask()
m = Secret('my_secret_on_the_cloud').get()
with Flow('my_flow') as flow:
name = task(
helper_script='cd ~'
env={'CREDENTIAL':m},
command="mysqldump -u my_user -p$CREDENTIAL my_db > my_db.sql"
)
while I should be using it like this:
task = ShellTask()
with Flow('my_flow') as flow:
m = Secret('my_secret_on_the_cloud')
name = task(
helper_script='cd ~'
env={'CREDENTIAL':m},
command="mysqldump -u my_user -p$CREDENTIAL my_db > my_db.sql"
)
The later fails with TypeError
. I have the secret on the cloud and in the local config.toml
for testing purposes. What am I doing wrong?josh
07/14/2020, 12:00 PMPrefectSecret
task 🙂 https://docs.prefect.io/api/latest/tasks/secrets.html#prefectsecretfrom prefect.tasks.secrets import PrefectSecret
task = ShellTask()
with Flow('my_flow') as flow:
m = PrefectSecret('my_secret_on_the_cloud')
name = task(
helper_script='cd ~'
env={'CREDENTIAL':m},
command="mysqldump -u my_user -p$CREDENTIAL my_db > my_db.sql"
)
Klemen Strojan
07/14/2020, 12:04 PMRobin
07/14/2020, 12:50 PMimport prefect
from <http://AIBat.io|AIBat.io> import (connect_to_snowflake, create_table,
fetch_table_from_snowflake)
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment
from prefect.tasks.secrets import PrefectSecret
with Flow("dask-flow") as flow:
sf_account = "snowflake-account" # PrefectSecret("sf_account")
sf_user = "sf-user" # PrefectSecret("sf_user")
sf_password = PrefectSecret("sf_password")
connection = connect_to_snowflake(
sf_account=sf_account, sf_user=sf_user, sf_password=sf_password,
)
flow.environment = LocalEnvironment(
executor=DaskExecutor(adapt_kwargs={"minimum": 1, "maximum": 5},)
)
flow.register(project_name="eks_test_01")
flow.run()
However, I get the following error:
UserWarning: Tasks were created but not added to the flow: {<Task: sf_password>}. This can occur when `Task` classes, including `Parameters`, are instantiated inside a `with flow:` block but not added to the flow either explicitly or as the input to another task. For more information, see <https://docs.prefect.io/core/advanced_tutorials/task-guide.html#adding-tasks-to-flows>.
Don’t I explicitly call the sf_password
in connect_to_snowflake
?josh
07/14/2020, 12:59 PMflow.visualize()
?Robin
07/14/2020, 1:14 PMconnect_to_snowflake
without adding the @task
decorator. Therefore, the visualization is empty.
I guess that’s because no task is in the flow?josh
07/14/2020, 1:15 PM@task
decorator or be a class that inherits from Task
(https://docs.prefect.io/core/advanced_tutorials/task-guide.html#the-basics). That’s why it’s not being added to the flow because it isn’t a task 🙂Robin
07/14/2020, 1:30 PM@task
def connect_to_snowflake_task(sf_account, sf_user, sf_password):
return connect_to_snowflake(
sf_account=sf_account, sf_user=sf_user, sf_password=sf_password
)