Aqib Fayyaz
11/18/2021, 7:13 PM
Vipul
11/18/2021, 7:15 PM
Vipul
11/18/2021, 8:22 PM
kiran
11/18/2021, 8:39 PM
nohup instead of Supervisor? I'm thinking something simple like this: nohup prefect agent local start > /tmp/prefect_local_agent.log 2>&1 &
John T
11/18/2021, 9:00 PM
PrefectResult? I'm currently encountering this error:
TypeError: PrefectResult only supports JSONSerializer or DateTimeSerializer
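(For reference, a minimal sketch of the constraint named in that error, assuming Prefect 1.x; the task here is illustrative:)

from prefect import task, Flow
from prefect.engine.results import PrefectResult
from prefect.engine.serializers import JSONSerializer

# PrefectResult stores the result in Prefect's own database, so it only
# accepts serializers that produce JSON-safe payloads.
@task(result=PrefectResult(serializer=JSONSerializer()))
def add(x, y):
    return x + y

with Flow("json-result-example") as flow:
    total = add(1, 2)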
Kevin Kho
John Muehlhausen
11/18/2021, 11:08 PM
Hugo Shi
11/18/2021, 11:43 PM
Dominic Pham
11/19/2021, 12:15 AM
Jeremiah Lethoba
11/19/2021, 8:03 AM
AJ
11/19/2021, 8:50 AM
Adam Everington
11/19/2021, 10:19 AMflow.register(project_name='my-project', indempotency_key=flow.serialized_hash())
on prefect server will the version number still get bumped each time?Florian Kühnlenz
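(A minimal sketch of the pattern in question, assuming Prefect 1.x; when the idempotency key is unchanged between calls, re-registration should be skipped rather than bumping the version:)

from prefect import Flow, task

@task
def say_hello():
    print("hello")

with Flow("my-flow") as flow:
    say_hello()

# The first call registers the flow; a second call with an unchanged
# serialized_hash() should be a no-op instead of bumping the version.
flow.register(project_name='my-project', idempotency_key=flow.serialized_hash())
flow.register(project_name='my-project', idempotency_key=flow.serialized_hash())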
Florian Kühnlenz
11/19/2021, 11:42 AM
Aqib Fayyaz
11/19/2021, 12:18 PM
prefect agent kubernetes install -k API_KEY | kubectl apply --namespace=my-namespace -f -
Are they both the same? I have one agent deployed with Prefect Server on GKE, but I need to use Prefect Cloud. Is one agent enough for both Server and Cloud, or do I have to deploy a separate one for Cloud?
ek
11/19/2021, 3:26 PM
Leon Kozlowski
11/19/2021, 6:03 PM
Max Kolasinski
11/19/2021, 9:31 PM
• all_failed or any_failed - if the ETL Task fails, it shouldn't run. What I believe I would need is something like an on_x_task_failed option - it seems like the available options are way too broad to be useful.
• I then looked into some of the ideas on the Conditional Logic page, but this seems clumsy for a few reasons. I need a Task specifically to check the State of the Validation Task, and then on our Schematic View we have additional Tasks showing up for each Case Task as well as the Merge Task. All combined, it makes our Schematic look like the image below, which seems crazy for what is effectively "if x, do y" (see the sketch after this message).
I feel like I have to be approaching this in completely the wrong way - if anyone has any ideas or suggestions I would be extremely grateful.
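(For context, a minimal sketch of the Conditional Logic approach being described, assuming Prefect 1.x; the task names are illustrative stand-ins:)

from prefect import Flow, task, case

@task
def validate():
    # stand-in for the Validation Task; returns True on success
    return True

@task
def run_etl():
    # stand-in for the ETL Task
    ...

with Flow("conditional-etl") as flow:
    ok = validate()
    # case() adds its own task to the schematic, which is the clutter
    # being described above
    with case(ok, True):
        run_etl()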
Tom Shaffner
11/19/2021, 11:16 PM
André Bonatto
11/20/2021, 12:50 PM
from typing import Callable, Dict

from prefect import Flow, Parameter, task

from etl import prepare, load
from crawlers import crawler1, crawler2

prepare = task(prepare)
load = task(load)

def make_standard_pipeline(flow_name: str, func: Callable, func_params: Dict):
    with Flow(flow_name) as flow:
        params = {k: Parameter(k, default=v) for k, v in func_params.items()}
        df = func(**params)
        df = prepare(df)
        df = load(df)
    return flow

pipe1 = make_standard_pipeline('flow1', task(crawler1), {})
pipe2 = make_standard_pipeline('flow2', task(crawler2), {'type': 'xxx'})
Locally this code runs fine and I can also register these on the Prefect server. However, when I try to run the flows, only the first flow defined in the file runs successfully (I tested reordering the flows). For the other flows, I get a KeyError saying it couldn't find the task slug crawler2. Does anyone have hints on what could be causing this problem?
Thank you.
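(A hedged guess at the cause, not a confirmed diagnosis: prepare and load here are single Task objects shared by every flow built in the file, so the task slugs recorded for one registered flow can clash with another's. A minimal sketch that gives each flow its own copies, using Task.copy() from Prefect 1.x:)

def make_standard_pipeline(flow_name: str, func: Callable, func_params: Dict):
    # copy the shared tasks so each flow gets its own task instances
    # (and therefore its own slugs)
    prepare_copy = prepare.copy()
    load_copy = load.copy()
    with Flow(flow_name) as flow:
        params = {k: Parameter(k, default=v) for k, v in func_params.items()}
        df = func(**params)
        df = prepare_copy(df)
        df = load_copy(df)
    return flow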
Chen Di
11/20/2021, 3:30 PM
Manuel Gomes
11/21/2021, 5:40 PM
ValueError: Filename must be a string
So there is clearly some sort of unwrapping/unpacking that I'm missing?
My flow is likewise in the thread, as is the invocation.
So... would someone please tell me at which exact angle I should smack my forehead, and the correct octave of the "duh!"... plus maybe what I should be doing instead?
Wilhelm Su
11/21/2021, 8:54 PM
Anh Nguyen
11/22/2021, 10:21 AM
Jean-Baptiste Six
11/22/2021, 2:05 PM
• I tried with slack_notifier, without success.
• Then I tried with SlackTask, this is my piece of code (didn't work):
def post_to_slack_on_failure(task, old_state, new_state):
    if new_state.is_failed():
        if isinstance(new_state.result, Exception):
            value = "```{}```".format(repr(new_state.result))
        else:
            value = str(new_state.message)
        msg = (
            f"The task `{prefect.context.task_name}` failed "
            f"in a flow run {prefect.context.flow_run_id} "
            f"with an exception {value}"
        )
        SlackTask(
            message=msg,
            webhook_secret="https://hooks.slack.com/services/*******/*******/*******"
        ).run()
    return new_state
• Finally, I tried with a custom solution (didn't work, again):
def post_to_slack_on_failure(task, old_state, new_state):
    if new_state.is_finished():
        msg = "Task {0} finished in state {1}".format(task, new_state)
        # replace with your Slack webhook URL secret name
        secret_slack = cast(str, Secret("https://hooks.slack.com/services/*******/*******/*******").get())
        requests.post(secret_slack, json={"text": msg})
    return new_state
This is my test flow:
@task
def task_error():
    raise Exception("Test")

with Flow("Test Slack", state_handlers=[post_to_slack_on_failure]) as flow:
    task_error()

flow.run()
(I also tried EmailTask with smtp_type="INSECURE", and once again it didn't work.)
Need some help please 🙏
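(A minimal sketch of one way this is commonly wired up in Prefect 1.x, offered with assumptions: the webhook URL is stored as a Prefect Secret, webhook_secret is given the *name* of that Secret rather than the URL itself, and the handler is attached to the task so task-level failures trigger it; the Secret name SLACK_WEBHOOK_URL is illustrative:)

import prefect
from prefect import Flow, task
from prefect.tasks.notifications.slack_task import SlackTask

def post_to_slack_on_failure(obj, old_state, new_state):
    if new_state.is_failed():
        msg = f"`{prefect.context.get('task_name', obj)}` failed: {new_state.message}"
        # webhook_secret is the name of the Secret holding the URL,
        # not the URL itself
        SlackTask(message=msg, webhook_secret="SLACK_WEBHOOK_URL").run()
    return new_state

# attached at the task level; a flow-level state handler only fires on
# flow state changes
@task(state_handlers=[post_to_slack_on_failure])
def task_error():
    raise Exception("Test")

with Flow("Test Slack") as flow:
    task_error()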
Margaret Walter
11/22/2021, 4:18 PM
Tom Shaffner
11/22/2021, 4:23 PM
with Flow(flow_name) as flow:
    logger.info(f"{flow_name} Task Flow initiated, running in {file_path}")
    df = pull_oracle_data_via(oracle_query_sql=oracle_query_sql_path, prod=use_oracle_prod)
    df = set_data_types(df)
    upload_to_table(df, destination_table=data_destination_table_name)
    if summary_view_name is not None and history_table_name is not None:
        logger.info("Initiating history upload process.")
        summary_df, summary_data_empty = pull_summary_data_via(sql=f"SELECT * FROM {summary_view_name}")
        if summary_data_empty:
            delete_today_from_history_if_exists(df=df, history_table=history_table_name)
        upload_to_history_table(df=summary_df, destination_table=history_table_name, append=True)
    else:
        logger.info("Skipping summary view run: summary view name and/or history table name missing.")
To address this I tried to make the dependencies explicit, by adding "upstream_tasks" flags to two of the above lines, like so:
summary_df, summary_data_empty = pull_summary_data_via(upstream_tasks=[upload_to_table], sql=f"SELECT * FROM {summary_view_name}")
delete_today_from_history_if_exists(upstream_tasks=[pull_summary_data_via], df=df, history_table=history_table_name)
This doesn't seem to fix the issue though; when I run the flow, later tasks still seem to initiate before the Oracle pull, which should occur before everything. Does anyone see what I'm doing wrong? The documentation would seem to indicate that feeding result data from one task to another would make dependencies work correctly, but that doesn't seem to be happening here.
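(A hedged observation, not a confirmed diagnosis: the upstream_tasks lists above reference the task functions themselves, while Prefect 1.x expects the tasks as bound in this flow, i.e. the objects returned inside the with block. A sketch using the names from the snippet above:)

with Flow(flow_name) as flow:
    df = pull_oracle_data_via(oracle_query_sql=oracle_query_sql_path, prod=use_oracle_prod)
    df = set_data_types(df)
    uploaded = upload_to_table(df, destination_table=data_destination_table_name)
    # pass the bound task result, not the undecorated function
    summary_df, summary_data_empty = pull_summary_data_via(
        sql=f"SELECT * FROM {summary_view_name}",
        upstream_tasks=[uploaded],
    )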
Marko Herkaliuk
11/22/2021, 5:36 PM
David Yang
11/22/2021, 8:18 PM
Jason Motley
11/22/2021, 8:19 PM
Jason Motley
11/22/2021, 8:41 PM
extract => transform => delete 14 days in target => append last 22 days w/ no dupes.
Right now I'm doing 2 parallel ETLs, one which ends in replacing the existing data and the other which appends. This seems very slow since I have to perform 2 full extracts.
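(A minimal sketch of chaining that into a single flow with one extract, assuming Prefect 1.x; all task names are illustrative stand-ins:)

from prefect import Flow, task

@task
def extract():
    ...

@task
def transform(df):
    ...

@task
def delete_last_14_days(df):
    # clear the overlap window in the target table
    ...

@task
def append_last_22_days(df):
    # append the fresh window without duplicates
    ...

with Flow("single-extract-etl") as flow:
    df = transform(extract())
    deleted = delete_last_14_days(df)
    # force the append to wait for the delete to finish
    append_last_22_days(df, upstream_tasks=[deleted])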