Marius Haberstock  03/08/2022, 3:18 PM
apply_map. Details following in the thread 🙂

Max Kolasinski  03/08/2022, 3:49 PM

Roger Webb  03/08/2022, 3:52 PM

Adam Roderick  03/08/2022, 3:59 PM

Bruno Nunes  03/08/2022, 4:21 PM

ben  03/08/2022, 5:14 PM

Martha Edwards  03/08/2022, 6:01 PM

Martha Edwards  03/08/2022, 6:16 PM

Roger Webb  03/08/2022, 9:42 PM

Moises Vera  03/09/2022, 2:23 AM
• extract_data.py: the flow file (config for the flow)
  ◦ here I import the tasks directory with a simple import tasks
• tasks: a directory with my tasks, at the same level
  ◦ when I register this flow, it works correctly on a schedule
• I just added a new flow, calculate_something.py
• the tasks for this flow are in the tasks directory too
• now when I want to run this new flow I get FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'tasks\'")
What I don't get is... why is it working for the first flow and not for this new one? Any ideas? I appreciate it
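A minimal sketch of one common fix for this error, assuming Docker storage (the paths and names below are placeholders, not from this thread): bundle the tasks package into the image and put it on PYTHONPATH, so that import tasks also resolves when the flow is unpickled at run time, not only at registration:

from prefect import Flow
from prefect.storage import Docker

# Sketch: copy the local `tasks` package into the image and make it importable.
# "/path/to/tasks" and "/modules" are placeholder paths.
storage = Docker(
    files={"/path/to/tasks": "/modules/tasks"},
    env_vars={"PYTHONPATH": "$PYTHONPATH:/modules"},
)

with Flow("calculate_something", storage=storage) as flow:
    ...

If both flows are registered the same way, it is also worth checking that they were registered from the same working directory, since pickle-based storage resolves imports relative to the runtime environment.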
Suresh R  03/09/2022, 8:00 AM

Toby Rahloff  03/09/2022, 8:21 AM
manual_only trigger. Is it possible to do the same with Orion (in the future)?
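For reference, a minimal sketch of the Prefect 1.0 manual_only trigger being asked about (task and flow names are made up); the downstream task stays paused until the run is resumed manually:

from prefect import task, Flow
from prefect.triggers import manual_only

@task
def prepare():
    return 42

# this task will not run until someone approves/resumes the paused run
@task(trigger=manual_only)
def approve_and_load(data):
    print(f"loading {data}")

with Flow("manual-approval-example") as flow:
    approve_and_load(prepare())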
Bruno Nunes  03/09/2022, 8:46 AM

Tomer Cagan  03/09/2022, 10:16 AM

Vadym Dytyniak  03/09/2022, 11:28 AM
import dask.dataframe as dd
import pandas as pd
from prefect import task, Flow
from prefect.engine.results import S3Result

@task(result=S3Result(bucket='bucket-name'))
def checkpoint_data() -> dd.DataFrame:
    df = pd.DataFrame({'col_1': ['1', '2', '3'], 'col_2': [1, 2, 3]})
    ddf = dd.from_pandas(df, npartitions=1)
    return ddf

@task()
def accept_checkpointed_data(ddf: dd.DataFrame) -> None:
    raise ValueError("Checkpoint testing...")

Flow:
with Flow("checkpoint-test") as flow:  # flow name is arbitrary
    ddf = checkpoint_data()
    accept_checkpointed_data(ddf)
How can I be sure that on restart after the failure (the ValueError I added), the accept_checkpointed_data task loads the data from S3 instead of using cached_inputs?
Thanks
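Not a confirmed answer, but a related sketch: giving the result a templated location makes it deterministic which S3 key a given flow run's checkpoint lives under (the template string here is an assumption):

from prefect import task
from prefect.engine.results import S3Result

# Sketch: pin the checkpoint to a predictable key per flow run and task.
s3_result = S3Result(
    bucket='bucket-name',
    location='{flow_run_id}/{task_name}.prefect',
)

@task(result=s3_result, checkpoint=True)
def checkpoint_data():
    ...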
Martin T  03/09/2022, 12:22 PM
We use PrefectSecret() to load secrets from Prefect Cloud. Is it possible to load secrets from Cloud into prefect.context.secrets during flow start so all tasks can access them when needed?
Most of our task variables are auth-related, and the code/layout is getting overly complex.
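A sketch of one pattern that can reduce that boilerplate (secret and task names are made up): resolve PrefectSecret once in the flow block and pass the value to the tasks that need it:

from prefect import task, Flow
from prefect.tasks.secrets import PrefectSecret

@task
def use_credentials(api_key: str):
    # hypothetical task that needs the secret
    print("authenticated" if api_key else "missing key")

with Flow("secrets-example") as flow:
    api_key = PrefectSecret("API_KEY")  # resolved once per flow run
    use_credentials(api_key)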
Prasanth Kothuri  03/09/2022, 1:05 PM
No heartbeat detected from the remote task; marking the run as failed.
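One documented mitigation for this error in Prefect 1.0, sketched here (whether it applies depends on the setup; the flow name is made up): run the heartbeat in a thread instead of a subprocess:

from prefect import Flow
from prefect.run_configs import UniversalRun

with Flow("long-running-example") as flow:
    ...

# Sketch: switch the flow's heartbeat to thread mode via an environment variable.
flow.run_config = UniversalRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})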
Michał  03/09/2022, 1:48 PM

Wesley Jin  03/09/2022, 4:31 PM
Is it possible to change which project a create_flow_run call creates a Flow for, depending on environment variables? Example in thread
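The thread example was not captured here; as a stand-in, a minimal sketch of the idea with a made-up environment variable and project names:

import os
from prefect import Flow
from prefect.tasks.prefect import create_flow_run

# Hypothetical: pick the target project from an environment variable.
PROJECT = os.environ.get("PREFECT_PROJECT", "dev")

with Flow("parent") as flow:
    create_flow_run(flow_name="child", project_name=PROJECT)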
Chris Reuter  03/09/2022, 4:33 PM

Brian Phillips  03/09/2022, 11:05 PM

Brian Phillips  03/09/2022, 11:33 PM
from prefect import unmapped
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

# `parameters`, `run_names`, and `final_task` are defined elsewhere (elided here)
child_ids_result = create_flow_run.map(
    flow_name=unmapped(...),
    project_name=unmapped(...),
    parameters=parameters,
    run_name=run_names,
)
wait_for_flow_run_result = wait_for_flow_run.map(
    flow_run_id=child_ids_result,
    stream_states=unmapped(True),
    stream_logs=unmapped(True),
    raise_final_state=unmapped(True),
)
final_task(..., upstream_tasks=[wait_for_flow_run_result])
Kevin Otte  03/10/2022, 5:04 AM
scrapers
  > scraper1
  > scraper2
prefect_flow.py
and scraper1/2 have classes or methods that scrape specific websites. Am I able to leave those defined in those files, while referencing them in my flow.py file? It would look something like this..

import ...

@task
def scrape_site_one():
    s = Scraper()
    return s.scrape_site('www...')

....

with Flow("update historical pricing") as flow:
    scrape_site_one()
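Filled in, that could look like the sketch below (the module path is assumed from the layout above, and scrapers/ needs an __init__.py to be importable as a package):

from prefect import task, Flow
from scrapers.scraper1 import Scraper  # assumed module path

@task
def scrape_site_one():
    s = Scraper()
    return s.scrape_site('www...')

with Flow("update historical pricing") as flow:
    scrape_site_one()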
Samuel Tober  03/10/2022, 8:19 AM

Tomer Cagan  03/10/2022, 8:38 AM

Fredrik Blomgren  03/10/2022, 9:47 AM

Luuk  03/10/2022, 12:28 PM
az acr login --name acrname
logs me out after a couple of hours and requires me to run it again.
I also run the command within the Docker image (for the Docker agent).
It's all working, but after a while I get logged out from the ACR and my flow starts to crash.
Error message:
500 Server Error for http+docker://localhost/v1.41/images/create?tag=latest&fromImage=acrname.azurecr.io%2Fimage: Internal Server Error ("Head https://acrname.azurecr.io/v2/image/manifests/latest: unauthorized: authentication required")
Tomer Cagan  03/10/2022, 12:49 PM

FuETL  03/10/2022, 12:59 PM
I'm setting the flow run state to Scheduled with set_flow_run_state (I tried Pending, but that leaves the flow idle and it never executes). I want to restart all the tasks.

from prefect.client import Client
from prefect.engine.state import Scheduled

client = Client()
client.set_flow_run_state(
    flow_run_id=flow_run_id,  # id of the run to restart
    state=Scheduled()
)
Anna Geller  03/10/2022, 1:44 PM
Have you tried using retries and triggers to automatically perform some recomputation of task runs, depending on what you try to accomplish? You can combine conditional logic (case), retries and triggers to automatically recompute task runs - check out those examples (there is also a small sketch after this list):
• https://discourse.prefect.io/t/how-can-i-trigger-downstream-tasks-based-on-upstream-task-s-state/106#prefect-10-2
• https://discourse.prefect.io/t/how-can-i-stop-the-task-run-based-on-a-custom-logic/83#prefect-10-2
• https://discourse.prefect.io/t/how-can-i-stop-a-flow-run-execution-based-on-a-condition/105#prefect-10-2
• https://discourse.prefect.io/t/how-to-build-a-conditional-flow-of-flows-i-e-trigger-a-different-child-flow-depending-on-the-upstream-flows-state/202
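As a quick illustration of the conditional-logic option mentioned above (task and flow names are made up):

from prefect import task, Flow, case

@task
def check_condition() -> bool:
    return True

@task
def run_when_true():
    print("condition met")

@task
def run_when_false():
    print("condition not met")

with Flow("conditional-example") as flow:
    cond = check_condition()
    with case(cond, True):
        run_when_true()
    with case(cond, False):
        run_when_false()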
FuETL  03/10/2022, 2:20 PM

Anna Geller  03/10/2022, 2:37 PM
03/10/2022, 2:37 PMimport random
import prefect
from prefect import task, Flow
from prefect.client import Client
from prefect.tasks.notifications import SlackTask
from prefect.triggers import all_successful
from typing import cast
def post_to_slack_and_cancel_run_on_task_failure(task, old_state, new_state):
if new_state.is_failed():
flow_run_id = prefect.context.flow_run_id
if isinstance(new_state.result, Exception):
value = "```{}```".format(repr(new_state.result))
else:
value = cast(str, new_state.message)
msg = (
f"The task `{prefect.context.task_name}` failed "
f"in a flow run {flow_run_id} "
f"with an exception {value}. Cancelling the flow run and manually resuming later."
)
SlackTask(message=msg).run()
client = Client()
client.cancel_flow_run(flow_run_id)
return new_state
@task(state_handlers=[post_to_slack_and_cancel_run_on_task_failure])
def run_process_that_may_fail():
if random.random() > 0.5:
raise ValueError("Failing due to missing information")
@task(trigger=all_successful)
def run_if_success():
print("Success")
with Flow("state_handler_ex") as flow:
first_task = run_process_that_may_fail()
run_if_success(upstream_tasks=[first_task])
if __name__ == "__main__":
flow.run()
Note that:
• attaching the all_successful trigger explicitly is not needed, since it is the default
• cancelling the flow run within this state handler is not necessary: because the downstream task is triggered only on success, it wouldn't run if the flaky task failed. But I wanted to show it in case you need something like that in your logic.
I wouldn't recommend rebuilding the Restart logic, since it is hard to implement and frankly a bit overkill for the problem at hand. I think you can totally solve your issue using:
• retries
• triggers
• conditional tasks
• state handlers
• and optionally subflows