YD
03/03/2022, 9:53 PM
If task_1 returns a df and I pass the df as an input to task_2, does the df physically go to the Prefect server?
The issue I have is that it looks like it is getting stuck when trying to return the df and pass it to the next task.
Is there a better way to pass a df from one task to the next?
@task()
def task_1():
    df = ....
    return df

@task()
def task_2(df):
    df2 = df ....
    return df2

def main():
    with Flow("voc_sentiment") as flow:
        df = task_1()
        df2 = task_2(df)
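One possible alternative, not from this thread but a common workaround: have the upstream task persist the frame to disk and return only the file path, so the object handed between tasks stays small. This is only a minimal sketch; the parquet path is an illustrative assumption, and to_parquet needs pyarrow or fastparquet installed.

import pandas as pd
from prefect import task, Flow

@task()
def task_1():
    df = pd.DataFrame({'a': [1, 2, 3]})       # stand-in for the real query result
    path = '/tmp/voc_sentiment_df.parquet'    # illustrative location, adjust as needed
    df.to_parquet(path)                       # persist the frame instead of returning it
    return path                               # only a short string moves between tasks

@task()
def task_2(path):
    df = pd.read_parquet(path)                # downstream task reloads the frame
    return df.shape

with Flow("voc_sentiment_paths") as flow:
    path = task_1()
    shape = task_2(path)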
YD
03/04/2022, 12:26 AM
YD
03/04/2022, 1:02 AM
When I use @task(timeout=600), the task gets stuck, even though the task is complete.
Any thought on why that would be, and whether there is an alternative to this way of creating a timeout?
YD
03/04/2022, 1:14 AM
With
df = pd.DataFrame([[1, 2, 3], [4, 5, 6], [7, 8, 9]], columns=['a', 'b', 'c'])
there is no problem, but when I collect the data using
df = get_df_with_sql(sql_com=sql)
the task gets stuck even though I get the df: I can log the df info in the task after the df is created, and I can put a breakpoint in the debugger on the return df and stop at this point, but the task does not go to the next step.
Any thoughts?
YD
03/04/2022, 1:15 AM
YD
03/04/2022, 1:19 AM
curr.close()
conn.close()
Kevin Kho
Kevin Kho
YD
03/04/2022, 3:22 PM
from rc_hadoop_sql import get_df_with_sql
Kevin Kho
YD
03/04/2022, 4:56 PM
executor=LocalDaskExecutor()
Kevin Kho
YD
03/04/2022, 5:10 PM
LocalExecutor() does not have this issue
YD
03/04/2022, 5:11 PM
get_df_with_sql does not return the connection, just a data-frame
Kevin Kho
YD
03/04/2022, 5:12 PM
df = pd.read_sql(pandas_sql, conn)
besides
curr.close()
conn.close()
Kevin Kho
YD
03/04/2022, 5:14 PM
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor, LocalExecutor
import prefect
import pandas as pd
from rc_hadoop_sql import get_df_with_sql

sql = '''
select distinct
    id,
    regexp_replace(lower(text),'\\t|\\n|\\r|\\001|\\\\|,|[ ]+',' ') text,
    case
        when lower(rca_1_level_1) like '%+' then 'positive'
        else 'negative'
    end as sentiment
from wootric.enriched_tagged
where
    text is not null and
    lower(rca_1_level_1) not rlike lower('no comment|no related comment')
    and rca_1_level_1 is not null
'''


@task(log_stdout=True, timeout=600)
# @task(log_stdout=True)
def create_df():
    logger = prefect.context.get("logger")
    df = get_df_with_sql(sql_com=sql)
    logger.info('Created df')
    return df


@task(log_stdout=True)
def use_df(df):
    logger = prefect.context.get("logger")
    logger.info('Got df')
    print(df.head())
    return df


def main():
    # with Flow("test_passing_df", executor=LocalDaskExecutor()) as flow:
    with Flow("test_passing_df", executor=LocalExecutor()) as flow:
        df = create_df()
        df = use_df(df)
    flow.run()


if __name__ == "__main__":
    main()
The code collecting the data:
def get_df_with_sql(sql_com, clean_sql_cmd=True, split_cmds=True, use_impala=True, verbose=False):
    """
    Returns a Pandas data-frame with the results of the last query in sql_com

    Args:
        sql_com (str): string containing SQL commands
        clean_sql_cmd (bool): when True, clean sql_com
        split_cmds (bool): when True, if sql_com contains several SQL commands, run each one separately
        use_impala (bool): True: use Impala, False: use Hive
        verbose (bool): more printout during run

    Returns:
        df (Pandas DF): data collected by the last sql
    """
    if use_impala:
        conn = connect(host=IMPALA, auth_mechanism='GSSAPI')
    else:
        conn = connect(host=HIVE, auth_mechanism='GSSAPI', port=10000, kerberos_service_name='hive')
    curr = conn.cursor()
    if clean_sql_cmd:
        sql_com = clean_sql(sql_com)
    if split_cmds:
        sql_cmds = sql_com.split(';')
        sql_cmds = [x for x in sql_cmds if x]
    else:
        sql_cmds = [sql_com]
    pandas_sql = sql_cmds.pop(-1)
    for sql_com in sql_cmds:
        if bool(sql_com):
            try:
                # re.match('.*invalidate.*meta.*|.*compute.*stat.*', sql_com, flags=re.IGNORECASE)
                if not use_impala and re.match('.*invalidate.*meta.*|.*compute.*stat.*', sql_com, flags=re.IGNORECASE):
                    continue
                curr.execute(sql_com)
            except Exception as e:
                if verbose:
                    if sys.version_info.major > 2:
                        print(e)
                    else:
                        print(e.message)
    try:
        df = pd.read_sql(pandas_sql, conn)
    except Exception as e:
        print(e)
        print('Panda collection SQL: {}'.format(pandas_sql))
        df = pd.DataFrame()
    curr.close()
    conn.close()
    return df
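A small defensive variant of the tail of this function (an assumption, not the thread's code): closing the cursor and connection in a finally block guarantees they are released even if pd.read_sql raises, which is worth ruling out when a task appears to hang.

    try:
        df = pd.read_sql(pandas_sql, conn)
    except Exception as e:
        print(e)
        print('Panda collection SQL: {}'.format(pandas_sql))
        df = pd.DataFrame()
    finally:
        curr.close()   # released on success and on failure
        conn.close()
    return df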
YD
03/04/2022, 5:15 PM
YD
03/04/2022, 5:17 PM
Kevin Kho
YD
03/04/2022, 5:33 PM
Kevin Kho
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=8)
YD
03/04/2022, 5:56 PM
def main():
    start_date = datetime.now(timezone('US/Pacific'))
    schedule = Schedule(clocks=[CronClock('30 8-12 * * *', start_date=start_date)])
    with Flow("voc_sentiment", schedule=schedule, executor=LocalDaskExecutor()) as flow:
        # Code ...
Where should I add your line of code?
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=8)
Like this?
def main():
    start_date = datetime.now(timezone('US/Pacific'))
    schedule = Schedule(clocks=[CronClock('30 8-12 * * *', start_date=start_date)])
    with Flow("voc_sentiment", schedule=schedule) as flow:
        flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=8)
        # code ....
Kevin Kho
with Flow() as flow:
    ...

flow.executor = LocalDaskExecutor(..)
which is also equivalent to passing it in the Flow:
Flow(executor=LocalDaskExecutor())
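Putting the pieces above together, the user's main() would then look roughly like this; a sketch that assumes Prefect 1.x style imports (Schedule, CronClock, LocalDaskExecutor) and pytz for the timezone, with the actual tasks elided:

from datetime import datetime
from pytz import timezone
from prefect import Flow
from prefect.executors import LocalDaskExecutor
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

def main():
    start_date = datetime.now(timezone('US/Pacific'))
    schedule = Schedule(clocks=[CronClock('30 8-12 * * *', start_date=start_date)])
    with Flow("voc_sentiment", schedule=schedule) as flow:
        ...  # build the tasks here
    # Same effect as Flow(..., executor=LocalDaskExecutor(...)):
    # a process-based scheduler instead of the default threads.
    flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=8)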
YD
03/04/2022, 6:37 PM