Apoorva Desai
03/01/2022, 9:09 AM
prefect run --watch --id flowId --param strparam="paramValue". Then, in the flow.py file, I initialize the parameter outside the flow with strparam = Parameter("strparam", default="hello"), but I'm unable to access "paramValue". Prefect only sees <Parameter: strparam>.
Sen
03/03/2022, 5:06 AM
[2022-03-03 05:10:35,310] INFO - TranslationEvaluator Agent | Registering agent...
[2022-03-03 05:10:36,108] INFO - TranslationEvaluator Agent | Registration successful!
[2022-03-03 05:10:36,498] INFO - TranslationEvaluator Agent | Starting DockerAgent with labels ['On_Prem_Agent', 'IP:<>']
[2022-03-03 05:10:36,498] INFO - TranslationEvaluator Agent | Agent documentation can be found at <https://docs.prefect.io/orchestration/>
[2022-03-03 05:10:36,498] INFO - TranslationEvaluator Agent | Waiting for flow runs...
I then created a flow and registered it as a Docker flow. But when I try to run the flow from the Prefect UI, the flow starts running and I see the lines below in the DockerAgent log:
[2022-03-03 05:20:32,889] INFO - TranslationEvaluator Agent | Deploying flow run 9cf17c39-877c-49e7-aae4-0451e320bae4 to execution environment...
/usr/local/lib/python3.7/dist-packages/prefect/agent/docker/agent.py:151: UserWarning: DockerAgent `docker_interface` argument is deprecated and will be removed from Prefect. Setting it has no effect.
UserWarning,
[2022-03-03 05:20:33,579] INFO - TranslationEvaluator Agent | Completed deployment of flow run 9cf17c39-877c-49e7-aae4-0451e320bae4
[2022-03-03 05:20:34+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'On_Prem_Agent'
But it fails because some libraries needed by the logic in one of the flow's tasks are missing.
I am under the impression that the agent I created needs to have all the libraries installed, and that the flow just runs on the agent using them.
What is the right way to do this so that I can get this working?
Thanks,
Sen
Antti Tupamäki
03/09/2022, 12:52 PM
Kevin Kho
03/16/2022, 2:04 PM
Andrea Nerla
03/17/2022, 11:24 AM
Mike Grabbe
03/17/2022, 3:29 PM
Ramzi A
03/24/2022, 3:02 PM
Royzac
03/24/2022, 6:17 PM
Abhishek
03/25/2022, 10:30 AM
Kevin Kho
03/28/2022, 2:28 PM
Martin T
03/28/2022, 3:27 PM
Andrew Black
03/30/2022, 3:32 PM
Royzac
03/30/2022, 7:36 PM
Royzac
04/01/2022, 1:56 PM
Tim-Oliver
04/04/2022, 2:39 PM
Patrick Alves
04/06/2022, 1:03 PM
ModuleNotFoundError: No module named '/root/'
(traceback is in the thread)
If I access the agent container and run any flow manually (python workflows/ad_check/flow.py), it works!
Can someone help me?
Tim-Oliver
04/06/2022, 2:07 PM
DaskTaskRunner. Somehow it gets stuck on the last task and will not finish.
This is my minimal example:
from prefect import task, flow
from prefect.tasks import task_input_hash
import time
from prefect.task_runners import SequentialTaskRunner, DaskTaskRunner


@task
def list_files():
    return ["1.txt", "2.txt"]


@task
def preprocess(in_file: str, target_dir: str):
    time.sleep(1)
    return f"{target_dir}-{in_file}"


@task
def segment(in_file: str, target_dir: str):
    return f"{target_dir}-{in_file}"


@task
def mask_bg(in_file: str, mask: str, target_dir: str):
    return f"{target_dir}-mask-{in_file}"


@flow(name="Background Masking", task_runner=DaskTaskRunner())
def background_masking():
    files = list_files()
    preprocessed = []
    segmented = []
    masked = []
    for f in files.wait().result():
        preprocessed.append(preprocess(f, target_dir="/preprocessed/"))
        segmented.append(segment(preprocessed[-1], target_dir="/segmented/"))
        masked.append(mask_bg(preprocessed[-1], segmented[-1], target_dir="/masked/"))


if __name__ == "__main__":
    result = background_masking()
If I run this locally (no remote file storage) with the normal SQLite database, the processing does not finish. With SequentialTaskRunner or ConcurrentTaskRunner, the script completes.
Ievgenii Martynenko
04/08/2022, 9:53 AM
Gaurav kumar
04/14/2022, 7:43 AM
Tim Wright
04/19/2022, 9:16 PM
dict_result = some_task()
some_other_task(**dict_result)
Is this doable? Is there a more Prefect-y way to accomplish this without creating a Parameter for each element of the dictionary? I ultimately want to pass a list of dictionaries as an input, which I can then operate over using Prefect's mapping, so being able to define the collection of arguments together is important.
Jon Ruhnke
04/21/2022, 4:36 PM
Kevin Kho
04/21/2022, 11:01 PM
Anna Geller
04/22/2022, 4:54 PM
Laurie Hindes
04/22/2022, 4:57 PM
Kyle McChesney
04/22/2022, 5:08 PM
Jeremiah
04/22/2022, 5:33 PM
Kevin Mullins
04/22/2022, 8:40 PM
George Coyne
04/22/2022, 9:34 PM
Gustavo Puma
04/26/2022, 2:01 PM
Noah Holm
04/28/2022, 6:54 AM
Noah Holm
04/28/2022, 6:54 AM
Anna Geller
04/28/2022, 11:09 AM
Jeremiah
04/28/2022, 1:23 PM