Faheem Khan
04/26/2022, 7:48 AM

Rhys Mansal
04/26/2022, 9:22 AM
param_a = flow.add_task(Parameter("param_a", default=True))
with case(param_a, False):
    task_a = some_task(arg)
    task_b = some_task(task_a)
some_task is being run multiple times with different arguments. When param_a is false, some of these runs should be skipped. If I do not use @task(skip_on_upstream_skip=False) in the task decorator on some_task, then all tasks downstream of the first skipped task are skipped, whether they are inside a case block or not. If I do use it, none of them are skipped, regardless of what param_a is set to.
Does anyone have any idea how to get only the tasks inside the with block to skip (and only when param_a is true)?
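One pattern that might help (a hedged sketch, not verified against this exact flow): Prefect 1.0's prefect.tasks.control_flow.merge is built with skip_on_upstream_skip=False, so it can act as the join point after a case block and stop the skip from propagating to tasks outside it:

from prefect import Flow, Parameter, case, task
from prefect.tasks.control_flow import merge

@task
def some_task(x):
    return x

with Flow("conditional-skip") as flow:
    param_a = Parameter("param_a", default=True)
    with case(param_a, False):
        # Only these calls should skip when param_a is True.
        task_a = some_task(1)
        task_b = some_task(task_a)
    # merge() does not inherit the skip, so tasks downstream of it
    # still run when the case branch is skipped.
    joined = merge(task_b)
    always_runs = some_task(joined)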

Florian Guily
04/26/2022, 10:20 AM

xyzz
04/26/2022, 11:13 AM
Regarding storage, the Orion docs say "you can also configure a self-hosted key-value store for testing." But what kind of key-value store is compatible, and how do you set it up for testing purposes?

xyzz
04/26/2022, 11:17 AM

xyzz
04/26/2022, 11:20 AM

Malthe Karbo
04/26/2022, 12:41 PM

Nikhil Joseph
04/26/2022, 1:15 PM

Marwan Sarieddine
04/26/2022, 1:28 PM
We've hit {'_schema': 'Invalid data type: None'} twice over the last week across our many flow runs. It seems other folks have encountered this due to a version mismatch between the agent and their execution environment. However, that is not the case for us; additionally, the same flow will proceed to run successfully on future runs without any changes on our end. See more details in the thread.
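Since the usual suspect mentioned above is an agent/execution-environment version mismatch, one cheap sanity check (a trivial sketch) is logging the Prefect version from both places and comparing:

import prefect

# Run this both where the agent lives and inside the flow's execution
# environment; differing outputs point at the mismatch explanation,
# matching outputs rule it out.
print(prefect.__version__)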

Leon Kozlowski
04/26/2022, 1:44 PM
I'm seeing Rescheduled by a Lazarus process. This is attempt 1. on a flow run. What would be the best way for me to root-cause this issue?

Andrey Tatarinov
04/26/2022, 2:01 PM

Shuchita Tripathi
04/26/2022, 2:28 PM

Jason
04/26/2022, 2:29 PM
I'm hitting this when registering a flow:
return os.stat(filename).st_size
FileNotFoundError: [Errno 2] No such file or directory: 'hello-flow.py'
The S3 storage class is configured in a module shared between flows, like so:
storage = S3(
    bucket="EDITED-prod-platform-prefect",
    key=f"{project}/flows/{flow_name}.py",
    stored_as_script=True,
    local_script_path=f"projects/Examples/flows/{flow_name}.py",
)
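The FileNotFoundError suggests the relative local_script_path is being resolved against whatever directory the registration command happens to run from. A hedged sketch of one possible fix, anchoring the path to the shared module's own location (the Path(__file__) layout is an assumption about this repo):

from pathlib import Path
from prefect.storage import S3

# project and flow_name come from the shared module, as in the original.
# Resolving against this file instead of the CWD makes registration
# independent of where it is invoked from.
flows_dir = Path(__file__).resolve().parent / "flows"

storage = S3(
    bucket="EDITED-prod-platform-prefect",
    key=f"{project}/flows/{flow_name}.py",
    stored_as_script=True,
    local_script_path=str(flows_dir / f"{flow_name}.py"),
)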
Bruno Murino
04/26/2022, 2:33 PM

Milton
04/26/2022, 3:02 PM

Bruno Murino
04/26/2022, 4:09 PM

Lukáš Pravda
04/26/2022, 4:19 PM
My flow in file.py looks roughly like this:
@task
def foo():
    ...

@task
def bar():
    ...

with Flow("test") as flow:
    try:
        foo()
    except Exception:
        bar()
and test.py:
from file import flow
from unittest.mock import patch

# Stacked patches are passed bottom-up: the nearest decorator maps to
# the first argument.
@patch("file.bar")
@patch("file.foo")
def test_flow(mock_foo, mock_bar):
    mock_foo.side_effect = Exception("throw an error")
    flow.run()
    mock_bar.assert_called_once()
but the mock is never called. I have found this: <https://github.com/PrefectHQ/prefect/issues/1801>, but could not really map that solution onto my exact problem. What am I missing? Thank you
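Two things may be at play here (a hedged reading of the linked issue): the flow graph captures the original task objects when the with Flow block executes at import time, so patching file.foo afterwards never changes what flow.run() executes; and the try/except runs at build time, not at task runtime, so bar() is never even added to the flow. A sketch of the workaround from that issue, swapping the task object inside the flow itself (names assumed to match the question):

from prefect import task
from file import flow, foo

@task
def failing_foo():
    raise Exception("throw an error")

def test_flow():
    # Replace the task object the flow graph actually holds, instead of
    # patching the module attribute after the flow was built.
    flow.replace(foo, failing_foo)
    state = flow.run()
    # This only shows the injected failure is seen; getting bar() to run
    # on failure needs a runtime trigger, not a build-time try/except.
    assert state.is_failed()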

Tom Manterfield
04/26/2022, 4:37 PM
… KubernetesFlowRunner instance other than building the keys into the image itself. Has anyone else found a solution for this?

Xavier Babu
04/26/2022, 4:44 PM

Kathryn Klarich
04/26/2022, 5:14 PM
I'm getting AttributeError: module 'lib' has no attribute 'X509_V_FLAG_CB_ISSUER_CHECK'. It seems to be happening during the RUN pip install pip --upgrade step. Has anyone come across this before and know how to fix it? I was able to successfully register this flow a few days ago and haven't changed much in the requirements since then.
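This particular AttributeError is commonly reported when an older pyOpenSSL meets a newer cryptography release, which can surface without any requirements change when cryptography is unpinned. One frequently suggested workaround, not verified against this image, is upgrading pyOpenSSL in the same build step:

RUN pip install --upgrade pip pyopenssl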

Bradley Hurley
04/26/2022, 5:21 PM

Jai P
04/26/2022, 5:39 PM
Regarding the case statement in prefect 2.0: is there a rough timeline for when it may be introduced? Also, are any major differences planned from how it works in prefect 1.0, where I think you can only branch conditionally between tasks (to, say, possibly supporting subflows)?
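For comparison (a sketch only, not a statement about any planned case() API): because Prefect 2.0 flows are plain Python functions, native control flow can already express a 1.0-style case, including branching into subflows:

from prefect import flow, task

@task
def run_when_true():
    return "a"

@task
def run_when_false():
    return "b"

@flow
def conditional_flow(param_a: bool = True):
    # Ordinary Python branching stands in for case(); either branch
    # could equally call another @flow function as a subflow.
    if param_a:
        return run_when_true()
    return run_when_false()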

Anna Geller

Slackbot
04/26/2022, 6:55 PM

Dylan
04/26/2022, 7:20 PM

Michael Moscater
04/26/2022, 7:52 PM

Josephine Douglas
04/26/2022, 11:33 PM
I'm getting Failed to load and execute Flow's environment: TypeError("got an unexpected keyword argument 'raise_final_state'"). What am I doing incorrectly here?
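A hedged guess at the cause: raise_final_state is a keyword that prefect.tasks.prefect.wait_for_flow_run only accepts in more recent 1.x releases, so this TypeError often means the execution environment is running an older Prefect than the one the flow was written against. Assuming a new enough 1.x, the call that takes it looks like this ("child-flow" is a hypothetical name):

from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("parent") as parent_flow:
    # On an older Prefect in the execution environment, this keyword
    # triggers exactly the TypeError quoted above.
    child_run = create_flow_run(flow_name="child-flow")
    wait_for_flow_run(child_run, raise_final_state=True)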

Slackbot
04/27/2022, 1:32 AM

Raviraja Ganta
04/27/2022, 6:28 AM

Florian Guily
04/27/2022, 10:08 AM
… Waiting for next available Task run at 2022-04-27T10:00:35.571259+00:00. After a few seconds, a new log message states Beginning Flow run for 'flow_name', and I see all of the log messages from the previous tasks again, as if the flow were re-executed. Is this normal? Is the flow really re-executed? When executing this flow from Prefect Cloud, I don't see those logs, so I'm a bit confused.
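For what it's worth, a hedged explanation that matches this symptom in 1.x local runs: flow.run() waits out the retry delay in-process and then restarts the flow-run loop, which re-emits the earlier log lines, while tasks that already succeeded reuse their existing states rather than re-executing. A minimal sketch that reproduces those logs (names assumed):

from datetime import timedelta
from prefect import Flow, task

@task
def succeeds():
    return 1

@task(max_retries=2, retry_delay=timedelta(seconds=10))
def flaky(x):
    # Always failing here forces the "Waiting for next available
    # Task run" message followed by a fresh "Beginning Flow run".
    raise Exception("fail")

with Flow("retry-demo") as flow:
    flaky(succeeds())

flow.run()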