hi folks, does anybody know how to pass prefect se...
# prefect-ui
l
hi folks, does anybody know how to pass prefect secrets from the ui to a prefect worker in gcp kubernetes?
t
HI Laura, are you using the Prefect Cloud UI to set the secret? You should be able to use a Prefect Secret task within your flow to retrieve that secret and pass it to another task
Copy code
from prefect import task, Flow
from prefect.tasks.secrets import PrefectSecret

@task
def my_task(credentials):
    """A task that requires credentials to access something. Passing the
    credentials in as an argument allows you to change how/where the
    credentials are loaded (though we recommend using `PrefectSecret` tasks to
    load them."""
    pass

with Flow("example") as flow:
    my_secret = PrefectSecret("MYSECRET")
    res = my_task(credentials=my_secret)
does this help?
๐Ÿ‘ 1
(regardless of whether you use Graphql, the UI, or the client, as shown here, the secrets will be stored the same way and reachable via the Cloud API)
๐Ÿ‘ 1
l
hii @Tyler Wanner thanks for your reply. so this is my code....but that doesn't work, can u see any errors here?
i run it this way:
Copy code
flow.storage= Docker(registry_url="<http://gcr.io/keller-steering-enabling/flows|gcr.io/keller-steering-enabling/flows>")
flow.run_config = KubernetesRun()
flow.run(executor=executor)
flow.register('Billwerk')
t
I think I see the problemโ€ฆ
can you move the PrefectSecret tasks to the Flow context instead of within the oauth task and then pass the results into the oauth task?
that would require moving the secrets import to the top outside of the task, then moving those 4 lines to the line before oauth()
l
Copy code
@task
def oauth(client_id, username, client_secret, password):
    from prefect.tasks.secrets import PrefectSecret
    from oauthlib.oauth2 import LegacyApplicationClient
    from requests_oauthlib import OAuth2Session

    oauth = OAuth2Session(client=LegacyApplicationClient(client_id=client_id))
    token = oauth.fetch_token(token_url='<https://app.billwerk.com/oauth/token>',
                              username=username,
                              password=password,
                              client_id=client_id,
                              client_secret=client_secret)
    pass

with Flow("billwerk-contracts") as flow:
    BW_Username = PrefectSecret("BW_Username")
    BW_Password = PrefectSecret("BW_Password")
    BW_Client_Id = PrefectSecret("BW_Client_Id")
    BW_Client_Secret = PrefectSecret("BW_Client_Secret")
    oauth(username = BW_Username, password =BW_Password,client_id = BW_Client_Id, client_secret = BW_Client_Secret)
like that?
thats giving this
Copy code
from prefect.tasks.secrets import PrefectSecret
from oauthlib.oauth2 import LegacyApplicationClient
from requests_oauthlib import OAuth2Session



@task
def oauth(client_id, username, client_secret, password):

    oauth = OAuth2Session(client=LegacyApplicationClient(client_id=client_id))
    token = oauth.fetch_token(token_url='<https://app.billwerk.com/oauth/token>',
                              username=username,
                              password=password,
                              client_id=client_id,
                              client_secret=client_secret)
    pass

with Flow("billwerk-contracts") as flow:
    BW_Username = PrefectSecret("BW_Username")
    BW_Password = PrefectSecret("BW_Password")
    BW_Client_Id = PrefectSecret("BW_Client_Id")
    BW_Client_Secret = PrefectSecret("BW_Client_Secret")
    oauth(username = BW_Username, password =BW_Password,client_id = BW_Client_Id, client_secret = BW_Client_Secret)
