Alexander Belikov

08/16/2022, 6:19 PM
hi all, I've been transitioning to Prefect 2 (2.0.4 to be precise) from Prefect < 2 (1.2 to be precise). Thanks to your help I solved most of the problems; only one remains: in Prefect < 2 I used parallelism via
multiprocessing.Process
as
processes = []
for chunk in url_chunks:
    kwargs["mp3_urls"] = chunk
    processes.append(mp.Process(target=foo, kwargs=kwargs))
for p in processes:
    p.start()
for p in processes:
    p.join()
and it worked. The main reason for using
multiprocessing.Process
is to have a large model in memory shared between processes. In fact, when I run this exact flow from a Python script that calls the flow as a function, it works as expected. However, when I run it as
prefect deployment run ...
or via a scheduled run, the processes in
p.start()
don't run. The flow result is marked as success to my surprise. Any ideas?
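
One thing that can make a run look successful even when the child processes did no work: an exception raised inside a `multiprocessing.Process` is not re-raised in the parent, and `join()` simply returns. The failure only shows up in `p.exitcode`. Whether that is what happened in this flow is not established in the thread. The sketch below uses only the standard library (no Prefect), and the helper names `run_in_processes` and `failed_indices` are hypothetical, introduced here for illustration.

```python
import multiprocessing as mp
import os
import tempfile


def create_file(fname):
    # Work done in the child process: write a small marker file.
    with open(fname, "w") as f:
        f.write("abc")


def run_in_processes(fnames):
    # Hypothetical helper: start one process per file name, wait for all
    # of them, and return each child's exit code (0 means a clean exit).
    processes = [mp.Process(target=create_file, kwargs={"fname": f}) for f in fnames]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    return [p.exitcode for p in processes]


def failed_indices(exit_codes):
    # Hypothetical helper: positions of children that did not exit cleanly.
    return [i for i, code in enumerate(exit_codes) if code != 0]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        fnames = [os.path.join(tmp, f"test_{j}.mptest") for j in range(5)]
        failed = failed_indices(run_in_processes(fnames))
        if failed:
            # Surface child failures instead of returning silently.
            raise RuntimeError(f"child processes failed: {failed}")
```

Raising in the parent when any exit code is non-zero is one way to make the enclosing task fail visibly rather than report success.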

Anna Geller

08/16/2022, 7:16 PM
Interesting. Could you share a full flow example we could reproduce and troubleshoot? This should work, if it doesn't it's a great candidate for a GitHub issue.

Alexander Belikov

08/17/2022, 2:46 PM
here's the minimal script :
import prefect
import multiprocessing as mp

from prefect.task_runners import SequentialTaskRunner


def create_file(fname):
    logger = prefect.get_run_logger()
    logger.info(f"creating file {fname}")
    with open(fname, "w") as f:
        f.write("abc")


def test_mp():
    logger = prefect.get_run_logger()
    fnames = [f"/tmp/test_{j}.mptest" for j in range(5)]
    processes = []
    logger.info(f"submitting{fnames}")
    for fname in fnames:
        kwargs = {"fname": fname}
        processes.append(mp.Process(target=create_file, kwargs=kwargs))
    for p in processes:
        p.start()
    for p in processes:
        p.join()


test_mp_task = prefect.task()(test_mp)


@prefect.flow(name="etl_test", task_runner=SequentialTaskRunner())
def etl_test():
    test_mp_task.submit()


if __name__ == "__main__":
    etl_test()
I would like to confirm that it does not work properly when scheduled (using
prefect deployment apply ...
) but unfortunately I ran into the same problem I've experienced before: the scheduled flow does not run, and its state changes from
Scheduled
to
Pending
. The flow, the agent and the queue all have the same tag. Perhaps I should just wait?

Anna Geller

08/18/2022, 12:18 AM
did you start an agent? I'd encourage you to start a new Conda environment and follow this updated deployment documentation https://docs.prefect.io/concepts/deployments/
many things got easier, perhaps installing the latest version 2.1 will make the process easier for you

Alexander Belikov

08/18/2022, 9:26 AM
yes, I did start an agent. I've also observed that a
paused
queue continues to submit flows.

Anna Geller

08/18/2022, 10:28 AM
would you want to build a minimal reproducible example and submit a GitHub issue then?

Alexander Belikov

08/19/2022, 8:29 AM
will do, still trying to confirm after the transition to 2.1
ok, the toy example works on 2.1

Anna Geller

08/21/2022, 7:21 PM
thanks for the update!