• d

    Daniel Davee

    1 year ago
    Does the agent have to be running on the cluster?
  • Dean Magee

    Dean Magee

    1 year ago
    Hi There, Im trying to use a state handler to send an error message to MS Teams when a task fails...
    def alert_failed(obj, old_state, new_state):
    if new_state.is_failed():
    myTeamsMessage = pymsteams.connectorcard(os.getenv("MSTEAMS_WEBHOOK_URL"))
    myTeamsMessage.title("An error has occurred!")
    Im trying to grab the text that Python spits out as an error message and include that in my MS Teams message. Any idea how to do that?
    Dean Magee
    1 replies
    Copy to Clipboard
  • j

    jaehoon

    1 year ago
    i wanna pass result of task to flow as parameter, but error ocurred like this flow_run_id = client.create_flow_run( File "c:\users\krims\appdata\local\pypoetry\cache\virtualenvs\data-pipeline-i-bq0ir6-py3.9\lib\site-packages\prefect\client\client.py", line 1108, in create_flow_run res = self.graphql(create_mutation, variables=dict(input=inputs)) File "c:\users\krims\appdata\local\pypoetry\cache\virtualenvs\data-pipeline-i-bq0ir6-py3.9\lib\site-packages\prefect\client\client.py", line 302, in graphql params=dict(query=parse_graphql(query), variables=json.dumps(variables)), File "C:\Users\krims\AppData\Local\Programs\Python\Python39\lib\json_init_.py", line 231, in dumps return _default_encoder.encode(obj) File "C:\Users\krims\AppData\Local\Programs\Python\Python39\lib\json\encoder.py", line 199, in encode chunks = self.iterencode(o, _one_shot=True) File "C:\Users\krims\AppData\Local\Programs\Python\Python39\lib\json\encoder.py", line 257, in iterencode return _iterencode(o, 0) File "C:\Users\krims\AppData\Local\Programs\Python\Python39\lib\json\encoder.py", line 179, in default raise TypeError(f'Object of type {o.class.name} ' TypeError: Object of type TaskMetaclass is not JSON serializable help me!
    j
    Kevin Kho
    7 replies
    Copy to Clipboard
  • r

    Romain

    1 year ago
    Hi folks, Have an issue with running a flow using a Kubernetes Agent, and a Module storage. So here is what I do. The agent docker image is based on prefecthq/prefect:0.14.19, and in my dockerfile, I have the following folder hierarchy:
    /A/
      B/
         flows/
             my_flow.py
    in my_flow.py, imagine something like that:
    def get_flow():
        with Flow('my_flow', storage=Module("B.flows.my_flow:get_flow")) as flow:
            ....
        return flow
    In the dockerfile, I ensure that the
    PYTHONPATH
    env var holds the folder
    A
    so that I can import
    <http://B.flows.my|B.flows.my>_flow
    setting this at the end of the DockerFile:
    ENV PYTHONPATH "${PYTHONPATH}:/A"
    After that I register this flow with a
    KubernetesRun
    and a DaskExecutor:
    flow.run_config = KubernetesRun()
    flow.executor = DaskExecutor()
    flow.register(project_name='my_project',
                  idempotency_key=flow.serialized_hash())
    Then from a prefect server, I trigger the flow run, but I get the following error:
    Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'A'")
    I don't really get why my module is not found. I have tested a local deployment using docker-compose with a local agent (running in the compose stack), and it was working fine. So I am missing something here. Any ideas?
    r
    Kevin Kho
    4 replies
    Copy to Clipboard
  • d

    Daniel Davee

    1 year ago
    I'm being told by my Kubernetes engineer that he can not pull
    image: prefecthq/prefect:latest
    on to GKE, can prefect be ran on GKE
    d
    Kevin Kho
    +1
    39 replies
    Copy to Clipboard
  • d

    David Glaister

    1 year ago
    Hi, I'm trying create a local Prefect system to present to my employer. Using a Windows 10 VM I can get things working without issue. To create the demo I need to interact with other systems on the company network... which is where the problems start. The web UI only shows buttons, which do not respond. These are the steps I take to reach this point: • Start Docker • Cmd prefect backend server • Cmd prefect server start • Browser (Edge & Chrome) localhost:8080 The debug from the browser gives the error: Uncaught (in promise) Error: passed invalid or empty tenant object My company laptop connects uses a proxy, I'm not sure if this is related to the issue
    d
    Kevin Kho
    4 replies
    Copy to Clipboard
  • Austin Mackillop

    Austin Mackillop

    1 year ago
    Hello, is there a way to have the prefect agent execute a flow in a single threaded manor? I converted an existing ml model training script that makes use of some datatypes that cannot be pickled. Returning these types from tasks works fine when executing the flow locally in a single thread but using the DaskExecutor or running the flow on the agent causes the flow to fail.
    Austin Mackillop
    Kevin Kho
    8 replies
    Copy to Clipboard
  • Peter Roelants

    Peter Roelants

    1 year ago
    Hi Prefect, Is there a way to limit parallel flows in Prefect Server? For example I have two flows: • flow_B Resource intensive, long-running flow that is run as a dependent flow for flow_A flow_A : ◦ Flow that is scheduled to run every
    t
    minutes ◦ Calls flow_B
    x
    times using
    StartFlowRun
    ◦ Run with
    LocalDaskExecutor
    to limit parallelism to max
    y
    runs at a time. For one scheduled run, the dependent flow flow_B is limited to run max
    y
    at a time. Now, I noticed that for a single run of flow_A, flow_B is indeed limited to a parallelism of
    y
    runs at one time. However, when a previous flow_A (and dependent flow_Bs) are still running), and a new flow_A, with new dependent flow_Bs are scheduled than more than
    y
    flow_Bs can run at the same time. For example with parallelism y=2: • At time 1 there will be 2 flow_Bs runing.
    Run flow_A 1:
      |--dependent flow_B 1: running
      |--dependent flow_B 2: running
      |--dependent flow_B 3: waiting
    • At time 2 there will be 4 flow_Bs runing.
    Running flow_A-1:
      |--dependent flow_B-1: finished
      |--dependent flow_B-2: running
      |--dependent flow_B-3: running
    
    Run flow_A-2:
      |--dependent flow_B-4: running
      |--dependent flow_B-5: running
      |--dependent flow_B-6: waiting
    Is it possible to limit the parallelism of flow_B to max
    y
    over all scheduled runs?
    Peter Roelants
    Kevin Kho
    5 replies
    Copy to Clipboard
  • j

    Jeff Williams

    1 year ago
    Hello all. I have an odd situation that I can't seem to figure out. The objective is to capture information about flow and task runs and save them off to some "external" storage. The environment is ultimately going to be on a Google Cloud instance, but I am not there yet. I have worked through things locally, meaning I have my test script written to run locally. I am taking the next step, which is to register the flow with a local agent (local infrastructure - no problems there) and then run it on a local agent. All of that seems to work fine, for the most part. I am using a custom terminal state handler to capture the final states of the tasks and flow that has just finished executing. The specific issue I am facing right now is that the state.result dictionary is populated when run locally but it is not when it is run using a local agent. Specifically, if I do
    len(state.result.keys())
    I get a non-zero value locally but get zero when run on the agent. Any ideas as to why?
    j
    Kevin Kho
    +1
    11 replies
    Copy to Clipboard
  • Mariusz Olszewski

    Mariusz Olszewski

    1 year ago
    hi, is there a chance to set termination of task/ flow in some X hours time?
    Mariusz Olszewski
    Kevin Kho
    2 replies
    Copy to Clipboard