also tried this, but giving me @Tyler Wanner
t
hmm have u installed that library? i assume this is from the local flow.run()
l
yes
what you mean with local flow run?
it does work when i just execute the python code with the secrets stored in variables
but not with the flow runner
n
Hi @Laura Vaida could you share the full code you're using? (Perhaps a pared-down reproduction since it looks like your flow is pretty complex!)
l
Copy code
import requests
import fsspec
import sys
import json
import csv
import os.path
import snowflake.connector
import sysconfig
import configparser
import pandas
from oauthlib.oauth2 import LegacyApplicationClient
from requests_oauthlib import OAuth2Session
from pandas import json_normalize
from datetime import datetime
import openpyxl
import numpy as np
import csv
from prefect.executors import DaskExecutor
from prefect.engine.results import GCSResult
from prefect.storage import GCS
from google.cloud import storage
from prefect.storage import Docker
from prefect.run_configs import KubernetesRun
import docker
import google.cloud
from prefect import task, Flow
import datetime
import random
from prefect.tasks.secrets import PrefectSecret
from prefect.client import Secret
from prefect.tasks.secrets import PrefectSecret
from oauthlib.oauth2 import LegacyApplicationClient
from requests_oauthlib import OAuth2Session

#define executor as dask executor with tcp from load balancer
executor = DaskExecutor(address="35.198.104.230:8786") # tcp:port address of dask load balancer


"""""
Local stored access secrets
@task
def get_config():
    from oauthlib.oauth2 import LegacyApplicationClient
    from requests_oauthlib import OAuth2Session
    config = configparser.ConfigParser()
    config.read('C:/Users/laura.vaida.000/billwerk_api.ini')


# Example of retrieving config data
# config.get("billwerk_api", 'client_id')

######################################################
# Getting access token with client_id and client_secret
######################################################
oauth = OAuth2Session(client=LegacyApplicationClient(client_id=config.get("billwerk_api", 'client_id')))
token = oauth.fetch_token(token_url='<https://app.billwerk.com/oauth/token>',
                          username=config.get("billwerk_api", 'username'),
                          password=config.get("billwerk_api", 'password'),
                          client_id=config.get("billwerk_api", 'client_id'),
                          client_secret=config.get("billwerk_api", 'client_secret'))

"""

@task
def oauth(client_id, username, client_secret, password):
    from oauthlib.oauth2 import LegacyApplicationClient
    from requests_oauthlib import OAuth2Session

    oauth = OAuth2Session(client=LegacyApplicationClient(client_id=client_id))
    token = oauth.fetch_token(token_url='<https://app.billwerk.com/oauth/token>',
                              username=username,
                              password=password,
                              client_id=client_id,
                              client_secret=client_secret)
    pass


@task
def creating_dataframe():
    bw_contracts_call = requests.get('<https://app.billwerk.com/api/v1/contracts?access_token=>' + token['access_token'])
    bw_contracts_start = requests.get('<https://app.billwerk.com/api/v1/contracts?access_token=>' + token['access_token']).json()

    # Getting last contract id for while loop
    last_entry = bw_contracts_start[199]['Id']

    contracts_pandaframe = json_normalize(bw_contracts_start, sep='_')
    #Converting in string for merge function
    contracts_pandaframe['Id'] = contracts_pandaframe['Id'].astype(str)

    #separately reading phases from dataframe
    phases_pandaframe = json_normalize(bw_contracts_start, record_path=['Phases'], record_prefix='Phases_', sep='_')
    #Converting in string for merge function
    phases_pandaframe['Id'] = contracts_pandaframe['Id'].astype(str)

    #contracts_pandaframe.dtypes
    #phases_pandaframe.dtypes

    #Merging phases to initial dataframe
    contracts_pandaframe = contracts_pandaframe.merge(phases_pandaframe, on ='Id', how ='left')
    column_names = contracts_pandaframe.columns
    length_contracts = len(contracts_pandaframe['Id'])

    ###########################################
    # Optional for retrieving column information
    # print(contracts_pandaframe.columns)
    # print(<http://contracts_pandaframe.info|contracts_pandaframe.info>)
    ###########################################
    bw_contracts_summary = pandas.DataFrame(data=[], columns=[column_names])

##################
# While-Loop
##################

