Alexander Belikov
08/16/2022, 6:19 PM
multiprocessing.Process as
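import multiprocessing as mp

processes = []
for chunk in url_chunks:
    kwargs["mp3_urls"] = chunk
    # Process takes the callable as `target`; it has no `foo` parameter
    processes.append(mp.Process(target=foo, kwargs=kwargs))
for p in processes:
    p.start()
for p in processes:
    p.join()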
and it worked. The main reason for using multiprocessing.Process is to have a large model in memory shared between processes.
In fact, when I run this exact flow with prefect via a python script calling the flow as a function, it works as expected. However, when I run it as prefect deployment run ... or via a scheduled run, the processes started in p.start() don't run. To my surprise, the flow result is still marked as success.
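For context, the "large model shared between processes" idea can be sketched roughly as below. This is only a minimal illustration, assuming a POSIX system and the "fork" start method; `load_model`, `worker`, `run` and the dict standing in for the model are hypothetical names, not taken from the actual flow.

```python
import multiprocessing as mp

# Hypothetical stand-in for the large model; set once in the parent.
MODEL = None


def load_model():
    # Placeholder: a real (expensive) model load would go here.
    return {"weights": list(range(1000))}


def worker(chunk, out_queue):
    # With "fork", the child inherits the parent's memory, so MODEL is
    # already populated here without being reloaded or pickled.
    out_queue.put((chunk, len(MODEL["weights"])))


def run(chunks):
    global MODEL
    MODEL = load_model()  # load before the children are created
    ctx = mp.get_context("fork")  # assumption: POSIX only
    out_queue = ctx.Queue()
    processes = [ctx.Process(target=worker, args=(c, out_queue)) for c in chunks]
    for p in processes:
        p.start()
    # Drain the queue before joining so children are never blocked on put().
    results = [out_queue.get() for _ in processes]
    for p in processes:
        p.join()
    return sorted(results)
```

Each child gets a copy-on-write snapshot, so changes made to MODEL in a child are not visible to the parent. Under the "spawn" start method this sketch would not work as written, because the child re-imports the module and MODEL would still be None there.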
Any ideas?
Anna Geller
08/16/2022, 7:16 PM
Alexander Belikov
08/17/2022, 2:46 PM
import prefect
import multiprocessing as mp
from prefect.task_runners import SequentialTaskRunner


def create_file(fname):
    logger = prefect.get_run_logger()
    logger.info(f"creating file {fname}")
    with open(fname, "w") as f:
        f.write("abc")


def test_mp():
    logger = prefect.get_run_logger()
    fnames = [f"/tmp/test_{j}.mptest" for j in range(5)]
    processes = []
    logger.info(f"submitting{fnames}")
    for fname in fnames:
        kwargs = {"fname": fname}
        processes.append(mp.Process(target=create_file, kwargs=kwargs))
    for p in processes:
        p.start()
    for p in processes:
        p.join()


test_mp_task = prefect.task()(test_mp)


@prefect.flow(name="etl_test", task_runner=SequentialTaskRunner())
def etl_test():
    test_mp_task.submit()


if __name__ == "__main__":
    etl_test()
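One thing the start/join loop above does not do is check whether the children actually succeeded: join() returns normally even if a child crashed, which is one way a run can end up marked as success. A minimal sketch of surfacing this via Process.exitcode follows; `run_and_check`, `ok_worker` and `failing_worker` are hypothetical names for illustration, and this would not by itself explain why the children misbehave under a deployment.

```python
import multiprocessing as mp
import sys


def ok_worker():
    # Child that finishes normally (exit code 0).
    pass


def failing_worker():
    # Child that exits with a non-zero code, simulating a crash.
    sys.exit(3)


def run_and_check(targets, ctx=None):
    # ctx may be a context from mp.get_context(...); defaults to the
    # module itself, which uses the platform's default start method.
    ctx = ctx or mp
    processes = [ctx.Process(target=t) for t in targets]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    # After join(), exitcode is 0 on success and non-zero on failure.
    failed = [p.exitcode for p in processes if p.exitcode != 0]
    if failed:
        raise RuntimeError(
            f"{len(failed)} child process(es) failed, exit codes: {failed}"
        )
    return [p.exitcode for p in processes]
```

Called from inside the task, a raised RuntimeError would make the task fail instead of letting it finish silently.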
I would like to confirm that it does not work properly when scheduled (using prefect deployment apply ...), but unfortunately I ran into the same problem I've experienced before: the scheduled flow does not run; its state changes from Scheduled to Pending. The flow, the agent and the queue all have the same tag. Perhaps I should just wait?
Anna Geller
08/18/2022, 12:18 AM
Alexander Belikov
08/18/2022, 9:26 AM
paused queue continues to submit flows.
Anna Geller
08/18/2022, 10:28 AM
Alexander Belikov
08/19/2022, 8:29 AM
Anna Geller
08/21/2022, 7:21 PM