Daniel Kornhauser
01/04/2022, 5:30 PM
lnav
Mathijs Miermans
01/04/2022, 6:31 PM
Frederick Thomas
01/04/2022, 7:59 PM
Philip MacMenamin
01/05/2022, 12:16 AM
context maybe? I saw a reply from @Kevin Kho stating:
import prefect
from prefect import task
@task
def abc():
    prefect.context.my_var = "here"  # placeholder value
Is this still the way to do this?
Florian Kühnlenz
01/05/2022, 7:17 AM
Mukamisha jocelyne
01/05/2022, 10:50 AM
Jason Noxon
01/05/2022, 1:54 PM
Alvaro Durán Tovar
01/05/2022, 3:56 PM
E Li
01/05/2022, 4:18 PM
Hammad Ahmed
01/05/2022, 4:26 PM
An Hoang
01/05/2022, 7:25 PM
perform_permutation and generate_summary_df are both upstream dependencies of process_report_df. However, the flow fails at process_report_df without ever running the two upstream tasks. The error is caused by the missing data from the upstream task generate_summary_df. The post-hoc state visualization doesn't show the two upstream tasks at all. Why does this happen? I will post the log and task results in the comments.
Thomas Fredriksen
01/05/2022, 7:55 PM
ModuleNotFoundError: No module named 'pipelines'
The error seems quite clear; however, there is no reference to any module called pipelines, although the directory that holds my flows has this name. Overall, my project structure looks like this:
my_sdk
|- __init__.py
|- my_module.py
|- cli
|  |- __init__.py
|  |- __main__.py
pipelines
|- __init__.py
|- first_flow
|  |- __init__.py
|  |- flow.py
|- second_flow
|  |- __init__.py
|  |- flow.py
We have created a CLI tool that handles deployment to our infrastructure. It works by dynamically importing the flow using importlib, then setting up the Docker storage before calling flow.register.
# Import flow
sys.path.append(os.path.abspath(pipeline_path))
flow_module = importlib.import_module(os.path.join(pipeline_path, "flow").replace("/", "."))
flow: Flow = getattr(flow_module, "flow")
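The os.path.join(...).replace("/", ".") expression is what gives the flow module its dotted name: pipelines/first_flow becomes pipelines.first_flow.flow, and that is the name the module is registered under in sys.modules. A slightly more defensive version of the same conversion might look like this; module_name_for is a hypothetical helper, not part of the original CLI, and it assumes pipeline_path is a relative, POSIX-style path.

```python
from pathlib import PurePosixPath


def module_name_for(pipeline_path: str, module: str = "flow") -> str:
    """Convert a relative pipeline directory into a dotted module name.

    Hypothetical helper mirroring os.path.join(pipeline_path, "flow").replace("/", "."),
    but collapsing "./" and trailing slashes and rejecting paths that cannot
    map onto an importable name.
    """
    path = PurePosixPath(pipeline_path)
    if path.is_absolute() or not path.parts or ".." in path.parts:
        raise ValueError(
            f"expected a relative path like 'pipelines/first_flow', got {pipeline_path!r}"
        )
    return ".".join((*path.parts, module))


print(module_name_for("pipelines/first_flow"))       # pipelines.first_flow.flow
print(module_name_for("./pipelines/second_flow/"))   # pipelines.second_flow.flow
```

Rejecting absolute paths matters because joining an absolute pipeline_path and replacing slashes yields a name that starts with a dot, which importlib.import_module interprets as a relative import and refuses without a package argument.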
This worked really well until we started seeing the error above.
Previously, we had a separate deploy.py script for each flow that would build and register it. We would occasionally see a similar error saying that it could not find a module called flow. Simply copying the flow.py file to the built image by adding it to the Docker storage init solved this:
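from os import path
from prefect.storage import Docker

flow.storage = Docker(
    # ....
    files={
        # copy the local flow.py into the root of the image
        path.join(FLOW_DIR, "flow.py"): "/flow.py"
    }
)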
The strange part is that this error was not deterministic and would only happen for one or two of our many flows. Since we only had to add the lines above to the deployment file, we considered the problem "solved for now".
While working on the CLI, we once again encountered the same problem. We eventually solved it the same way as before, but this time we had to copy flow.py to /pipelines/flow_name/flow.py to make it work. Once again, only a few flows were affected, not all of them.
When debugging this issue, we managed to reduce the flow to a state where the problem simply disappeared. We isolated a function that caused the ModuleNotFoundError, but it was not clear which part of the function caused the error. Even after we reduced the function to a stub, the error persisted: the function consisted of a single line, pass, yet it still caused the error. Removing the call to the offending function magically solved the problem, even though it was just a stub.
Our conclusion is that the flow.py file somehow gets pickled when the Docker image is built, but only in certain situations. We do not know why.
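This pattern is consistent with how pickle-style serialization handles functions that live in an importable module: they are stored by reference, as a module name plus a qualified name, rather than by value, so unpickling only works where that module can be imported. As far as I know, cloudpickle (which Prefect 1.x uses to serialize flows into Docker storage) follows the same rule for functions whose module is importable at build time, which would leave a reference to something like pipelines.first_flow.flow inside the image. This sketch uses the standard pickle module and a throwaway package called demo_pipelines, a hypothetical stand-in for pipelines, to reproduce the same kind of ModuleNotFoundError.

```python
import importlib
import pickle
import sys
import tempfile
from pathlib import Path

# Build a throwaway package on disk: demo_pipelines/flow.py with a stub function.
root = Path(tempfile.mkdtemp())
package = root / "demo_pipelines"
package.mkdir()
(package / "__init__.py").write_text("")
(package / "flow.py").write_text("def stub():\n    pass\n")

# Import it by dotted name via sys.path, similar to what the CLI does.
sys.path.insert(0, str(root))
flow_module = importlib.import_module("demo_pipelines.flow")

# pickle stores the function as a reference: module "demo_pipelines.flow" + name "stub".
payload = pickle.dumps(flow_module.stub)
stored_by_reference = b"demo_pipelines.flow" in payload

# Simulate the Docker image, where the package is not importable.
sys.path.remove(str(root))
for name in ("demo_pipelines.flow", "demo_pipelines"):
    sys.modules.pop(name, None)

try:
    pickle.loads(payload)
    error_message = None
except ModuleNotFoundError as exc:
    error_message = str(exc)

print(stored_by_reference)  # True
print(error_message)        # No module named 'demo_pipelines'
```

If copying flow.py into the image becomes a burden, cloudpickle 2.0 and later provide cloudpickle.register_pickle_by_value(module) to pickle a module's functions by value instead; whether that interacts cleanly with Prefect's Docker storage is untested here.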
Post-ramble summary: we observed that the file containing our flow definition would be pickled with the flow itself when building the Docker image, but only in certain situations. Does anyone know what might be causing this behavior? We are happy to copy flow.py to the final Docker image, but would like to understand what is going on here.
Tom Shaffner
01/05/2022, 8:26 PM
Louis Eisenberg
01/05/2022, 9:42 PM
electric-albatross) to the app_sync_flow_claim_lock.R script. (The reason is that I want the claim operation to fail only if the lock is already held and the holder is not this run, so the claim script needs to know the name of the current run.)
import datetime

from prefect import Flow
from prefect.tasks.shell import ShellTask

with Flow("App sync") as flow:
    app_claim_lock = ShellTask(
        command="sudo make run-interactive scripts/deploy/app_sync_flow_claim_lock.R",
        task_args=dict(
            name="app sync flow claim lock",
            max_retries=5,
            retry_delay=datetime.timedelta(minutes=10),
        ),
    )
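One way to hand the current run's name to the claim script is an environment variable. As far as I know, in Prefect 1.x the name is available as prefect.context.flow_run_name while a flow run is executing, and ShellTask accepts an env dict that is passed into the command's environment. This sketch leaves Prefect out and calls subprocess directly so it runs on its own; FLOW_RUN_NAME is a hypothetical variable name, and the echo command stands in for the R script, which could read the value with Sys.getenv("FLOW_RUN_NAME").

```python
import os
import subprocess


def run_with_run_name(command: str, run_name: str) -> str:
    """Run a shell command with the run name exposed as FLOW_RUN_NAME.

    FLOW_RUN_NAME is a hypothetical variable name chosen for this sketch.
    """
    # Start from the current environment so PATH and friends are preserved.
    env = {**os.environ, "FLOW_RUN_NAME": run_name}
    completed = subprocess.run(
        command,
        shell=True,
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return completed.stdout.strip()


# The echo stands in for the R script; "electric-albatross" is the run name
# mentioned in the message.
print(run_with_run_name('echo "claiming lock as $FLOW_RUN_NAME"', "electric-albatross"))
# claiming lock as electric-albatross
```

Because the real command goes through sudo, which resets most of the environment by default, the variable may need to be preserved explicitly (for example with sudo --preserve-env=FLOW_RUN_NAME) and forwarded by the make target, or passed on the command line instead.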
Aric Huang
01/05/2022, 10:28 PM
concurrent.futures.ProcessPoolExecutor in a Prefect task. With the following code as test.py, running python3 test.py completes successfully, but with prefect run --path test.py it stays stuck at 0% completion.
from prefect import task, Flow
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed


# Defined at module level so worker processes can look it up when unpickling.
def process(inp):
    return inp


@task
def test():
    print("start")
    with ProcessPoolExecutor(
        max_workers=4,
    ) as pool:
        futures = [pool.submit(process, inp) for inp in range(1000)]
        out = [f.result() for f in tqdm(as_completed(futures), total=1000)]
    print('done')


with Flow("Test") as flow:
    test()

if __name__ == "__main__":
    # Calls the task's function directly, outside of a flow run.
    test.run()
Any ideas what could be happening here?
Piyush Bassi
01/06/2022, 3:24 AM
Thomas Hoeck
01/06/2022, 8:23 AM
StartFlowRun. The subflow runs successfully, but in the parent flow I get the following error:
M. Siddiqui
01/06/2022, 11:35 AM
Shivam Bhatia
01/06/2022, 12:55 PM
Mike Lev
01/06/2022, 1:56 PM
flow_a
import time

from prefect import Flow, task


@task(name='second_task_a', log_stdout=True)
def second_task(mssg: str):
    time.sleep(10)
    return f'{mssg} and second_task_a_done'


@task(name='third_task_a', log_stdout=True, slug="last_task")
def third_task(mssg: str):
    time.sleep(10)
    return f'{mssg} and second_task_a_done'


# first_task is defined elsewhere (not shown in the original message)
with Flow('flow_a') as flow_a:
    first_task_message = first_task()
    second_task_message = second_task(first_task_message)
    third_task_message = third_task(second_task_message)
runtime = KubernetesRun, DaskExecutor, S3Results, S3Storage, using the Prefect Cloud backend
... all flows are registered as expected, including the parent flow.
rilshok
01/06/2022, 2:41 PM
Amber Papillon
01/06/2022, 2:46 PM
Mahesh
01/06/2022, 3:12 PM
Ian Singer
01/06/2022, 3:39 PM
Eric Richard
01/06/2022, 3:45 PM
Carrie Bohinc
01/06/2022, 4:33 PM
Vaibhav Shetye
01/06/2022, 5:55 PM
Jason Noxon
01/06/2022, 6:57 PM
Gui Pires
01/06/2022, 7:24 PM
Isaac Brodsky
01/06/2022, 7:36 PM
RuntimeError: Task <Task pending name='Task-2051' coro=<FutureState.wait() running at /usr/local/lib/python3.8/site-packages/distributed/client.py:482> cb=[WaitIterator._done_callback()]> got Future <Future pending> attached to a different loop
As far as I can tell, I did not change anything about how work was submitted to Dask, so I am wondering whether this is an intermittent issue related to where the Prefect task is running. My Prefect task is wrapped in with worker_client()
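The error text itself comes from asyncio: a Task refuses to await a Future that was created on a different event loop. A minimal standard-library reproduction, independent of Dask and Prefect, shows the mechanism; it cannot tell which two loops are involved in the worker_client() case, and the function and variable names are illustrative only.

```python
import asyncio


async def await_future(future: asyncio.Future):
    # Awaiting inside a Task whose loop differs from the future's loop.
    return await future


loop_a = asyncio.new_event_loop()
loop_b = asyncio.new_event_loop()
try:
    # The future belongs to loop_a...
    foreign_future = loop_a.create_future()
    # ...but the coroutine awaiting it runs as a Task on loop_b.
    loop_b.run_until_complete(await_future(foreign_future))
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)
finally:
    loop_a.close()
    loop_b.close()

# Ends with "... got Future <Future pending> attached to a different loop"
print(error_message)
```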