@task
def while_loop():
    customer_ids = pandas.Series(data='', dtype=str)
    type(last_entry)
    #while last entry is not inherited and the length is greater than 1 (last call inherits only 1 entry)
    while min(customer_ids.str.contains(last_entry)) == False and length_contracts > 1:

        #combining all entries written in summary and the new entries from last call
        contracts_pandaframe_combined = [contracts_pandaframe, bw_contracts_summary]
        if bw_contracts_summary.empty == False:
            contracts_concat = pandas.concat(contracts_pandaframe_combined)
            bw_contracts_summary = contracts_concat
        else:
            bw_contracts_summary = contracts_pandaframe
        customer_ids = bw_contracts_summary['Id']
        row_count = (bw_contracts_summary.shape[0])
        print(row_count)

        # Get next entries before first entry
        bw_contracts = requests.get(
            '<https://app.billwerk.com/api/v1/contracts/?from=>' + last_entry + '&access_token=' + token[
                'access_token']).json()
        length = len(json_normalize(bw_contracts, sep='_')['Id'])
        contracts_pandaframe = json_normalize(bw_contracts, sep='_')[1:length]

        # Converting in string for merge function
        contracts_pandaframe['Id'] = contracts_pandaframe['Id'].astype(str)

        # separately reading phases from dataframe
        phases_pandaframe = json_normalize(bw_contracts, record_path=['Phases'], record_prefix='Phases_', sep='_')[1:length]
        # Converting in string for merge function
        phases_pandaframe['Id'] = contracts_pandaframe['Id'].astype(str)

        # Merging phases to initial dataframe
        contracts_pandaframe = contracts_pandaframe.merge(phases_pandaframe, on='Id', how='left')

        if contracts_pandaframe.empty == False:
            last_entry = contracts_pandaframe["Id"].iloc[-1]
        else:
            last_entry = json_normalize(bw_contracts, sep='_')['Id'][0]
        length_contracts = len(contracts_pandaframe['Id'])

    print('Already included')

# bw_contracts_summary.columns

# Deleting unnecessary columns from all contracts
@task
def modifying_columns():
    bw_contracts_summary = bw_contracts_summary.drop(
        columns=['PaymentBearer_EmailAddress', 'PaymentBearer_Holder', 'PaymentBearer_Last4', 'PaymentBearer_LastName',
                 'PaymentBearer_FirstName', 'PaymentBearer_MaskedCardPan', 'ThresholdBillingDisabled', 'CustomerName', 'Phases'])

    # Adding current timestamp
    current_date = datetime.today().strftime('%Y-%m-%d')
    bw_contracts_summary['Timestamp_Loading'] = current_date
    fieldnames = bw_contracts_summary.columns

""""
##Saving locally
path = r'C://Users/laura.vaida.000/Documents/Python_Scripts/Billwerk_Contracts.csv'
contracts_pandaframe.to_csv(r'C://Users/laura.vaida.000/Documents/Python_Scripts/Billwerk_Contracts_'+ current_date+'.csv', index = False, header=True)


##store result in gc bucket
gcs_result = GCSResult(bucket='billwerk', location='billwerk_contracts.csv')
with Flow("store to gcp bucket", result = 'billwerk') as store_in_gcp_bucket:

##Transforming to csv file
csv_file =bw_contracts_summary.to_csv(index= False)
store_in_gcp_bucket.run()
state=store_in_gcp_bucket.run()
state.result[csv_file]._result.location
""""


with Flow("billwerk-contracts") as flow:
    BW_Username = PrefectSecret("BW_Username")
    BW_Password = PrefectSecret("BW_Password")
    BW_Client_Id = PrefectSecret("BW_Client_Id")
    BW_Client_Secret = PrefectSecret("BW_Client_Secret")
    oauth(username = BW_Username, password =BW_Password,client_id = BW_Client_Id, client_secret = BW_Client_Secret)


