
Laura Vaida

02/12/2021, 11:57 AM
hi folks, does anybody know how to pass Prefect secrets from the UI to a Prefect worker in GCP Kubernetes?

Tyler Wanner

02/12/2021, 3:17 PM
Hi Laura, are you using the Prefect Cloud UI to set the secret? You should be able to use a PrefectSecret task within your flow to retrieve that secret and pass it to another task
from prefect import task, Flow
from prefect.tasks.secrets import PrefectSecret

@task
def my_task(credentials):
    """A task that requires credentials to access something. Passing the
    credentials in as an argument allows you to change how/where the
    credentials are loaded (though we recommend using `PrefectSecret` tasks to
    load them)."""
    pass

with Flow("example") as flow:
    my_secret = PrefectSecret("MYSECRET")
    res = my_task(credentials=my_secret)
does this help?
(regardless of whether you use GraphQL, the UI, or the client, as shown here, the secrets will be stored the same way and reachable via the Cloud API)

Laura Vaida

02/12/2021, 3:34 PM
hi @Tyler Wanner, thanks for your reply. so this is my code... but it doesn't work, can you see any errors here?
i run it this way:
flow.storage = Docker(registry_url="gcr.io/keller-steering-enabling/flows")
flow.run_config = KubernetesRun()
flow.run(executor=executor)
flow.register('Billwerk')

Tyler Wanner

02/12/2021, 3:35 PM
I think I see the problem…
can you move the PrefectSecret tasks to the Flow context instead of within the oauth task and then pass the results into the oauth task?
that would require moving the secrets import to the top outside of the task, then moving those 4 lines to the line before oauth()

Laura Vaida

02/12/2021, 3:42 PM
@task
def oauth(client_id, username, client_secret, password):
    from prefect.tasks.secrets import PrefectSecret
    from oauthlib.oauth2 import LegacyApplicationClient
    from requests_oauthlib import OAuth2Session

    oauth = OAuth2Session(client=LegacyApplicationClient(client_id=client_id))
    token = oauth.fetch_token(token_url='https://app.billwerk.com/oauth/token',
                              username=username,
                              password=password,
                              client_id=client_id,
                              client_secret=client_secret)
    pass

with Flow("billwerk-contracts") as flow:
    BW_Username = PrefectSecret("BW_Username")
    BW_Password = PrefectSecret("BW_Password")
    BW_Client_Id = PrefectSecret("BW_Client_Id")
    BW_Client_Secret = PrefectSecret("BW_Client_Secret")
    oauth(username = BW_Username, password =BW_Password,client_id = BW_Client_Id, client_secret = BW_Client_Secret)
like that?
that's giving this
from prefect.tasks.secrets import PrefectSecret
from oauthlib.oauth2 import LegacyApplicationClient
from requests_oauthlib import OAuth2Session



@task
def oauth(client_id, username, client_secret, password):

    oauth = OAuth2Session(client=LegacyApplicationClient(client_id=client_id))
    token = oauth.fetch_token(token_url='https://app.billwerk.com/oauth/token',
                              username=username,
                              password=password,
                              client_id=client_id,
                              client_secret=client_secret)
    pass

with Flow("billwerk-contracts") as flow:
    BW_Username = PrefectSecret("BW_Username")
    BW_Password = PrefectSecret("BW_Password")
    BW_Client_Id = PrefectSecret("BW_Client_Id")
    BW_Client_Secret = PrefectSecret("BW_Client_Secret")
    oauth(username = BW_Username, password =BW_Password,client_id = BW_Client_Id, client_secret = BW_Client_Secret)
also tried this, but giving me @Tyler Wanner

Tyler Wanner

02/12/2021, 4:03 PM
hmm, have you installed that library? I assume this is from the local flow.run()

Laura Vaida

02/12/2021, 4:26 PM
yes
what do you mean by local flow run?
it does work when i just execute the python code with the secrets stored in variables
but not with the flow runner

nicholas

02/12/2021, 4:38 PM
Hi @Laura Vaida could you share the full code you're using? (Perhaps a pared-down reproduction since it looks like your flow is pretty complex!)

Laura Vaida

02/12/2021, 4:41 PM
import requests
import fsspec
import sys
import json
import csv
import os.path
import snowflake.connector
import sysconfig
import configparser
import pandas
from oauthlib.oauth2 import LegacyApplicationClient
from requests_oauthlib import OAuth2Session
from pandas import json_normalize
from datetime import datetime
import openpyxl
import numpy as np
import csv
from prefect.executors import DaskExecutor
from prefect.engine.results import GCSResult
from prefect.storage import GCS
from google.cloud import storage
from prefect.storage import Docker
from prefect.run_configs import KubernetesRun
import docker
import google.cloud
from prefect import task, Flow
import random
from prefect.tasks.secrets import PrefectSecret
from prefect.client import Secret

#define executor as dask executor with tcp from load balancer
executor = DaskExecutor(address="35.198.104.230:8786") # tcp:port address of dask load balancer


"""
Local stored access secrets
@task
def get_config():
    from oauthlib.oauth2 import LegacyApplicationClient
    from requests_oauthlib import OAuth2Session
    config = configparser.ConfigParser()
    config.read('C:/Users/laura.vaida.000/billwerk_api.ini')


# Example of retrieving config data
# config.get("billwerk_api", 'client_id')

######################################################
# Getting access token with client_id and client_secret
######################################################
oauth = OAuth2Session(client=LegacyApplicationClient(client_id=config.get("billwerk_api", 'client_id')))
token = oauth.fetch_token(token_url='https://app.billwerk.com/oauth/token',
                          username=config.get("billwerk_api", 'username'),
                          password=config.get("billwerk_api", 'password'),
                          client_id=config.get("billwerk_api", 'client_id'),
                          client_secret=config.get("billwerk_api", 'client_secret'))

"""

@task
def oauth(client_id, username, client_secret, password):
    from oauthlib.oauth2 import LegacyApplicationClient
    from requests_oauthlib import OAuth2Session

    oauth = OAuth2Session(client=LegacyApplicationClient(client_id=client_id))
    token = oauth.fetch_token(token_url='https://app.billwerk.com/oauth/token',
                              username=username,
                              password=password,
                              client_id=client_id,
                              client_secret=client_secret)
    pass


@task
def creating_dataframe():
    bw_contracts_call = requests.get('https://app.billwerk.com/api/v1/contracts?access_token=' + token['access_token'])
    bw_contracts_start = requests.get('https://app.billwerk.com/api/v1/contracts?access_token=' + token['access_token']).json()

    # Getting last contract id for while loop
    last_entry = bw_contracts_start[199]['Id']

    contracts_pandaframe = json_normalize(bw_contracts_start, sep='_')
    #Converting in string for merge function
    contracts_pandaframe['Id'] = contracts_pandaframe['Id'].astype(str)

    #separately reading phases from dataframe
    phases_pandaframe = json_normalize(bw_contracts_start, record_path=['Phases'], record_prefix='Phases_', sep='_')
    #Converting in string for merge function
    phases_pandaframe['Id'] = contracts_pandaframe['Id'].astype(str)

    #contracts_pandaframe.dtypes
    #phases_pandaframe.dtypes

    #Merging phases to initial dataframe
    contracts_pandaframe = contracts_pandaframe.merge(phases_pandaframe, on ='Id', how ='left')
    column_names = contracts_pandaframe.columns
    length_contracts = len(contracts_pandaframe['Id'])

    ###########################################
    # Optional for retrieving column information
    # print(contracts_pandaframe.columns)
    # print(contracts_pandaframe.info)
    ###########################################
    bw_contracts_summary = pandas.DataFrame(data=[], columns=[column_names])

##################
# While-Loop
##################

@task
def while_loop():
    customer_ids = pandas.Series(data='', dtype=str)
    type(last_entry)
    #while last entry is not inherited and the length is greater than 1 (last call inherits only 1 entry)
    while min(customer_ids.str.contains(last_entry)) == False and length_contracts > 1:

        #combining all entries written in summary and the new entries from last call
        contracts_pandaframe_combined = [contracts_pandaframe, bw_contracts_summary]
        if bw_contracts_summary.empty == False:
            contracts_concat = pandas.concat(contracts_pandaframe_combined)
            bw_contracts_summary = contracts_concat
        else:
            bw_contracts_summary = contracts_pandaframe
        customer_ids = bw_contracts_summary['Id']
        row_count = (bw_contracts_summary.shape[0])
        print(row_count)

        # Get next entries before first entry
        bw_contracts = requests.get(
            'https://app.billwerk.com/api/v1/contracts/?from=' + last_entry + '&access_token=' + token[
                'access_token']).json()
        length = len(json_normalize(bw_contracts, sep='_')['Id'])
        contracts_pandaframe = json_normalize(bw_contracts, sep='_')[1:length]

        # Converting in string for merge function
        contracts_pandaframe['Id'] = contracts_pandaframe['Id'].astype(str)

        # separately reading phases from dataframe
        phases_pandaframe = json_normalize(bw_contracts, record_path=['Phases'], record_prefix='Phases_', sep='_')[1:length]
        # Converting in string for merge function
        phases_pandaframe['Id'] = contracts_pandaframe['Id'].astype(str)

        # Merging phases to initial dataframe
        contracts_pandaframe = contracts_pandaframe.merge(phases_pandaframe, on='Id', how='left')

        if contracts_pandaframe.empty == False:
            last_entry = contracts_pandaframe["Id"].iloc[-1]
        else:
            last_entry = json_normalize(bw_contracts, sep='_')['Id'][0]
        length_contracts = len(contracts_pandaframe['Id'])

    print('Already included')

# bw_contracts_summary.columns

# Deleting unnecessary columns from all contracts
@task
def modifying_columns():
    bw_contracts_summary = bw_contracts_summary.drop(
        columns=['PaymentBearer_EmailAddress', 'PaymentBearer_Holder', 'PaymentBearer_Last4', 'PaymentBearer_LastName',
                 'PaymentBearer_FirstName', 'PaymentBearer_MaskedCardPan', 'ThresholdBillingDisabled', 'CustomerName', 'Phases'])

    # Adding current timestamp
    current_date = datetime.today().strftime('%Y-%m-%d')
    bw_contracts_summary['Timestamp_Loading'] = current_date
    fieldnames = bw_contracts_summary.columns

"""
##Saving locally
path = r'C://Users/laura.vaida.000/Documents/Python_Scripts/Billwerk_Contracts.csv'
contracts_pandaframe.to_csv(r'C://Users/laura.vaida.000/Documents/Python_Scripts/Billwerk_Contracts_'+ current_date+'.csv', index = False, header=True)


##store result in gc bucket
gcs_result = GCSResult(bucket='billwerk', location='billwerk_contracts.csv')
with Flow("store to gcp bucket", result = 'billwerk') as store_in_gcp_bucket:

##Transforming to csv file
csv_file =bw_contracts_summary.to_csv(index= False)
store_in_gcp_bucket.run()
state=store_in_gcp_bucket.run()
state.result[csv_file]._result.location
"""


with Flow("billwerk-contracts") as flow:
    BW_Username = PrefectSecret("BW_Username")
    BW_Password = PrefectSecret("BW_Password")
    BW_Client_Id = PrefectSecret("BW_Client_Id")
    BW_Client_Secret = PrefectSecret("BW_Client_Secret")
    oauth(username = BW_Username, password =BW_Password,client_id = BW_Client_Id, client_secret = BW_Client_Secret)


flow.storage = Docker(registry_url="gcr.io/keller-steering-enabling/flows")
flow.run_config = KubernetesRun()
flow.run(executor=executor)
flow.register('Billwerk')
this would be complete code

nicholas

02/12/2021, 4:49 PM
Thanks @Laura Vaida - let me take a look
Could you try fetching the secret in the task again like this?
from prefect import task, Flow
from prefect.client import Secret
from oauthlib.oauth2 import LegacyApplicationClient
from requests_oauthlib import OAuth2Session

@task
def oauth():

    username = Secret("BW_Username").get()
    password = Secret("BW_Password").get()
    client_id = Secret("BW_Client_Id").get()
    client_secret = Secret("BW_Client_Secret").get()

    oauth = OAuth2Session(client=LegacyApplicationClient(client_id=client_id))
    token = oauth.fetch_token(
        token_url="https://app.billwerk.com/oauth/token",
        username=username,
        password=password,
        client_id=client_id,
        client_secret=client_secret,
    )
    pass


with Flow("billwerk-contracts") as flow:
    oauth()

flow.run()
I'm not sure this will solve your issue but it might help narrow it down
(I would say for the sake of testing it might be useful to remove everything but the above)

Laura Vaida

02/12/2021, 5:09 PM
hey, first of all, thanks for helping me!
i still get that error, so maybe call it without parameters?

nicholas

02/12/2021, 5:10 PM
Oh sorry, I forgot to remove those args! Updated that snippet above

Laura Vaida

02/12/2021, 5:11 PM
now i get that again with the reference tasks, for some reason Prefect wants to look only for local secrets
but i swear, they are here 😄

nicholas

02/12/2021, 5:19 PM
Ah, progress!
In your config.toml (located at ~/.prefect/config.toml), is this set?
[cloud]
use_local_secrets = true

[context.secrets]
BW_Username = "some_val"
# rest of your secrets

Laura Vaida

02/12/2021, 5:22 PM
no, i have these 😄 i was told to set the config for cloud like this
do i have to reset that?

nicholas

02/12/2021, 5:39 PM
Sorry @Laura Vaida - juggling a few things over here. Since that's a .toml file, the ordering and header sections are important. To set a secret in that file, it should be under the [context.secrets] section (it looks like you've got that under logging)

Laura Vaida

02/12/2021, 5:41 PM
no worries, but i don't know why i have to set this here again? so this is the output

nicholas

02/12/2021, 5:44 PM
how did you export your secrets in your environment? Did you run this?
export PREFECT__CONTEXT__SECRETS__BW_Username="some val"

Laura Vaida

02/12/2021, 5:45 PM
yes, but with set instead of export

nicholas

02/12/2021, 5:47 PM
Using setenv?

Laura Vaida

02/12/2021, 5:47 PM
no only set
is this wrong?
i use windows

nicholas

02/12/2021, 5:48 PM
Ah ok sorry! Was thinking linux >.<
And when you run
echo %PREFECT__CONTEXT__SECRETS__BW_Username%
you see the value printed correctly, right?

Laura Vaida

02/12/2021, 5:54 PM
nope
what does that mean?

nicholas

02/12/2021, 5:59 PM
Sorry, this might be different if you're using PowerShell, I think you need to run
echo $Env:PREFECT__CONTEXT__SECRETS__BW_Username

Laura Vaida

02/12/2021, 6:02 PM
mmh i use anaconda prompt actually
but cmd gives the same? 😄

nicholas

02/12/2021, 6:04 PM
Ah ok - well if you're not able to print/echo that value then I think Prefect is telling you the correct thing - that that value isn't set correctly for whatever reason
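Since echo behaves differently in cmd, PowerShell and the Anaconda Prompt, one shell-independent check is to ask Python which variables the process can actually see. A rough sketch, assuming the four secret names from the flow above and the PREFECT__CONTEXT__SECRETS__ prefix mentioned earlier; secret_env_status is a hypothetical helper, and it only reports whether a variable is visible to the process, not whether Prefect will accept it.

```python
import os

PREFIX = "PREFECT__CONTEXT__SECRETS__"


def secret_env_status(names, environ=None):
    """Map each secret name to True/False depending on whether its
    PREFECT__CONTEXT__SECRETS__<name> variable is set and non-empty."""
    environ = os.environ if environ is None else environ
    # Compare case-insensitively: Windows treats variable names that way.
    upper_env = {key.upper(): value for key, value in environ.items()}
    return {name: bool(upper_env.get((PREFIX + name).upper())) for name in names}


if __name__ == "__main__":
    secret_names = ["BW_Username", "BW_Password", "BW_Client_Id", "BW_Client_Secret"]
    for name, is_set in secret_env_status(secret_names).items():
        print(f"{name}: {'set' if is_set else 'MISSING'}")
```

Run it from the same prompt that will launch the flow: in cmd, set only affects the current session, so a variable set in one window is not visible in another.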

Laura Vaida

02/12/2021, 6:10 PM
ok so it depends on my environment variables?
why can't it read that from the config file?
i just ran the conda prompt as admin and set the environment variable
(prefect) C:\WINDOWS\system32>python
Python 3.8.0 (default, Nov  6 2019, 16:00:02) [MSC v.1916 64 bit (AMD64)] :: Anaconda, Inc. on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from prefect.client import Secret
Traceback (most recent call last):
  File "C:\Users\laura.vaida.000\anaconda3\envs\prefect\lib\site-packages\toml\decoder.py", line 511, in loads
    ret = decoder.load_line(line, currentlevel, multikey,
  File "C:\Users\laura.vaida.000\anaconda3\envs\prefect\lib\site-packages\toml\decoder.py", line 778, in load_line
    value, vtype = self.load_value(pair[1], strictly_valid)
  File "C:\Users\laura.vaida.000\anaconda3\envs\prefect\lib\site-packages\toml\decoder.py", line 816, in load_value
    raise ValueError("Only all lowercase booleans allowed")
ValueError: Only all lowercase booleans allowed
resulting in this error now
@nicholas maybe we can make a short call soon about that?