https://prefect.io

Laura Vaida

03/08/2021, 2:46 PM
Hi, if I want to run more than 10 tasks in a flow, do I have to upgrade from the free plan? Or only when I want to run 10 tasks at the same time?

nicholas

03/08/2021, 2:47 PM
Hi @Laura Vaida - none of the Prefect Cloud tiers have constraints on the number of tasks in a flow; are you running into a specific issue?

Laura Vaida

03/08/2021, 2:48 PM
Hi, yes, it seems like I can only run 10 tasks in a flow.
Does that have something to do with task concurrency?

nicholas

03/08/2021, 2:52 PM
Hm, can you show me what you're seeing? Task concurrency limiting is something that needs to be explicitly configured.

Laura Vaida

03/08/2021, 2:53 PM

nicholas

03/08/2021, 2:55 PM
Gotcha, limiting task concurrency is only available on upgraded tiers of Cloud - can you share what you mean when you say you can only run 10 tasks in a flow?

Laura Vaida

03/08/2021, 2:58 PM
When I register the flow and it has more than 10 tasks/statements, it only takes the first 10.

nicholas

03/08/2021, 3:01 PM
Can you share your flow code and the schematic for your flow?

Laura Vaida

03/08/2021, 3:06 PM
Well, I'm trying to work around that with cached results:
gcs_result = GCSResult(bucket='billwerk')

@task
def oauth():
    bw_username=Secret("BW_Username").get()
    bw_client_secret=Secret("BW_Client_Secret").get()
    bw_client_id=Secret("BW_Client_Id").get()
    bw_password=Secret("BW_Password").get()

    oauth = OAuth2Session(client=LegacyApplicationClient(client_id=bw_client_id))
    token = oauth.fetch_token(
        token_url="https://app.billwerk.com/oauth/token",
        username=bw_username,
        password=bw_password,
        client_id=bw_client_id,
        client_secret=bw_client_secret
    )
    return token


@task(log_stdout=True)
def creating_json(token):
    bw_contracts_call = requests.get('https://app.billwerk.com/api/v1/contracts?access_token=' + token['access_token'])
    return bw_contracts_call.json()

@task(cache_for=datetime.timedelta(hours=1), result=gcs_result, log_stdout=True)
def input_for_loop(json):
    # Getting last contract id for while loop
    last_entry = json[199]['Id']
    contracts_pandaframe = json_normalize(json, sep='_')
    column_names = contracts_pandaframe.columns
    length_contracts = len(contracts_pandaframe['Id'])
    return last_entry, contracts_pandaframe, column_names, length_contracts
##################
# While-Loop
##################

@task(log_stdout=True)
def while_loop(last_entry,length_contracts,bw_contracts_summary,customer_ids):

    #while last entry is not inherited and the length is greater than 1 (last call inherits only 1 entry)
    while min(customer_ids.str.contains(last_entry)) == False and length_contracts > 1:

        #combining all entries written in summary and the new entries from last call
        contracts_pandaframe_combined = [df, bw_contracts_summary]
        if bw_contracts_summary.empty == False:
            contracts_concat = pandas.concat(contracts_pandaframe_combined)
            bw_contracts_summary = contracts_concat
        else:
            bw_contracts_summary = df
        customer_ids = bw_contracts_summary['Id']
        row_count = (bw_contracts_summary.shape[0])
        print(row_count)

        # Get next entries before first entry
        bw_contracts = requests.get(
            'https://app.billwerk.com/api/v1/contracts/?from=' + last_entry + '&access_token=' + token[
                'access_token']).json()
        length = len(json_normalize(bw_contracts, sep='_')['Id'])
        contracts_pandaframe = json_normalize(bw_contracts, sep='_')[1:length]

        if contracts_pandaframe.empty == False:
            last_entry = contracts_pandaframe["Id"].iloc[-1]
        else:
            last_entry = json_normalize(bw_contracts, sep='_')['Id'][0]
        length_contracts = len(contracts_pandaframe['Id'])
        return bw_contracts_summary
    print('Already included')

# bw_contracts_summary.columns
# Deleting unnecessary columns from all contracts

@task
def modifying_columns():
    bw_contracts_summary = bw_contracts_summary.drop(
        columns=['PaymentBearer_EmailAddress', 'PaymentBearer_Holder', 'PaymentBearer_Last4', 'PaymentBearer_LastName',
                 'PaymentBearer_FirstName', 'PaymentBearer_MaskedCardPan', 'ThresholdBillingDisabled', 'CustomerName'])

    # Adding current timestamp
    current_date = datetime.today().strftime("%Y-%m-%d %H:%M:%S")
    bw_contracts_summary['Timestamp_Loading'] = current_date
    fieldnames = bw_contracts_summary.columns

#####################################
#Writing data to snowflake
#####################################
#For writing data directly into snowflake table/stage without gcp
###Deleting old data

@task
def deleting_old_table():
    from snowflake.connector import connect
    ctx = connect(**config['snowflake'], use_openssl_only=True)
    cur = ctx.cursor().execute('delete from "KELLERSPORTS_STAGE"."STAGE_BILLWERK_CONTRACTS"."CONTRACTS_PYTHON_SCRIPT"')


@task
# Create a write function with the input data and table_name
def write_to_snowflake(data, table_name):
        from snowflake.connector.pandas_tools import write_pandas
        from snowflake.connector import connect
        ctx = connect(**config['snowflake'], use_openssl_only=True)
        cur = ctx.cursor()
        cur.execute('USE SCHEMA "STAGE_BILLWERK_CONTRACTS"')
        write_pandas(ctx, data, table_name=table_name, quote_identifiers=False)
with Flow("billwerk-contracts") as flow:
    token = oauth()
    json = creating_json(token=token)
    input_for_loop(json)
    bw_contracts_summary = pandas.DataFrame(data=[], columns=[column_names])
    customer_ids = pandas.Series(data='', dtype=str)
    customer_ids = pandas.Series(data='', dtype=str)
    while_loop = (last_entry, length_contracts, bw_contracts_summary, customer_ids)
    #modifying_columns()
    #deleting_old_table()
    #write_to_snowflake(bw_contracts_summary, "CONTRACTS_PYTHON_SCRIPT")
flow.storage = Docker(registry_url="gcr.io/keller-steering-enabling/flows", image_name="billwerk-contracts",
                      python_dependencies=["pandas", "oauthlib", "requests", "requests_oauthlib", "datetime", "oauth2client", "snowflake"], secrets=["GCP_CREDENTIALS"])
flow.run_config = KubernetesRun()
flow.register('Billwerk')
I'd like to make 4 tasks out of input_for_loop, each returning just 1 value.

nicholas

03/08/2021, 3:11 PM
Can you share the schematic for your flow as well? You can find it under the "Schematic" tab on the UI flow page
As an alternative, you can also share the PDF output by calling `flow.visualize()`, assuming you have `graphviz` installed.

Laura Vaida

03/08/2021, 3:12 PM
To be honest, I'm having problems with the schematic at the moment. I don't know if it's me, but the last few times it only showed the tasks up to input_for_loop.
This would be the schematic of an old flow run, to give you an idea.

nicholas

03/08/2021, 3:17 PM
Right, I believe this is a problem with your flow code, and not with Prefect. For example, you're overwriting your `while_loop` task definition with this line:
while_loop = (last_entry, length_contracts, bw_contracts_summary, customer_ids)
If you want to explicitly create 4 tasks from `input_for_loop`, you'll need to call that task 4 times.
I'm also not sure where these objects are defined: `(last_entry, length_contracts)`
And since these lines:
customer_ids = pandas.Series(data='', dtype=str)
customer_ids = pandas.Series(data='', dtype=str)
aren't Prefect tasks, they'll be evaluated as constants at the runtime of your script (when you register it) and not at flow run time as discrete tasks.
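The last point is about *when* code runs: once, while the script itself executes (for example at registration), versus on every flow run. A toy model makes the difference visible. `ToyFlow` and `add_task` below are hypothetical and are not Prefect's API; they only mimic the idea that a task called inside a flow context is recorded as a step rather than executed immediately, while a plain expression like `pandas.Series(...)` is evaluated on the spot.

```python
import itertools

# A counter so each call to make_value() returns a new number.
_counter = itertools.count()


def make_value():
    """Stand-in for work such as building a pandas Series."""
    return next(_counter)


class ToyFlow:
    """Hypothetical flow: records steps at build time, executes them on run()."""

    def __init__(self):
        self.steps = []

    def add_task(self, fn):
        # Only remember the function; nothing is executed yet.
        self.steps.append(fn)

    def run(self):
        # Every run executes every recorded step again.
        return [fn() for fn in self.steps]


flow = ToyFlow()
constant = make_value()    # plain expression: evaluated once, while the script runs
flow.add_task(make_value)  # recorded step: evaluated again on each flow.run()
```

Here `constant` stays at 0 however often the flow runs, while each `flow.run()` produces a fresh value (`[1]`, then `[2]`).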

Laura Vaida

03/08/2021, 3:19 PM
Ahh, okay, good to know. Yes, you're right about the while loop; there's a logic error.

nicholas

03/08/2021, 3:27 PM
If you want to return the result of a pandas dataframe, you can wrap that in a task, something like this:
@task
def get_customer_ids():
    return pandas.Series(data='', dtype=str)
and call that task in your flow context.

Laura Vaida

03/08/2021, 3:29 PM
Yes, that was also the original idea, but then I had the impression that I could only have 10 tasks in a flow, which has now turned out to be wrong. I also think that's a lot easier than caching.
👍 1
What does task concurrency actually refer to, then? Running tasks at the same time?

nicholas

03/08/2021, 3:38 PM
Exactly - users that have strict requirements for tasks that do things like access databases might need to limit the number of tasks that run at the same time to get around connection limits
👍 1
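To make the idea concrete: a concurrency limit lets any number of tasks exist but caps how many are inside the limited section at the same moment. Prefect Cloud manages such limits itself (they are configured per task tag); the sketch below does not use Prefect at all and only illustrates the mechanism with a `threading.BoundedSemaphore`. `run_with_limit` and the 10 ms sleep standing in for a database query are assumptions for illustration.

```python
import threading
import time


def run_with_limit(n_tasks, limit):
    """Start n_tasks threads; at most `limit` may run the guarded section at once.

    Returns the highest number of tasks observed running simultaneously.
    """
    slots = threading.BoundedSemaphore(limit)
    lock = threading.Lock()
    active = 0
    peak = 0

    def task():
        nonlocal active, peak
        with slots:  # blocks while `limit` tasks already hold a slot
            with lock:
                active += 1
                peak = max(peak, active)
            time.sleep(0.01)  # stand-in for e.g. a database query
            with lock:
                active -= 1

    threads = [threading.Thread(target=task) for _ in range(n_tasks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak
```

All ten tasks in `run_with_limit(10, 3)` still run to completion; the limit only means that no more than three of them hold a connection-like resource at any moment.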

Laura Vaida

03/08/2021, 4:39 PM
gcs_result = GCSResult(bucket='billwerk')

@task
def oauth():
    bw_username = Secret("BW_Username").get()
    bw_client_secret = Secret("BW_Client_Secret").get()
    bw_client_id = Secret("BW_Client_Id").get()
    bw_password = Secret("BW_Password").get()

    oauth = OAuth2Session(client=LegacyApplicationClient(client_id=bw_client_id))
    token = oauth.fetch_token(
        token_url="https://app.billwerk.com/oauth/token",
        username=bw_username,
        password=bw_password,
        client_id=bw_client_id,
        client_secret=bw_client_secret
    )
    return token


@task(log_stdout=True)
def creating_json(token):
    bw_contracts_call = requests.get('https://app.billwerk.com/api/v1/contracts?access_token=' + token['access_token'])
    return bw_contracts_call.json()

@task(log_stdout=True)
def get_last_entry(json):
    # Getting last contract id for while loop
    last_entry = json[199]['Id']
    print(last_entry)
    return last_entry

@task()
def creating_dataframe(json):
    return json_normalize(json, sep='_')

@task()
def get_column_names(df):
    return df.columns

@task()
def get_length_contracts(df):
    return len(df['Id'])

@task()
def get_bw_contracts_summary():
    return pandas.DataFrame(data=[], columns=[column_names])

@task()
def get_customer_ids():
    return pandas.Series(data='', dtype=str)

    ###########################################
    # Optional for retrieving column information
    # print(contracts_pandaframe.columns)
    # contracts_pandaframe.info()
    ###########################################


##################
# While-Loop
##################

@task(log_stdout=True)
def while_loop(token,contracts_pandaframe,last_entry,length_contracts,bw_contracts_summary,customer_ids):

    print(min(customer_ids))
    print(last_entry)
    print(length_contracts)
    print (min(customer_ids.str.contains(last_entry)))

    #while last entry is not inherited and the length is greater than 1 (last call inherits only 1 entry)
    while min(customer_ids.str.contains(last_entry)) == False and length_contracts > 1:

        #combining all entries written in summary and the new entries from last call
        contracts_pandaframe_combined = [contracts_pandaframe, bw_contracts_summary]
        if bw_contracts_summary.empty == False:
            contracts_concat = pandas.concat(contracts_pandaframe_combined)
            bw_contracts_summary = contracts_concat
        else:
            bw_contracts_summary = contracts_pandaframe
        customer_ids = bw_contracts_summary['Id']
        row_count = (bw_contracts_summary.shape[0])
        print(row_count)

        # Get next entries before first entry
        bw_contracts = requests.get(
            'https://app.billwerk.com/api/v1/contracts/?from=' + last_entry + '&access_token=' + token[
                'access_token']).json()
        length = len(json_normalize(bw_contracts, sep='_')['Id'])
        contracts_pandaframe = json_normalize(bw_contracts, sep='_')[1:length]

        if contracts_pandaframe.empty == False:
            last_entry = contracts_pandaframe["Id"].iloc[-1]
        else:
            last_entry = json_normalize(bw_contracts, sep='_')['Id'][0]
        length_contracts = len(contracts_pandaframe['Id'])
        return bw_contracts_summary
    print('Already included')
with Flow("billwerk-contracts") as flow:
    token = oauth()
    json = creating_json(token=token)
    last_entry = get_last_entry(json=json)
    contracts_pandaframe = creating_dataframe(json)
    column_names = get_column_names(df=contracts_pandaframe)
    length_contracts = get_length_contracts(df=contracts_pandaframe)
    bw_contracts_summary = get_bw_contracts_summary()
    customer_ids = get_customer_ids()
    all_contracts = while_loop(token, contracts_pandaframe, last_entry, length_contracts, bw_contracts_summary, customer_ids)
Now my flow looks like this, and it's working, except that the loop doesn't seem to work.

nicholas

03/08/2021, 4:50 PM
Which loop?

Laura Vaida

03/08/2021, 4:50 PM
The while loop.

nicholas

03/08/2021, 4:51 PM
Can you provide some more information about how it's not working?

Laura Vaida

03/08/2021, 4:51 PM
I want to overwrite the variables length_contracts, last_entry and customer_ids in this task:
##################
# While-Loop
##################

@task(log_stdout=True)
def while_loop(token,contracts_pandaframe,last_entry,length_contracts,bw_contracts_summary,customer_ids):

    print(min(customer_ids))
    print(last_entry)
    print(length_contracts)
    print (min(customer_ids.str.contains(last_entry)))

    #while last entry is not inherited and the length is greater than 1 (last call inherits only 1 entry)
    while min(customer_ids.str.contains(last_entry)) == False and length_contracts > 1:

        #combining all entries written in summary and the new entries from last call
        contracts_pandaframe_combined = [contracts_pandaframe, bw_contracts_summary]
        if bw_contracts_summary.empty == False:
            contracts_concat = pandas.concat(contracts_pandaframe_combined)
            bw_contracts_summary = contracts_concat
        else:
            bw_contracts_summary = contracts_pandaframe
        customer_ids = bw_contracts_summary['Id']
        row_count = (bw_contracts_summary.shape[0])
        print(row_count)

        # Get next entries before first entry
        bw_contracts = requests.get(
            'https://app.billwerk.com/api/v1/contracts/?from=' + last_entry + '&access_token=' + token[
                'access_token']).json()
        length = len(json_normalize(bw_contracts, sep='_')['Id'])
        contracts_pandaframe = json_normalize(bw_contracts, sep='_')[1:length]

        if contracts_pandaframe.empty == False:
            last_entry = contracts_pandaframe["Id"].iloc[-1]
        else:
            last_entry = json_normalize(bw_contracts, sep='_')['Id'][0]
        length_contracts = len(contracts_pandaframe['Id'])
        return bw_contracts_summary
    print('Already included')
It's working, but the loop itself just doesn't iterate.

nicholas

03/08/2021, 4:55 PM
Got it - that's most likely a logic issue somewhere in your task; I'd suggest breaking down your task into some smaller parts and testing locally to figure out where the bug might be.
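One way to follow that suggestion is to pull the paging logic out of the task into a plain function that receives the HTTP call as a parameter, so it can be tested locally with fake pages and no network. This is a sketch, not the thread's code: `fetch_all_contracts`, `make_fake_pager`, and the assumption that a `from=<id>` request returns a page starting with that same contract (which is what the `[1:length]` slice in the task implies) are all hypothetical.

```python
from typing import Callable, Dict, List, Optional

Contract = Dict[str, str]
FetchPage = Callable[[Optional[str]], List[Contract]]


def fetch_all_contracts(fetch_page: FetchPage) -> List[Contract]:
    """Follow the `from=<last id>` cursor until a page adds no new contracts.

    Assumption (hypothetical contract): fetch_page(None) returns the first page,
    and fetch_page(cursor) returns a page that starts with the contract whose
    Id == cursor, mirroring the [1:length] slice in the thread's task.
    """
    contracts: List[Contract] = []
    seen = set()
    page = fetch_page(None)
    while page:
        new = [c for c in page if c["Id"] not in seen]
        if not new:
            break  # every Id on this page was already collected
        contracts.extend(new)
        seen.update(c["Id"] for c in new)
        page = fetch_page(new[-1]["Id"])
    # The return is outside the loop, so it only runs once paging is finished.
    return contracts


def make_fake_pager(ids: List[str], page_size: int) -> FetchPage:
    """Hypothetical stand-in for the HTTP call, for local tests without a network."""

    def fetch_page(cursor: Optional[str]) -> List[Contract]:
        start = 0 if cursor is None else ids.index(cursor)
        return [{"Id": i} for i in ids[start:start + page_size]]

    return fetch_page
```

For example, `fetch_all_contracts(make_fake_pager(["c1", "c2", "c3", "c4", "c5"], page_size=2))` returns all five contracts in order. In the flow, `fetch_page` could wrap the `requests.get(...)` call from the task, so the Prefect task itself becomes a thin wrapper around a function that is already tested.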

Laura Vaida

03/08/2021, 4:56 PM
Mmh, I already ran it locally several times without a flow context, and it worked 🙂
But I'll explicitly print the condition for the while loop:
@task(log_stdout=True)
def while_loop(token,contracts_pandaframe,last_entry,length_contracts,bw_contracts_summary,customer_ids):

    print(min(customer_ids))
    print(last_entry)
    print(length_contracts)
    print (min(customer_ids.str.contains(last_entry)))

    #while last entry is not inherited and the length is greater than 1 (last call inherits only 1 entry)
    while min(customer_ids.str.contains(last_entry)) == False and length_contracts > 1:

        #combining all entries written in summary and the new entries from last call
        contracts_pandaframe_combined = [contracts_pandaframe, bw_contracts_summary]
        if bw_contracts_summary.empty == False:
            contracts_concat = pandas.concat(contracts_pandaframe_combined)
            bw_contracts_summary = contracts_concat
        else:
            bw_contracts_summary = contracts_pandaframe
        customer_ids = bw_contracts_summary['Id']
        row_count = (bw_contracts_summary.shape[0])
        print(row_count)

        # Get next entries before first entry
        bw_contracts = requests.get(
            'https://app.billwerk.com/api/v1/contracts/?from=' + last_entry + '&access_token=' + token[
                'access_token']).json()
        length = len(json_normalize(bw_contracts, sep='_')['Id'])
        contracts_pandaframe = json_normalize(bw_contracts, sep='_')[1:length]

        if contracts_pandaframe.empty == False:
            last_entry = contracts_pandaframe["Id"].iloc[-1]
        else:
            last_entry = json_normalize(bw_contracts, sep='_')['Id'][0]
        length_contracts = len(contracts_pandaframe['Id'])

        print(length_contracts)
        print(last_entry)
        print(customer_ids)
        print(min(customer_ids.str.contains(last_entry)))

        return bw_contracts_summary
    
    print('Already included')
I printed the conditions at the end with `print(min(customer_ids.str.contains(last_entry)))` and `print(length_contracts)`. The first gives False and the latter 199, so all the conditions for the loop are true, because it says:
while min(customer_ids.str.contains(last_entry)) == False and length_contracts > 1:
Also, it should run `print('Already included')` at the end when the loop is done, which doesn't happen 🤔
Any ideas, @nicholas?
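A likely cause that the thread doesn't mention: in the task as posted, `return bw_contracts_summary` is indented inside the `while` body, so the function returns at the end of the first pass through the loop and the `print('Already included')` after it is never reached. That would match the symptoms described (conditions still true, 199 contracts, no final message), and it may also explain why the same loop behaved as expected when run locally as plain script code, where there was no function and no `return`. The toy functions below (hypothetical names) show the two layouts side by side.

```python
def body_inside(pages):
    """Same layout as the posted task: `return` is indented inside the while body."""
    done = []
    while pages:
        done.append(pages.pop(0))
        return done  # leaves the function at the end of the first pass
    done.append("Already included")  # reached only if the loop never runs
    return done


def body_after(pages):
    """The same loop with `return` dedented so it runs after the loop ends."""
    done = []
    while pages:
        done.append(pages.pop(0))
    done.append("Already included")
    return done
```

`body_inside(["p1", "p2", "p3"])` returns `["p1"]`, while `body_after(["p1", "p2", "p3"])` returns `["p1", "p2", "p3", "Already included"]`: dedenting the `return` to sit after the loop lets it keep iterating until the condition turns false.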

nicholas

03/08/2021, 8:21 PM
@Laura Vaida - I'm not sure what's going on there. `min(customer_ids.str.contains(last_entry)) == False` doesn't make a lot of sense to me, since `min` returns the number for the lowest character alphabetically in a string and `.contains` isn't a normal Python method afaik (there's the string class method `__contains__`, but that doesn't seem to be what you're using); maybe this is a pandas class method? `min(bool)` should throw an exception, since the `min` method takes an iterable.
It sounds like you're trying to do:
while last_entry in customer_ids.str and length_contracts > 1:
but I'm not totally sure.

Laura Vaida

03/08/2021, 8:53 PM
Mmh, yes, it is a pandas method. customer_ids has length 200, and if last_entry is in these 200 it would be True, but you're right, I would have to take max then.
👍 1
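For reference, here is how those built-ins behave on the element-wise booleans that `customer_ids.str.contains(last_entry)` produces. `min` over booleans is `False` as soon as one element is `False`, so with 200 IDs `min(...) == False` is almost always true and doesn't test membership at all; `max` (or the clearer `any`) is `True` as soon as one ID matches. `min(True)` on a single bool does raise `TypeError`, but a pandas Series is iterable, so `min` works on it. Note also that `str.contains` matches substrings (and treats the pattern as a regex by default), whereas `last_entry in set(customer_ids)` or, in pandas, `customer_ids.eq(last_entry).any()` tests for an exact ID; plain `last_entry in customer_ids` on a Series checks the index, not the values. The sketch uses plain lists as a stand-in for the Series, and `membership_checks` is a hypothetical helper.

```python
def membership_checks(customer_ids, last_entry):
    """Compare ways of asking 'is last_entry among customer_ids?'."""
    # Element-wise substring test, like str.contains(last_entry, regex=False).
    matches = [last_entry in cid for cid in customer_ids]
    return {
        "min": min(matches),  # False if ANY id lacks the substring
        "max": max(matches),  # True if ANY id contains the substring
        "any": any(matches),  # same answer as max() for booleans, clearer intent
        "exact": last_entry in set(customer_ids),  # whole-ID match only
    }


def min_of_single_bool():
    """min() needs an iterable (or several arguments); a lone bool is neither."""
    try:
        min(True)
    except TypeError as exc:
        return str(exc)
    return None
```

With `customer_ids = ["c-100", "c-200", "c-300"]`, `last_entry = "c-2"` gives `min=False, max=True, any=True, exact=False`, which shows how substring matching can report a hit for an ID that was never collected. A condition meaning "keep paging while the ID is still missing" would therefore read `not any(...)` (or `not customer_ids.eq(last_entry).any()`) rather than `min(...) == False`.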