Martin T
01/04/2022, 7:30 AM

Noah Holm
01/04/2022, 9:52 AM

Lucas Hosoya
01/04/2022, 12:49 PM
I'm checking whether state.Cancelling and state.Cancelled are True, but neither of them occurs. It only registers as Cancelling or Cancelled when I set the flow's state to Cancelled in the UI. Another weird fact: if I set the state to Cancelled, my tasks keep running until they finish (either failing or succeeding). I'm currently working on state handlers; how can I be sure that I'm cancelling my flow runs, and consequently the tasks for those flow runs?
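A minimal sketch of a state handler that reacts to cancellation (assuming Prefect 1.x; the print is illustrative, and note that cancellation in 1.x is best-effort, so already-running tasks may still run to completion):

from prefect import Flow, task
from prefect.engine import state

def cancel_handler(obj, old_state, new_state):
    # Called on every state change; only react to cancellation states.
    if isinstance(new_state, (state.Cancelling, state.Cancelled)):
        print(f"{obj} moved from {old_state} to {new_state}")
    return new_state

@task
def work():
    return 1

with Flow("cancel-demo", state_handlers=[cancel_handler]) as flow:
    work()
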
Thomas Hoeck
01/04/2022, 1:52 PM
A flow run went from Scheduled to Cancelled without anyone pressing cancel in the UI. By the looks of the following query, the flow run was cancelled because a new version of the flow was released (which archived the old flow). Is this expected behavior?

Rob Douglas
01/04/2022, 2:00 PM

Philip MacMenamin
01/04/2022, 3:47 PM
If l1 = [a, b, c] and I generate l2 = task_name.map(l1), can I trust that element 0 in l2 derives from element 0 in l1?
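For what it's worth, mapped results in Prefect 1.x preserve the order of their inputs; a minimal sketch to convince yourself (the double task here is hypothetical):

from prefect import Flow, task

@task
def double(x):
    return x * 2

with Flow("map-order") as flow:
    l2 = double.map([1, 2, 3])

state = flow.run()
# Results come back in the same order as the input list: [2, 4, 6]
print(state.result[l2].result)
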
Theo Platt
01/04/2022, 4:22 PM

Daniel Kornhauser
01/04/2022, 5:30 PM
lnav

Mathijs Miermans
01/04/2022, 6:31 PM

Frederick Thomas
01/04/2022, 7:59 PM

Philip MacMenamin
01/05/2022, 12:16 AM
context, maybe? I saw a reply from @Kevin Kho stating:
@task
def abc():
    prefect.context.my_var = here
Is this still the way to do this?
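A minimal sketch of that pattern (assuming Prefect 1.x; my_var and its value are placeholders, and note this relies on tasks sharing a process, so it may not hold on a distributed executor):

import prefect
from prefect import Flow, task

@task
def set_var():
    # Stash a value on the runtime context for later tasks in this process.
    prefect.context.my_var = "here"

@task
def read_var():
    print(prefect.context.get("my_var"))

with Flow("context-demo") as flow:
    read_var(upstream_tasks=[set_var])
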
Florian Kühnlenz
01/05/2022, 7:17 AM

Mukamisha jocelyne
01/05/2022, 10:50 AM

Jason Noxon
01/05/2022, 1:54 PM

Alvaro Durán Tovar
01/05/2022, 3:56 PM

E Li
01/05/2022, 4:18 PM

Hammad Ahmed
01/05/2022, 4:26 PM

An Hoang
01/05/2022, 7:25 PM
perform_permutation and generate_summary_df are both upstream dependencies of process_report_df. However, the flow fails at process_report_df without ever running the two upstream tasks. The error is due to not having data from the upstream task generate_summary_df. The post-hoc state visualization doesn't show the two upstream tasks at all. Why does this happen? I will post the log and task results in the comments.
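For reference, a minimal sketch of wiring both upstream dependencies explicitly (the task names mirror the question; the bodies are hypothetical):

from prefect import Flow, task

@task
def perform_permutation():
    return "permutation"  # hypothetical payload

@task
def generate_summary_df():
    return "summary_df"  # hypothetical payload

@task
def process_report_df(summary_df):
    print(summary_df)

with Flow("report") as flow:
    perm = perform_permutation()
    summary = generate_summary_df()
    # Passing summary wires a data dependency; upstream_tasks adds a
    # state-only dependency on perform_permutation.
    process_report_df(summary, upstream_tasks=[perm])
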
Thomas Fredriksen
01/05/2022, 7:55 PM
ModuleNotFoundError: No module named 'pipelines'
The error seems quite clear; however, there is no reference to any module called pipelines, but the directory that holds my flows has this name. Overall, my project structure looks like this:
my_sdk
|- __init__.py
|- my_module.py
|- cli
   |- __init__.py
   |- __main__.py
pipelines
|- __init__.py
|- first_flow
   |- __init__.py
   |- flow.py
|- second_flow
   |- __init__.py
   |- flow.py
We have created a CLI tool that handles deployment to our infrastructure. It works by dynamically importing the flow using importlib, then setting up the Docker storage before calling flow.register:
# Import the flow module dynamically (sys, os, importlib and prefect.Flow
# are assumed to be imported at the top of the CLI module)
sys.path.append(os.path.abspath(pipeline_path))
flow_module = importlib.import_module(os.path.join(pipeline_path, "flow").replace("/", "."))
flow: Flow = getattr(flow_module, "flow")
This worked really well until we started seeing the error above.
Previously, we had a separate deploy.py script for each flow that would build and register the flows. We would occasionally see a similar error saying that it could not find a module called flow. Simply copying the flow.py file into the built image by adding it to the Docker storage init solved this:
flow.storage = Docker(
    # ....
    files={
        path.join(FLOW_DIR, "flow.py"): "/flow.py"
    }
)
The strange part is that this error was not deterministic and would only happen for one or two of our many flows. Since we only had to add the lines above to the deployment file, we considered the problem "solved for now".
While working on the CLI, we once again encountered the same problem. We eventually solved it the same way as before, but this time we had to copy flow.py to /pipelines/flow_name/flow.py to make it work. Once again, only a few flows were affected, not all of them.
When debugging this issue, we managed to reduce the flow to a state where the problem simply disappeared. We isolated a function that caused the ModuleNotFoundError, but it was not clear what part of the function caused the error. Even after reducing the function to a stub consisting of a single pass statement, the error persisted. Removing the call to the offending function magically solved the problem, even though it was just a stub.
Our conclusion is that the flow.py file is somehow pickled when the Docker image is built, but only in certain situations. We do not know why.
Post-ramble summary: we observed that the file containing our flow definition would be pickled with the flow itself when building the Docker image, but only in certain situations. Does anyone know what might be causing this behavior? We are happy with having to copy flow.py to the final Docker image, but would like to understand what is going on here.
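One way to sidestep the pickling question entirely (a sketch, assuming Prefect 1.x Docker storage; the in-image path is illustrative) is to store the flow as a script instead of a pickle:

from os import path

from prefect.storage import Docker

flow.storage = Docker(
    # ... build arguments as before ...
    stored_as_script=True,
    # Where the flow file lives inside the image (illustrative path).
    path="/pipelines/first_flow/flow.py",
    files={
        path.join(FLOW_DIR, "flow.py"): "/pipelines/first_flow/flow.py",
    },
)

With stored_as_script=True, the agent re-executes the script at runtime rather than unpickling the flow, which avoids pickling the flow object altogether.
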
Tom Shaffner
01/05/2022, 8:26 PM

Louis Eisenberg
01/05/2022, 9:42 PM
I want to pass the current flow run name (e.g. electric-albatross) to the app_sync_flow_claim_lock.R script. (The reason is that I want the claim operation to fail only if the lock is already held and the holder is not this run. So the claim script needs to know the name of the current run.)
with Flow("App sync") as flow:
    app_claim_lock = ShellTask(
        command="sudo make run-interactive scripts/deploy/app_sync_flow_claim_lock.R",
        task_args=dict(
            name="app sync flow claim lock",
            max_retries=5,
            retry_delay=datetime.timedelta(minutes=10),
        )
    )
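One approach (a sketch, assuming Prefect 1.x): read the run name from prefect.context inside a task at runtime, then hand the finished command to the ShellTask when calling it in the flow. How the R script receives the name (the RUN_NAME argument here) is an assumption:

import datetime

import prefect
from prefect import Flow, task
from prefect.tasks.shell import ShellTask

app_claim_lock = ShellTask(
    name="app sync flow claim lock",
    max_retries=5,
    retry_delay=datetime.timedelta(minutes=10),
)

@task
def build_claim_command():
    # prefect.context is only populated while the flow run is executing.
    run_name = prefect.context.get("flow_run_name")
    return (
        "sudo make run-interactive "
        f"scripts/deploy/app_sync_flow_claim_lock.R RUN_NAME={run_name}"
    )

with Flow("App sync") as flow:
    app_claim_lock(command=build_claim_command())
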
Aric Huang
01/05/2022, 10:28 PM
I'm trying to use concurrent.futures.ProcessPoolExecutor in a Prefect task. With the following code as test.py, if run using python3 test.py it completes successfully, but with prefect run --path test.py it stays stuck at 0% completion.
from prefect import task, Flow
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed

def process(inp):
    return inp

@task
def test():
    print("start")
    with ProcessPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(process, inp) for inp in range(1000)]
        out = [f.result() for f in tqdm(as_completed(futures), total=1000)]
    print("done")

with Flow("Test") as flow:
    test()

if __name__ == "__main__":
    test.run()
Any ideas what could be happening here?
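One thing worth checking (an assumption on my part, not a confirmed diagnosis): ProcessPoolExecutor pickles submitted functions by module reference, and prefect run --path loads the file under a different module context than python3 test.py does, so the worker processes may never manage to import process, leaving the futures stuck. A common workaround is to move the worker function into a separate, normally importable module, e.g. a hypothetical workers.py:

# workers.py (hypothetical helper module importable by the child processes)
def process(inp):
    return inp

and then use from workers import process in test.py so the children can resolve it by import.
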
Piyush Bassi
01/06/2022, 3:24 AM

Thomas Hoeck
01/06/2022, 8:23 AM
I'm starting a subflow with StartFlowRun. The subflow runs successfully, but in the parent flow I get the following error:
M. Siddiqui
01/06/2022, 11:35 AM

Shivam Bhatia
01/06/2022, 12:55 PM

Mike Lev
01/06/2022, 1:56 PM
flow_a:
import time

from prefect import Flow, task

@task(name='first_task_a', log_stdout=True)
def first_task():
    # Stub: the original snippet referenced first_task without showing it.
    return 'first_task_a_done'

@task(name='second_task_a', log_stdout=True)
def second_task(mssg: str):
    time.sleep(10)
    return f'{mssg} and second_task_a_done'

@task(name='third_task_a', log_stdout=True, slug="last_task")
def third_task(mssg: str):
    time.sleep(10)
    return f'{mssg} and third_task_a_done'

with Flow('flow_a') as flow_a:
    first_task_message = first_task()
    second_task_message = second_task(first_task_message)
    third_task_message = third_task(second_task_message)
runtime:
- KubernetesRun
- DaskExecutor
- S3Results
- S3Storage
- Prefect Cloud backend
... all flows are registered as expected, including the parent flow.
rilshok
01/06/2022, 2:41 PM

Amber Papillon
01/06/2022, 2:46 PM

Mahesh
01/06/2022, 3:12 PM
01/06/2022, 3:12 PMfrom prefect import context
flowids=prefect.context.get("flow_run_id")
print (flowids)
test=ShellTask(
shell="bash",
return_all=True,
log_stderr=True,
log_stdout=True,
stream_output=True,
command="echo Hello {}".format(flowids)
)
[2022-01-06 15:10:29+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'Dimension1'
[2022-01-06 15:10:29+0000] INFO - prefect.TaskRunner | Task 'ShellTask': Starting task run...
[2022-01-06 15:10:29+0000] INFO - prefect.ShellTask | Hello None
[2022-01-06 15:10:29+0000] INFO - prefect.TaskRunner | Task 'ShellTask': Finished task run for task with final state: 'Success'
[2022-01-06 15:10:29+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
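A likely fix (a sketch, assuming Prefect 1.x): defer reading the context until the flow is actually running, for example by building the command inside a task, since prefect.context.get("flow_run_id") is only populated at runtime:

import prefect
from prefect import Flow, task
from prefect.tasks.shell import ShellTask

test = ShellTask(
    shell="bash",
    return_all=True,
    log_stdout=True,
    stream_output=True,
)

@task
def build_command():
    # flow_run_id is available here because this executes during the flow run.
    return "echo Hello {}".format(prefect.context.get("flow_run_id"))

with Flow("Dimension1") as flow:
    test(command=build_command())
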
Kevin Kho
01/06/2022, 3:19 PM