YD
03/03/2022, 9:53 PM
If task_1 returns a df and I pass the df as an input to task_2, does the df physically go to the Prefect server?
The issue I have is that it looks like it is getting stuck when trying to return the df and pass it to the next task.
Is there a better way to pass a df from one task to the next?
@task()
def task_1():
    df = ....
    return df

@task()
def task_2(df):
    df2 = df ....
    return df2

def main():
    with Flow("voc_sentiment") as flow:
        df = task_1()
        df2 = task_2(df)
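(Editor's sketch, not part of the thread: when an in-memory handoff between tasks is problematic, a common alternative is to persist the frame in the first task and pass only its file path downstream. The path and parquet format are illustrative assumptions; to_parquet needs pyarrow or fastparquet installed.)

import pandas as pd
from prefect import task, Flow

@task()
def task_1() -> str:
    df = pd.DataFrame({'a': [1, 2, 3]})
    path = '/tmp/voc_sentiment.parquet'  # hypothetical location
    df.to_parquet(path)                  # hand off via disk instead of memory
    return path                          # only a small string moves between tasks

@task()
def task_2(path: str) -> pd.DataFrame:
    return pd.read_parquet(path)

with Flow("voc_sentiment") as flow:
    path = task_1()
    df2 = task_2(path)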
With
@task(timeout=600)
the task gets stuck, even though the task is complete. Any thought on why that would be, and whether there is an alternative to this way of creating a timeout?
When I create the df with
df = pd.DataFrame([[1, 2, 3], [4, 5, 6], [7, 8, 9]], columns=['a', 'b', 'c'])
there is no problem, but when I collect the data using
df = get_df_with_sql(sql_com=sql)
even though I get the df (I can log the df info in the task after the df is created, and I can put a breakpoint in the debugger on the return df and stop at this point), the task gets stuck and does not go to the next step.
Any thoughts? The function does close its cursor and connection:
curr.close()
conn.close()
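(Editor's sketch, not part of the thread: one way to narrow down a hang like this is to check whether the returned frame can actually be serialized, since Prefect 1.x hands task results between threads or processes, using cloudpickle, when a process-based executor or a timeout is involved.)

import cloudpickle

def check_picklable(obj):
    # raises if obj, or anything it references, cannot be serialized
    payload = cloudpickle.dumps(obj)
    print('pickled OK: {} bytes'.format(len(payload)))

check_picklable(get_df_with_sql(sql_com=sql))  # sql as defined later in the thread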
Kevin Kho
03/04/2022, 4:17 AM
YD
03/04/2022, 3:22 PM
from rc_hadoop_sql import get_df_with_sql
Kevin Kho
03/04/2022, 3:32 PM
YD
03/04/2022, 4:56 PM
executor=LocalDaskExecutor()
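(Editor's note, a minimal sketch not from the thread: in Prefect 1.x, LocalDaskExecutor defaults to a thread-based dask scheduler; the scheduler can also be chosen explicitly.)

from prefect.executors import LocalDaskExecutor

LocalDaskExecutor()                       # default: scheduler="threads"
LocalDaskExecutor(scheduler="processes")  # run tasks in separate processes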
Kevin Kho
03/04/2022, 4:59 PM
YD
03/04/2022, 5:10 PM
LocalExecutor() does not have this issue.
get_df_with_sql does not return the connection, just a data-frame.
Kevin Kho
03/04/2022, 5:12 PM
YD
03/04/2022, 5:12 PM
df = pd.read_sql(pandas_sql, conn)
and after that only
curr.close()
conn.close()
Kevin Kho
03/04/2022, 5:13 PM
YD
03/04/2022, 5:14 PM
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor, LocalExecutor
import prefect
import pandas as pd
from rc_hadoop_sql import get_df_with_sql

sql = '''
select distinct
    id,
    regexp_replace(lower(text),'\\t|\\n|\\r|\\001|\\\\|,|[ ]+',' ') text,
    case
        when lower(rca_1_level_1) like '%+' then 'positive'
        else 'negative'
    end as sentiment
from wootric.enriched_tagged
where
    text is not null and
    lower(rca_1_level_1) not rlike lower('no comment|no related comment')
    and rca_1_level_1 is not null
'''

@task(log_stdout=True, timeout=600)
# @task(log_stdout=True)
def create_df():
    logger = prefect.context.get("logger")
    df = get_df_with_sql(sql_com=sql)
    logger.info('Created df')
    return df

@task(log_stdout=True)
def use_df(df):
    logger = prefect.context.get("logger")
    logger.info('Got df')
    print(df.head())
    return df

def main():
    # with Flow("test_passing_df", executor=LocalDaskExecutor()) as flow:
    with Flow("test_passing_df", executor=LocalExecutor()) as flow:
        df = create_df()
        df = use_df(df)
    flow.run()

if __name__ == "__main__":
    main()
The code collecting the data:
# module-level imports used below; IMPALA, HIVE, and clean_sql are defined
# elsewhere in the module
import re
import sys
import pandas as pd
from impala.dbapi import connect  # assuming impyla, given the GSSAPI auth arguments

def get_df_with_sql(sql_com, clean_sql_cmd=True, split_cmds=True, use_impala=True, verbose=False):
    """
    Returns a Pandas data-frame with the results of the last query in sql_com

    Args:
        sql_com (str): string containing SQL commands
        clean_sql_cmd (bool): when True, clean sql_com
        split_cmds (bool): when True, if sql_com contains several SQL commands, run each one separately
        use_impala (bool): True: use Impala, False: use Hive
        verbose (bool): more printout during run

    Returns:
        df (Pandas DF): data collected by the last sql
    """
    if use_impala:
        conn = connect(host=IMPALA, auth_mechanism='GSSAPI')
    else:
        conn = connect(host=HIVE, auth_mechanism='GSSAPI', port=10000, kerberos_service_name='hive')
    curr = conn.cursor()
    if clean_sql_cmd:
        sql_com = clean_sql(sql_com)
    if split_cmds:
        sql_cmds = sql_com.split(';')
        sql_cmds = [x for x in sql_cmds if x]
    else:
        sql_cmds = [sql_com]
    pandas_sql = sql_cmds.pop(-1)
    for sql_com in sql_cmds:
        if bool(sql_com):
            try:
                # skip Impala-only maintenance commands when running against Hive
                if not use_impala and re.match('.*invalidate.*meta.*|.*compute.*stat.*', sql_com, flags=re.IGNORECASE):
                    continue
                curr.execute(sql_com)
            except Exception as e:
                if verbose:
                    if sys.version_info.major > 2:
                        print(e)
                    else:
                        print(e.message)
    try:
        df = pd.read_sql(pandas_sql, conn)
    except Exception as e:
        print(e)
        print('Panda collection SQL: {}'.format(pandas_sql))
        df = pd.DataFrame()
    curr.close()
    conn.close()
    return df
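(Editor's sketch, not part of the thread: a defensive variant of the cleanup is to release the connection in all cases, e.g. with contextlib.closing, so conn.close() still runs when the query raises. Function and parameter names here are illustrative.)

from contextlib import closing

import pandas as pd
from impala.dbapi import connect  # assuming impyla, given the GSSAPI auth arguments

def fetch_df(pandas_sql, host):
    # closing() guarantees conn.close() even if pd.read_sql fails
    with closing(connect(host=host, auth_mechanism='GSSAPI')) as conn:
        return pd.read_sql(pandas_sql, conn)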
Kevin Kho
03/04/2022, 5:19 PM
YD
03/04/2022, 5:33 PM
Kevin Kho
03/04/2022, 5:48 PM
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=8)
YD
03/04/2022, 5:56 PM
def main():
    start_date = datetime.now(timezone('US/Pacific'))
    schedule = Schedule(clocks=[CronClock('30 8-12 * * *', start_date=start_date)])
    with Flow("voc_sentiment", schedule=schedule, executor=LocalDaskExecutor()) as flow:
        # Code ...
Where should I add your line of code?
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=8)
Like this?
def main():
    start_date = datetime.now(timezone('US/Pacific'))
    schedule = Schedule(clocks=[CronClock('30 8-12 * * *', start_date=start_date)])
    with Flow("voc_sentiment", schedule=schedule) as flow:
        flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=8)
        # code ....
Kevin Kho
03/04/2022, 6:06 PM
with Flow() as flow:
    ...
flow.executor = LocalDaskExecutor(..)
which is also equivalent to passing it in the Flow constructor:
Flow(executor=LocalDaskExecutor())
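(Editor's sketch, not part of the thread: putting the two equivalent placements together with the schedule from the question. Imports follow Prefect 1.x module paths.)

from datetime import datetime

from pytz import timezone
from prefect import Flow
from prefect.executors import LocalDaskExecutor
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

start_date = datetime.now(timezone('US/Pacific'))
schedule = Schedule(clocks=[CronClock('30 8-12 * * *', start_date=start_date)])

# Option 1: pass the executor to the constructor
with Flow("voc_sentiment", schedule=schedule,
          executor=LocalDaskExecutor(scheduler="processes", num_workers=8)) as flow:
    pass  # tasks go here

# Option 2: assign it after building the flow; inside or outside the
# with-block makes no difference, since it is just attribute assignment
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=8)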
YD
03/04/2022, 6:37 PM