Maria - 06/15/2022, 4:09 PM
Jacob Bedard - 06/15/2022, 5:07 PM
Omar Sultan - 06/15/2022, 5:57 PM
Error during execution of task: ConnectTimeout(MaxRetryError("HTTPConnectionPool(host='prefect-apollo.prefect', port=4200): Max retries exceeded with url: /graphql (Caused by ConnectTimeoutError(<urllib3.connection.HTTPConnection object at 0x7f32b0d48910>, 'Connection to prefect-apollo.prefect timed out. (connect timeout=60)'))"))
This seems to affect all running tasks at the time; for example, if I have 2 or 3 tasks running, they all fail at the exact same time with this error.
Kevin Kho - 06/15/2022, 6:51 PM
R - 06/15/2022, 7:00 PM
Joshua Allen - 06/15/2022, 7:18 PM
Joshua Allen - 06/15/2022, 7:20 PM
Alex de Geofroy - 06/15/2022, 7:21 PM
Pedro Machado - 06/15/2022, 7:57 PM
I'm passing secrets through the env argument of a class derived from ShellTask.
bash = LoggedShellTask(stream_output=True)
# more stuff here ...
with Flow(
    FLOW_NAME, storage=docker_storage, run_config=run_config, result=PrefectResult()
) as flow:
    run_results = bash(
        helper_script=get_helper_script(repo_path),
        command=get_command(ml_script_path, ml_script_arguments, conda_env=conda_env),
        env=dict(
            SNOWFLAKE_USER=snowflake_user,
            SNOWFLAKE_PASS=snowflake_pass,
            SNOWFLAKE_DATABASE=snowflake_database,
            SNOWFLAKE_OUTPUT_SCHEMA=snowflake_output_schema,
        ),
        log_file_path=get_log_file_path(output_dir),
    )
The issue is that Prefect automatically creates List and Dict tasks that have PrefectResult by default, and this exposes the secrets in the UI.
A couple of ideas that come to mind:
1. Assign a specific result object to each task (not at the flow level)
2. Create a wrapper task that receives all the secrets and returns a dict that is passed to the ShellTask
3. Create a ShellTask that accepts each secret as a parameter
Any suggestions? One feature I'd like to preserve is the ability to restart the flow, and I'd rather not persist the secrets anywhere.
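Idea 2 above can be sketched so the auto-generated Dict task never sees the secrets: build the mapping inside one task of your own. A minimal sketch with hypothetical names; the assumption (worth verifying against the Prefect 1 docs) is that decorating this with @task(checkpoint=False), or giving it a non-persisting result, keeps its return value out of the flow-level PrefectResult and therefore out of the UI:

```python
# Hypothetical helper for idea 2: because the dict is assembled inside a
# single task rather than inline in the Flow block, Prefect does not
# auto-create a Dict task (with the flow-level PrefectResult) for it.
# In Prefect 1 this function would carry @task(checkpoint=False).
def bundle_env(user, password, database, schema):
    # the returned mapping is passed straight to the ShellTask's env= kwarg
    return {
        "SNOWFLAKE_USER": user,
        "SNOWFLAKE_PASS": password,
        "SNOWFLAKE_DATABASE": database,
        "SNOWFLAKE_OUTPUT_SCHEMA": schema,
    }
```

The trade-off with checkpoint=False is exactly the restartability concern raised above: a restarted flow would need to re-run this task rather than reload a persisted (secret-bearing) result, which is arguably what you want here.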
Thanks!
ibrahem - 06/15/2022, 8:42 PM
Jason - 06/15/2022, 9:03 PM
Robert Esteves - 06/15/2022, 9:19 PM
Victoria Alvarez - 06/15/2022, 10:10 PM
Shaoyi Zhang - 06/15/2022, 10:22 PM
CA Lee - 06/15/2022, 11:48 PM
Faheem Khan - 06/16/2022, 2:44 AM
Faheem Khan - 06/16/2022, 5:56 AM
Amro - 06/16/2022, 6:27 AM
GuangSheng Liu - 06/16/2022, 7:16 AM
export PREFECT__SERVICES__SERVICES__TOWEL__MAX__SCHEDULED__RUNS__PER__FLOW=1
or
export PREFECT__MAX__SCHEDULED__RUNS__PER__FLOW=1
Surya - 06/16/2022, 10:27 AM
Roger Webb - 06/16/2022, 1:30 PM
Ed Burroughes - 06/16/2022, 2:36 PM
def build_full_refresh_base(name, s3_dir_prefix, **flow_kwargs):
    with Flow(name, **flow_kwargs) as flow:
        repeat_task_output = repeat_task()
    return flow, repeat_task_output

@contextmanager
def FullRefreshFlow(name, s3_dir_prefix, **flow_kwargs):
    # build_full_refresh_base returns a (flow, task_output) tuple;
    # unpack it so the names match what the with-statement receives
    flow, repeat_task_output = build_full_refresh_base(name, s3_dir_prefix, **flow_kwargs)
    try:
        yield flow, repeat_task_output
    finally:
        print("do something")

if __name__ == "__main__":
    @task(log_stdout=True)
    def some_task(repeat_task_output):
        print(repeat_task_output)

    with FullRefreshFlow("hello", "some_dir") as (flow, repeat_task_output):
        some_task(repeat_task_output)
    flow.run()
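Independent of Prefect, the core pattern above is a @contextmanager that yields a tuple, which `with ... as (a, b)` then unpacks, with teardown in the finally block. A stdlib-only sketch of just that mechanism (all names here are illustrative):

```python
from contextlib import contextmanager

@contextmanager
def flow_pair(name):
    # build both objects up front and yield them together as one tuple
    flow, task_output = {"name": name}, "repeat-output"
    try:
        yield flow, task_output   # `with ... as (f, out)` unpacks this
    finally:
        # runs after the with-block exits, even if the body raised;
        # stands in for the "do something" cleanup above
        flow["closed"] = True
```

One Prefect-specific caveat to keep in mind with this pattern: tasks called outside a `with Flow(...)` block are not registered to the flow, so whether some_task above actually joins the flow depends on where the flow context is active.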
Joshua Allen - 06/16/2022, 2:36 PM
docker module for Python instead? Or is there a better way?
Matthew Millendorf - 06/16/2022, 3:51 PM
David Yak - 06/16/2022, 4:16 PM
Slackbot - 06/16/2022, 4:39 PM
Josh - 06/16/2022, 5:44 PM
David Yak - 06/16/2022, 5:52 PM
Fina Silva-Santisteban - 06/16/2022, 5:59 PM
Xavier Babu - 06/16/2022, 7:55 PM
Xavier Babu - 06/16/2022, 7:55 PM
Kevin Kho - 06/16/2022, 7:57 PM
Xavier Babu - 06/16/2022, 7:59 PM
Kevin Kho - 06/16/2022, 8:01 PM
ALTER USER my_user WITH DEFAULT_SCHEMA = OtherSchema;
Xavier Babu - 06/17/2022, 2:10 PM
Kevin Kho - 06/17/2022, 2:44 PM