# prefect-community
a
hi all, I've been transitioning to Prefect 2 (2.0.4 to be precise) from Prefect < 2 (1.2 to be precise). Thanks to your help I solved most of the problems; only one remains: in Prefect < 2 I used parallelism via
multiprocessing.Process
as
processes = []
for chunk in url_chunks:
    kwargs["mp3_urls"] = chunk
    # mp.Process takes the callable to run as `target`
    processes.append(mp.Process(target=foo, kwargs=kwargs))
for p in processes:
    p.start()
for p in processes:
    p.join()
and it worked. The main reason for using
multiprocessing.Process
is to have a large model in memory shared between processes. In fact, when I run this exact flow with Prefect from a Python script that calls the flow as a function, it works as expected. However, when I run it as
prefect deployment run ...
or via a scheduled run, the processes in
p.start()
don't run. To my surprise, the flow run is still marked as successful. Any ideas?
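For readers following along, here is a minimal sketch of the "large model shared between processes" pattern outside Prefect. It assumes a Unix host where the `fork` start method is available; `MODEL`, `worker` and `run_chunks` are hypothetical names standing in for the real model and `foo`, which the thread does not show.

```python
import multiprocessing as mp

# Hypothetical stand-in for the large model; loaded once in the parent.
MODEL = {"weights": list(range(1000))}


def worker(chunk, results):
    # Under the "fork" start method the child inherits the parent's memory,
    # so MODEL is available here without being reloaded or pickled.
    results.put((chunk, sum(MODEL["weights"]) + chunk))


def run_chunks(chunks):
    ctx = mp.get_context("fork")  # assumption: Unix only, not available on Windows
    results = ctx.Queue()
    processes = [ctx.Process(target=worker, args=(c, results)) for c in chunks]
    for p in processes:
        p.start()
    # Read one result per child before joining; the timeout keeps the parent
    # from hanging forever if a child dies before reporting.
    outputs = dict(results.get(timeout=30) for _ in processes)
    for p in processes:
        p.join()
    return outputs
```

With the `spawn` start method each child would re-import the module and build its own copy of `MODEL`, so the start method in effect where the flow runs matters for this pattern.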
a
Interesting. Could you share a full flow example we could reproduce and troubleshoot? This should work; if it doesn't, it's a great candidate for a GitHub issue.
a
here's the minimal script:
import prefect
import multiprocessing as mp

from prefect.task_runners import SequentialTaskRunner


def create_file(fname):
    logger = prefect.get_run_logger()
    logger.info(f"creating file {fname}")
    with open(fname, "w") as f:
        f.write("abc")


def test_mp():
    logger = prefect.get_run_logger()
    fnames = [f"/tmp/test_{j}.mptest" for j in range(5)]
    processes = []
    logger.info(f"submitting {fnames}")
    for fname in fnames:
        kwargs = {"fname": fname}
        processes.append(mp.Process(target=create_file, kwargs=kwargs))
    for p in processes:
        p.start()
    for p in processes:
        p.join()


test_mp_task = prefect.task()(test_mp)


@prefect.flow(name="etl_test", task_runner=SequentialTaskRunner())
def etl_test():
    test_mp_task.submit()


if __name__ == "__main__":
    etl_test()
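Since the flow was reported as successful even though the child processes did nothing, one option is to check the outcome explicitly after `join()`. The sketch below is Prefect-free and assumes a Unix host with the `fork` start method; `run_and_check` is a hypothetical helper, not part of the script above. It raises if any child exits with a non-zero code or an expected file is missing, so calling it from inside the task would turn a silent failure into an exception.

```python
import multiprocessing as mp
import os
import tempfile


def create_file(fname):
    with open(fname, "w") as f:
        f.write("abc")


def run_and_check(fnames):
    ctx = mp.get_context("fork")  # assumption: Unix host
    processes = [
        ctx.Process(target=create_file, kwargs={"fname": fname})
        for fname in fnames
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    # join() returns normally even when the child crashed, so inspect exit codes.
    bad = {p.name: p.exitcode for p in processes if p.exitcode != 0}
    missing = [f for f in fnames if not os.path.exists(f)]
    if bad or missing:
        raise RuntimeError(f"child failures: {bad}, missing files: {missing}")
    return fnames


if __name__ == "__main__":
    tmp_dir = tempfile.mkdtemp()
    print(run_and_check([os.path.join(tmp_dir, f"test_{j}.mptest") for j in range(5)]))
```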
I would like to confirm that it does not work properly when scheduled (using
prefect deployment apply ...
) but unfortunately I ran into the same problem I've experienced before: the scheduled flow does not run, and its state changes from
Scheduled
to
Pending
. The flow, the agent and the queue all have the same tag. Perhaps I should just wait?
a
did you start an agent? I'd encourage you to start a new Conda environment and follow this updated deployment documentation https://docs.prefect.io/concepts/deployments/
many things got easier, perhaps installing the latest version 2.1 will make the process easier for you
a
yes, I did start an agent. I've also observed that a
paused
queue continues to submit flows.
a
would you want to build a minimal reproducible example and submit a GitHub issue then?
a
will do, still trying to confirm after the transition to 2.1
ok, the toy example works on 2.1
a
thanks for the update!