• Hilary Roberts

    Hilary Roberts

    1 year ago
    Why are our flow limited to a concurrency of 2 tasks? We are using prefect cloud with a Kubernetes Agent running 2 workers. For some reason it only runs 2 tasks concurrently. I have tried setting the executor to DaskExecutor() and LocalDaskExecutor(). The behaviour stays the same The tasks I am running in this example are just waiting for 10 seconds, so resource constraints can’t really be the issue. I assume it’s limiting it to 1 concurrent task per Kubernetes worker node, but haven’t been able to confirm that. Anyone know any possible reasons for this?
    Hilary Roberts
    Kevin Kho
    10 replies
    Copy to Clipboard
  • Pedro Machado

    Pedro Machado

    1 year ago
    Hey there. We'd like to preserve all the prefect logs as files in a file system. How can we get all prefect logs to be written to files as well? Thanks!
    Pedro Machado
    Kevin Kho
    4 replies
    Copy to Clipboard
  • d

    Dan Zhao

    1 year ago
    Hi all, a stupid question. I was deploying a Prefect Server on a remote machine - are there any settings needed to be able to view the UI from the local machine?
    d
    Kyle McChesney
    +1
    4 replies
    Copy to Clipboard
  • Anze Kravanja

    Anze Kravanja

    1 year ago
    Hi all, I have a quick question about state handlers. I’ve created one for the use case if the flow ends up in a failed state, I want to grab all the error details from underlying tasks and send an email. What I’m doing when testing locally is:
    for t in flow.tasks:
            tr = state.result.get(t, None)
            if not tr: continue
            <http://flow.logger.info|flow.logger.info>(f"Task name '{t.name}' -> State '{tr.message}' -> Is failed: {tr.is_failed()}")
            if tr.is_failed():
                err_params['tasks'].append(OrderedDict({
                    'taskName': t.name,
                    'errorMessage': tr.message,
                    'errorParams': tr.result if isinstance(tr.result, dict) else str(tr.result)
                }))
    Basically just going through all the tasks and checking if they are is_failed is true, if so I am grabbing some info. This all works as intended locally, but when I package my flows in a docker and run with docker agent, it turns out state.result is an empty dictionary. While previously locally, I found each tasks result there. I’ve played with GCSResult and just leaving it to default but in both cases while running in docker the state.result={}. Any ideas what I might be doing wrong?
    Anze Kravanja
    Kevin Kho
    +1
    23 replies
    Copy to Clipboard
  • Kyle McChesney

    Kyle McChesney

    1 year ago
    I am trying to run a flow via an ECS agent. The agent is up and running, but when I trigger a flow I get the following error:
    Parameter validation failed:
    Unknown parameter in input: "null", must be one of: capacityProviderStrategy, cluster, count, enableECSManagedTags, enableExecuteCommand, group, launchType, networkConfiguration, overrides, placementConstraints, placementStrategy, platformVersion, propagateTags, referenceId, startedBy, tags, taskDefinition
    This is coming from the boto3 call triggered here: https://github.com/PrefectHQ/prefect/blob/6b59d989dec33aad8c62ea2476fee519c32f5c63/src/prefect/agent/ecs/agent.py#L320 My agent is being started like so (with prefecthq/prefect:0.14.13-python3.8, maybe this is old?):
    prefect agent ecs start --run-task-kwargs s3://$bucket/run_task_kwargs.yml -a <https://my-api:4200/graphql>
    My run task kwargs yaml has
    cluster
    ,
    launchType
    and
    networkConfiguration
    . I also attempted to provide values in the run config via the UI, but it did not seem to change much (added CPU, memory, task and exec role). Any help is much appreciated!
    Kyle McChesney
    Kevin Kho
    24 replies
    Copy to Clipboard
  • chicago-joe

    chicago-joe

    1 year ago
    hey all, quick question, is it possible to configure task output names for tasks with nout > 1 ?
    chicago-joe
    Kevin Kho
    6 replies
    Copy to Clipboard
  • Tim Enders

    Tim Enders

    1 year ago
    How do I deal with the following error?
    [2021-07-29 12:41:09-0500] INFO - prefect.LocalDaskExecutor | Attempting to interrupt and cancel all running tasks...
    Tim Enders
    Kevin Kho
    23 replies
    Copy to Clipboard
  • p

    Philip MacMenamin

    1 year ago
    question about memory management - I have a flow along the lines of:
    with Flow() as f:
       big_obj = gen_obj1()
       big_obj2 = gen_obj2(big_obj) #never need to use big_obj again, want to reclaim memory
       big_obj3 = gen_obj3(big_obj2)    # finished with big_obj2
       ...etc
    what's the canonical way of freeing up these objects within the flow, once they've been consumed and are no longer needed?
    p
    Kevin Kho
    4 replies
    Copy to Clipboard
  • YD

    YD

    1 year ago
    Scheduling issue... Running basic task
    hello_world.py
    the Prefect.io server runs on a VM on AWS, with python 3.6 with latest prefect python package agent is running: agent LocalAgent LAST QUERY11:19am | 1 seconds ago CORE VERSION 0.15.3 when running
    hello_world.py
    using python3.6, the schedule does not work when running using python3.8, the schedule works, but I get errors Why does the schedule does not work with the python3.6 ? (It used to work.. not sure what changed) I tried pip install prefect --upgrade (both on laptop and VM) killed the old agent (on lap top) prefect backend server (on laptop) prefect agent local start --label "<label>" (on laptop) this did not help
    YD
    Kevin Kho
    35 replies
    Copy to Clipboard
  • Leon Kozlowski

    Leon Kozlowski

    1 year ago
    What is the typical prefect version upgrade strategy for a cloud KubernetsAgent?
    Leon Kozlowski
    Kevin Kho
    2 replies
    Copy to Clipboard