# prefect-community
y
Passing Pandas DF to task: if I have `task_1` return a df and I pass the df as an input to `task_2`, does the df physically go to the Prefect server? The issue I have is that it looks like it is getting stuck when trying to return the df and pass it to the next task. Is there a better way to pass a df from one task to the next?
```python
@task()
def task_1():
    df = ....
    return df

@task()
def task_2(df):
    df2 = df ....
    return df2

def main():
    with Flow("voc_sentiment") as flow:
        df = task_1()
        df2 = task_2(df)
```
I think it is not the df, but something in my flow structure
it looks like when I have `@task(timeout=600)` the task gets stuck, even though the task is complete. Any thoughts on why that would be, and is there an alternative to this way of setting a `timeout`?
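(For reference: Prefect 1.x cannot always hard-kill a timed-out task when the executor runs tasks in threads, which can leave a finished-looking task hanging at the timeout boundary. One alternative is to enforce the deadline inside the task body. A minimal sketch, assuming a blocking query helper like the `get_df_with_sql` used later in this thread:)
```python
from concurrent.futures import ThreadPoolExecutor
from prefect import task

@task(log_stdout=True)
def create_df():
    # Enforce the deadline inside the task instead of via @task(timeout=...).
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(get_df_with_sql, sql_com=sql)  # blocking query helper
    try:
        # Raises concurrent.futures.TimeoutError after 600s, failing the task.
        return future.result(timeout=600)
    finally:
        # Don't wait for a stuck worker thread; Python cannot force-kill it.
        pool.shutdown(wait=False)
```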
If I create the df using
```python
df = pd.DataFrame([[1, 2, 3], [4, 5, 6], [7, 8, 9]], columns=['a', 'b', 'c'])
```
there is no problem, but when I collect the data using
```python
df = get_df_with_sql(sql_com=sql)
```
even though I get the df (I can log the df info in the task after the df is created, and I can put a breakpoint in the debugger on the `return df` and stop at this point), the task gets stuck and does not go to the next step. Any thoughts?
as if it somehow thinks that there is some process still running
This function reads data from Hadoop, but at the end of the function it closes the connection and the cursor
```python
curr.close()
conn.close()
```
k
Looking now
This looks fine to me if you explicitly close the connection inside the task. Could you show a snippet with the connection?
y
the connection is in a different Python package:
```python
from rc_hadoop_sql import get_df_with_sql
```
k
Are you on `LocalDaskExecutor` or `LocalExecutor`?
y
```python
executor=LocalDaskExecutor()
```
k
I think you might have a problem with the connection not being thread-safe? It would be better to create it and close it inside the task if you can.
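(A minimal sketch of that pattern, assuming an impyla-style `connect` and a hypothetical host; each task run gets its own connection and closes it in a `finally`:)
```python
import pandas as pd
from impala.dbapi import connect  # assuming impyla, as in get_df_with_sql
from prefect import task

@task
def query_df(sql):
    # One connection per task run, so concurrent threads never share one.
    conn = connect(host="impala.example.com", auth_mechanism="GSSAPI")  # hypothetical host
    try:
        return pd.read_sql(sql, conn)
    finally:
        conn.close()  # runs even if read_sql raises
```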
y
using `LocalExecutor()` does not have this issue
the function `get_df_with_sql` does not return the connection, just a data-frame
k
Could you show me the full code?
y
looking to see if there are ways to close the connection after `df = pd.read_sql(pandas_sql, conn)` besides
```python
curr.close()
conn.close()
```
k
I think that’s the only way
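(For completeness, `contextlib.closing` wraps those same `close()` calls in a context manager, so they run even if `read_sql` raises. A minimal sketch, with hypothetical stand-ins for the host and query:)
```python
from contextlib import closing

import pandas as pd
from impala.dbapi import connect  # assuming impyla

IMPALA = "impala.example.com"  # hypothetical host
pandas_sql = "select 1"        # stands in for the real query

# closing() guarantees .close() on exit, so the curr.close()/conn.close()
# pair cannot be skipped by an exception inside the block.
with closing(connect(host=IMPALA, auth_mechanism='GSSAPI')) as conn, \
        closing(conn.cursor()) as curr:
    df = pd.read_sql(pandas_sql, conn)
```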
y
the test code:
```python
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor, LocalExecutor
import prefect
import pandas as pd
from rc_hadoop_sql import get_df_with_sql

sql = '''
select distinct
    id,
    regexp_replace(lower(text),'\\t|\\n|\\r|\\001|\\\\|,|[ ]+',' ') text,
    case
        when lower(rca_1_level_1) like '%+' then 'positive'
    else 'negative'
    end as sentiment
from wootric.enriched_tagged
where
    text is not null and
    lower(rca_1_level_1) not rlike lower('no comment|no related comment')
    and rca_1_level_1 is not null
'''


@task(log_stdout=True, timeout=600)
# @task(log_stdout=True)
def create_df():
    logger = prefect.context.get("logger")
    df = get_df_with_sql(sql_com=sql)
    logger.info('Created df')
    return df


@task(log_stdout=True)
def use_df(df):
    logger = prefect.context.get("logger")
    logger.info('Got df')
    print(df.head())
    return df


def main():
    # with Flow("test_passing_df", executor=LocalDaskExecutor()) as flow:
    with Flow("test_passing_df", executor=LocalExecutor()) as flow:
        df = create_df()
        df = use_df(df)

    flow.run()


if __name__ == "__main__":
    main()
```
The code collecting the data:
```python
import re
import sys

import pandas as pd
from impala.dbapi import connect  # assuming impyla; IMPALA, HIVE and clean_sql are defined elsewhere in the package


def get_df_with_sql(sql_com, clean_sql_cmd=True, split_cmds=True, use_impala=True, verbose=False):
    """
    Returns a Pandas data-frame with the results of the last query in sql_com

    Args:
        sql_com (str): string containing SQL commands
        clean_sql_cmd (bool): when True, clean sql_com
        split_cmds (bool): when True, if sql_com contains several SQL commands, run each one separately
        use_impala (bool): True: use Impala, False: use Hive
        verbose (bool): more printout during the run

    Returns:
        df (Pandas DF): data collected by the last SQL command
    """
    if use_impala:
        conn = connect(host=IMPALA, auth_mechanism='GSSAPI')
    else:
        conn = connect(host=HIVE, auth_mechanism='GSSAPI', port=10000, kerberos_service_name='hive')
    curr = conn.cursor()

    if clean_sql_cmd:
        sql_com = clean_sql(sql_com)
    if split_cmds:
        sql_cmds = sql_com.split(';')
        sql_cmds = [x for x in sql_cmds if x]
    else:
        sql_cmds = [sql_com]
    pandas_sql = sql_cmds.pop(-1)

    for sql_com in sql_cmds:
        if bool(sql_com):
            try:
                if not use_impala and re.match('.*invalidate.*meta.*|.*compute.*stat.*', sql_com, flags=re.IGNORECASE):
                    continue
                curr.execute(sql_com)
            except Exception as e:
                if verbose:
                    if sys.version_info.major > 2:
                        print(e)
                    else:
                        print(e.message)
    try:
        df = pd.read_sql(pandas_sql, conn)
    except Exception as e:
        print(e)
        print('Panda collection SQL: {}'.format(pandas_sql))
        df = pd.DataFrame()

    curr.close()
    conn.close()
    return df
```
the first part of the function splits off and runs the SQL commands that are not the data-collection part
k
This looks fine to me. I'm surprised. What happens if you use processes instead of threads for the `LocalDaskExecutor`?
y
can you direct me to docs regarding “if you use processes instead of threads for the LocalDaskExecutor”?
k
```python
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=8)
```
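(Note: with `scheduler="processes"` the task inputs and outputs are serialized between worker processes, so they must be picklable. A DataFrame is; live connections and cursors are not, which is another reason to open and close them inside the task.)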
y
The real flow, not the one in the test I shared, looks like:
```python
def main():
    start_date = datetime.now(timezone('US/Pacific'))
    schedule = Schedule(clocks=[CronClock('30 8-12 * * *', start_date=start_date)])
    with Flow("voc_sentiment", schedule=schedule, executor=LocalDaskExecutor()) as flow:
        # Code ...
```
where should I add your line of code, `flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=8)`? Like this?
```python
def main():
    start_date = datetime.now(timezone('US/Pacific'))
    schedule = Schedule(clocks=[CronClock('30 8-12 * * *', start_date=start_date)])
    with Flow("voc_sentiment", schedule=schedule) as flow:
        flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=8)
        # code ....
```
k
after the flow statement:
```python
with Flow() as flow:
    ...
flow.executor = LocalDaskExecutor(..)
```
which is also equivalent to passing it in the `Flow` constructor:
```python
Flow(executor=LocalDaskExecutor())
```
y
the test passes.. thanks