flow.storage= Docker(registry_url="<http://gcr.io/keller-steering-enabling/flows|gcr.io/keller-steering-enabling/flows>")
flow.run_config = KubernetesRun()
flow.run(executor=executor)
flow.register('Billwerk')
message has been deleted
this would be complete code
n
Thanks @Laura Vaida - let me take a look
Could you try fetching the secret in the task again like this?
Copy code
from prefect import task, Flow
from prefect.client import Secret
from oauthlib.oauth2 import LegacyApplicationClient
from requests_oauthlib import OAuth2Session

@task
def oauth():

    username = Secret("BW_Username").get()
    password = Secret("BW_Password").get()
    client_id = Secret("BW_Client_Id").get()
    client_secret = Secret("BW_Client_Secret").get()

    oauth = OAuth2Session(client=LegacyApplicationClient(client_id=client_id))
    token = oauth.fetch_token(
        token_url="<https://app.billwerk.com/oauth/token>",
        username=username,
        password=password,
        client_id=client_id,
        client_secret=client_secret,
    )
    pass


with Flow("billwerk-contracts") as flow:
    oauth()

flow.run()
I'm not sure this will solve your issue but it might help narrow it down
(I would say for the sake of testing it might be useful to remove everything but the above)
l
heey, first at all, thanks for helping me!
i still get that error, so maybe call it without parameters?
n
Oh sorry, I forgot to remove those args! Updated that snippet above
l
now i get that again with the reference tasks, for some reason prefects wants to look only for local secrets
but i swear, they are here ๐Ÿ˜„
n
Ah, progress!
In your config.toml (located at
~/.prefect/config.toml
is this set?
Copy code
[context.secrets]
use_local_secrets = True

BW_Username = "some_val"
# rest of your secrets
l
no i have these ๐Ÿ˜„ so recommended me to set the config for cloud like these
do i have to reset that?
n
Sorry @Laura Vaida - juggling a few things over here. Since that's a
.toml
file, the ordering/header sections is important. To set a secret in that file, it should be under the
[context.secrets]
section (it looks like you've got that under
logging
)
l
no worries, but i dont know why i have to set this here again? so this is the uotput
n
how did you export your secrets in your environment? Did you run
export PREFECT__CONTEXT__SECRETS__BW_Username = "some val"
?
l
yes but with set isntead export
n
Using
setenv
?
l
no only set
is his wrong?
i use windows
n
Ah ok sorry! Was thinking linux >.<
And when you run
echo %PREFECT__CONTEXT__SECRETS__BW_Username%
you see the value printed correctly, right?
l
nope
what does that mean?
n
Sorry this might be different if you're using powershell, i think you need to run
echo $Env:PREFECT__CONTEXT__SECRETS__BW_Username
l
mmh i use anaconda prompt actually
but cmd gices the same? ๐Ÿ˜„
n
Ah ok - well if you're not able to print/echo that value then I think Prefect is telling you the correct thing - that that value isn't set correctly for whatever reason
l
ok so it depends on my environment variables?
why can't it read that from the config file?
i just executed conda promp as admin and set the environment variable
Copy code
(prefect) C:\WINDOWS\system32>python
Python 3.8.0 (default, Nov  6 2019, 16:00:02) [MSC v.1916 64 bit (AMD64)] :: Anaconda, Inc. on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from prefect.client import Secret
Traceback (most recent call last):
  File "C:\Users\laura.vaida.000\anaconda3\envs\prefect\lib\site-packages\toml\decoder.py", line 511, in loads
    ret = decoder.load_line(line, currentlevel, multikey,
  File "C:\Users\laura.vaida.000\anaconda3\envs\prefect\lib\site-packages\toml\decoder.py", line 778, in load_line
    value, vtype = self.load_value(pair[1], strictly_valid)
  File "C:\Users\laura.vaida.000\anaconda3\envs\prefect\lib\site-packages\toml\decoder.py", line 816, in load_value
    raise ValueError("Only all lowercase booleans allowed")
ValueError: Only all lowercase booleans allowed
resulting in this error now
@nicholas maybe we can make a short call soon about that?