prefect-community

    Marcos San Miguel

    11/02/2021, 9:41 AM
    Hello, I'm trying to use the Prefect Nomad agent instead of the k8s agent. To do that I added the NomadAgent class inside Prefect/src/prefect/agent/nomad and changed a few things in the code to make it work. I copied the class from the commit I found here: https://github.com/PrefectHQ/prefect/pull/1341/commits/adcb3edad21d36a95d0d9c19041b5774e995b1ae. I made it work and I can now run Prefect flows as Nomad jobs. However, although I can see in the flow logs that the tasks are running and the flow state changes to Success, the UI shows the state stuck in Submitted (it also appears as Submitted via the CLI), and the flow is continuously rescheduled by the Lazarus process (I already tried disabling Lazarus, but the flow state doesn't change to Success). In the attached documents you can see the flow logs and the flow in the UI. Thank you so much for your help!
    logs.json

    Klemen Strojan

    11/02/2021, 11:14 AM
    Hey all - has anybody had any problems with inviting new team members to Prefect Cloud? Our users don't receive anything when invited; we tested with different email addresses on different domains. I would appreciate any advice.

    Ondřej Melichar

    11/02/2021, 11:32 AM
    If I shut down Prefect Server and then start it up again, the previously running agents stop responding. Do I have to start the agents up again after each restart of Prefect Server?

    Billy McMonagle

    11/02/2021, 2:28 PM
    If I wanted to log flow run information in a custom way, is there a built-in or supported way to do this?
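One option, assuming a flow-level state handler fits the use case (a minimal sketch; the handler body is a placeholder for whatever custom sink you have in mind):

```python
import prefect
from prefect import Flow, task

def log_flow_run(flow, old_state, new_state):
    # Called on every state transition of the flow run; prefect.context
    # carries run metadata (and the run logger) while the run is executing.
    logger = prefect.context.get("logger")
    flow_run_id = prefect.context.get("flow_run_id")
    logger.info("Flow run %s moved from %s to %s", flow_run_id, old_state, new_state)
    return new_state

@task
def do_work():
    return 42

with Flow("custom-logging-sketch", state_handlers=[log_flow_run]) as flow:
    do_work()
```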

    haf

    11/02/2021, 2:32 PM
    I thought I found a problem, I had not. Sorry. 🙂

    Benjamin Bonhomme

    11/02/2021, 3:20 PM
    Hey guys, I have a main flow that runs dependent flows, defined as attached in the file. All the flows are actually legacy Java pipelines running in containers on ECS Fargate, so each task defined in this graph triggers a job on ECS Fargate with all dependencies packaged within its container. The main flow runs fine, but execution is serialized; the point is to run the DAG in parallel, since each task is an independent ECS task. Is there something I am missing to execute the tasks/flows in parallel, leveraging ECS?
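For reference, independent tasks only run concurrently when the flow's executor supports it; the default `LocalExecutor` runs them one at a time even when the real work happens on ECS. A minimal sketch with placeholder job names:

```python
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def run_ecs_job(job_name: str):
    # Placeholder for whatever boto3 / ECS call each legacy pipeline uses.
    print(f"triggering ECS Fargate job {job_name}")

with Flow("main-flow") as flow:
    a = run_ecs_job("extract")
    b = run_ecs_job("transform")                  # independent of `a`
    c = run_ecs_job("load", upstream_tasks=[a, b])

# With the default LocalExecutor, a and b would run one after the other even
# though they are independent; a Dask-based executor lets them run in parallel.
flow.executor = LocalDaskExecutor(scheduler="threads")
```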

    Vamsi Reddy

    11/02/2021, 4:29 PM
    Hi everyone, is it possible to share variables between a task and its state handlers? I want to send Slack notifications based on my config parameter; the config is a dict that I want to use inside the state handler's post_to_slack function to send custom notifications.
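One common workaround (a sketch with a made-up config; the Slack call itself is left as a comment) is to bind the config into the handler with `functools.partial`, since state handlers are always called with just `(task, old_state, new_state)`:

```python
from functools import partial

import prefect
from prefect import Flow, task

def post_to_slack(task, old_state, new_state, config=None):
    # `config` is bound below via functools.partial, so the handler can read it.
    if new_state.is_failed():
        message = f"{task.name} failed; alerting {config['slack_channel']}"
        prefect.context.get("logger").info(message)  # replace with your Slack client call
    return new_state

config = {"slack_channel": "#alerts"}  # hypothetical config dict

@task(state_handlers=[partial(post_to_slack, config=config)])
def my_task():
    raise ValueError("boom")

with Flow("slack-handler-sketch") as flow:
    my_task()
```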

    ale

    11/02/2021, 5:27 PM
    Hi folks 🙂 We are using ECSRun + Docker storage. We'd like to provide the task definition ARN, but we get the following error:
    Cannot provide `task_definition_arn` when using `Docker` storage
    Looking at the code at https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/ecs/agent.py it seems to be a design choice. What are the alternatives for running a flow on ECS while providing the task definition ARN?
    👀 1
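One alternative, assuming a storage change is acceptable: keep `ECSRun` but use a non-Docker storage class (S3 shown purely as an example) so the run config can carry an explicit task definition ARN. The bucket and ARN below are placeholders:

```python
from prefect import Flow, task
from prefect.run_configs import ECSRun
from prefect.storage import S3

@task
def hello():
    print("hello from ECS")

with Flow("ecs-arn-sketch") as flow:
    hello()

flow.storage = S3(bucket="my-prefect-flows")  # placeholder bucket
flow.run_config = ECSRun(
    # placeholder ARN; accepted because the flow is no longer baked into a Docker image
    task_definition_arn="arn:aws:ecs:eu-west-1:123456789012:task-definition/my-flow:1",
)
```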

    Greg Adams

    11/02/2021, 7:23 PM
    Question: I'm setting my flow to use the LocalDaskExecutor and deploying to a Docker agent. When I use a Docker agent on my laptop it parallelizes just fine, but when I try to run on a VM (16 CPUs, Google Compute Engine) it only uses a single core and executes one task at a time. Not sure if I'm setting it up wrong; any ideas what might be happening?
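One thing worth checking: the executor is read from the flow object loaded from storage at run time, so it has to be set in the code that gets built into storage, and being explicit avoids relying on per-machine defaults. A sketch (the worker count is illustrative):

```python
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def crunch(n):
    return n * n

with Flow("parallel-sketch") as flow:
    crunch.map(list(range(100)))

# Set the executor in the same script that is stored/registered; pin the
# scheduler and worker count rather than relying on defaults.
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=16)
```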

    Will

    11/02/2021, 8:08 PM
    Hi all, I'm using the ECS Agent and running the tasks using the [ECS run config](https://docs.prefect.io/api/latest/run_configs.html#ecsrun). I'm passing a custom task definition to the run configuration. I've realised that the execution role is picked up from the custom task definition (this does not match the behaviour described in the docs, by the way; they specify that "If not provided, the default on the agent will be used (if configured)."). This does not happen for the task role, which is instead passed from the agent. This behaviour is unexpected; I would have assumed that when passing a custom task definition, the roles I have defined (task role and execution role) would not be overridden by Prefect. So I've now modified the naming convention for the task role for a particular flow, to test passing it via `task_role_arn (str, optional)`: "The name or full ARN for the IAM role to use for this task. If not provided, the default on the agent will be used (if configured)." The role I have passed has full S3 permissions:
    "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Action": "s3:*",
                "Resource": "*"
            }
        ]
    }
    The flow itself uses the [S3Upload](https://docs.prefect.io/api/latest/tasks/aws.html#s3upload) task. It fails with:
    `Error uploading to S3: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied`
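For reference while testing, a sketch of setting both roles explicitly on the run config alongside a custom task definition (all ARNs and paths are placeholders; whether the agent still overrides them is exactly the behaviour in question here):

```python
from prefect.run_configs import ECSRun

# Placeholder values throughout.
run_config = ECSRun(
    task_definition_path="s3://my-bucket/flow-task-definition.yaml",
    task_role_arn="arn:aws:iam::123456789012:role/my-flow-task-role",
    execution_role_arn="arn:aws:iam::123456789012:role/my-flow-execution-role",
)
```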

    Jacob Bedard

    11/03/2021, 4:52 AM
    Sorry if this has already been answered - I just couldn't find it in the docs - perhaps I've just had a really long day and can't think of the right search terms. What's the best practice for forcing a failure if a task or flow doesn't pass a custom validation I've written? For example, if the rows loaded into a destination don't match the row count that was supposed to be loaded. Stuff like that happens with Salesforce all the time, and their callbacks aren't very flexible, so I like to check whether the rows loaded are what I expected.
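One common approach in Prefect 1.x is to raise a `FAIL` signal (any uncaught exception also fails the task) when the validation does not hold; a minimal sketch:

```python
from prefect import task
from prefect.engine import signals

@task
def validate_row_count(loaded: int, expected: int) -> int:
    # Raising FAIL marks this task (and, with default triggers, the flow run) as Failed.
    if loaded != expected:
        raise signals.FAIL(f"Loaded {loaded} rows but expected {expected}")
    return loaded
```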

    Freddie

    11/03/2021, 10:36 AM
    Hi all, we're running Prefect on Heroku. We've got some flows which share state between tasks. Namely, the first task partitions a set of IDs into those which are still valid and those which are not (by calling out to a third-party API). The second task then processes the invalid ones to determine what has happened to them. The first task returns a tuple of sets of ints; there are at most 1M ints between the two sets. At the moment it's taking multiple hours for the first task to return and be marked completed, and I wondered if anyone had any advice on how to deal with this and whether this is expected behaviour?
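If the time is going into Prefect persisting/serialising the large result rather than into the API calls themselves (an assumption worth confirming in the logs), one thing to try is disabling checkpointing for that task, at the cost of not being able to restart downstream tasks from its result:

```python
from prefect import task

@task(checkpoint=False)  # skip writing the ~1M-int result to the configured Result backend
def partition_ids(ids, is_valid):
    valid = {i for i in ids if is_valid(i)}
    return valid, set(ids) - valid
```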

    Architha Rao

    11/03/2021, 11:35 AM
    Hi all. I am completely new to Prefect and I am facing an issue. I have a Prefect Docker agent deployed. A flow attached to it is triggered whenever an input is passed, and the flow run is normally set to the Running state immediately. It had been working perfectly fine until yesterday, when I noticed that flow runs stay in the Scheduled state indefinitely. So every new input keeps adding new flow runs, but all of them remain scheduled forever. Any inputs would be helpful. Thanks in advance.
    👀 1

    Brett Naul

    11/03/2021, 12:48 PM
    been seeing this sporadically over the last 24 hours (maybe 1 in 10 flows?), any idea what might be up? I don't see anything about `edge_on_conflict` in the `prefect` or `server` repos
      File "/src/util/prefect.py", line 575, in register_flow
        flow_id = client.register(flow, project_name, idempotency_key=flow.serialized_hash())
      File "/usr/local/lib/python3.9/site-packages/prefect/client/client.py", line 1227, in register
        self.graphql(
      File "/usr/local/lib/python3.9/site-packages/prefect/client/client.py", line 569, in graphql
        raise ClientError(result["errors"])
    prefect.exceptions.ClientError: [{'path': ['register_edges'], 'message': '[{\'extensions\': {\'path\': \'$.variableValues\', \'code\': \'validation-failed\'}, \'message\': "no such type exists in the schema: \'edge_on_conflict\'"}]', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]

    Lukas N.

    11/03/2021, 1:23 PM
    Hello, we're observing an issue with Prefect Artifacts and mapped tasks after update to 0.15.6, any ideas how to resolve 🙏 ?

    Thomas Nyegaard-Signori

    11/03/2021, 1:51 PM
    Hello community, I am chasing an intermittent bug, possibly related more to Kubernetes than to Prefect. Currently, a lot of our Prefect flows map out to several k8s jobs, do some moderately heavy computation for ~1 hour, and then shut down. Our load is very bursty, so we might scale from 1-2 nodes to ~50 and back down to 1-2 nodes within the span of an hour. We are using Azure Kubernetes Service, and I have tried being quite aggressive with the k8s autoscaler, setting `scale-down-delay-after-add` to 1m and similarly `scale-down-unneeded-time` to 1m. The issue we are facing is that sometimes the task pods fail, seemingly without reason, and the logs are quite unhelpful. My hunch is that it has something to do with scaling of the cluster, potentially destroying pods or losing networking between the flow and task pods in the process. We are already setting `cluster-autoscaler.kubernetes.io/safe-to-evict: false` on all pods, so eviction shouldn't be the issue. Has anyone else had any experience with k8s autoscaler settings leading to weird, intermittent task failures?

    Maxwell Varlack

    11/03/2021, 3:16 PM
    Hello everyone
    👋🏽 1
    👋 4

    Steve s

    11/03/2021, 3:24 PM
    Hey all, I'm having a strange problem that has me stumped. A flow that I run daily has just started failing. Basically the flow just launches several sub-flows using `create_flow_run`. Today it started giving me the traceback pasted below. I commented out everything in the flow except for the very first `create_flow_run` and it's still giving me this error (see thread): Any ideas?

    Jake Place

    11/03/2021, 4:21 PM
    Does anyone know if it is possible to pass `StartFlowRun` storage information? Or a better way to store flows that need to operate in multiple environments?

    Will

    11/03/2021, 4:42 PM
    Not a question but I enjoyed finding this little easter egg when having a look through the Prefect source 😆 https://github.com/PrefectHQ/prefect/blob/e699fce534f77106e52dda1f0f0b23a3f8bcdf81/src/prefect/core/flow.py#L1738
    😂 5

    saml

    11/03/2021, 5:56 PM
    I'm trying to keep a large project organized and I'm wondering if I'm thinking about it the wrong way. I have two series of tasks, currently defined as flows, that live in separate .py files. I was hoping to create a third flow that executes the first two and then performs a few more tasks. Is it better to think of those two upstream files as flows, or should I organize each of those files by having it end in a terminal task that has all the other tasks as upstream tasks, and then import that terminal task into the master flow? (Hopefully that's clear.)
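One way to keep the two files as stand-alone flows and still orchestrate them is a third flow built from `StartFlowRun`; a sketch assuming both flows are registered in the same project under the hypothetical names below:

```python
from prefect import Flow, task
from prefect.tasks.prefect import StartFlowRun

@task
def final_steps():
    print("both upstream flows are done")

# wait=True makes each StartFlowRun task block until its child flow finishes.
flow_a = StartFlowRun(flow_name="flow-a", project_name="my-project", wait=True)
flow_b = StartFlowRun(flow_name="flow-b", project_name="my-project", wait=True)

with Flow("parent-flow") as parent:
    a = flow_a()
    b = flow_b()
    final_steps(upstream_tasks=[a, b])
```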

    Harish

    11/03/2021, 7:04 PM
    Hi team, whom should I contact to get one of our teammates prefect account unlocked after it got locked due to too many failed login attempts?
    ✅ 1

    Khuyen Tran

    11/03/2021, 9:17 PM
    So I saw that we can use a dictionary of parameters as the argument of `flow.run`:
    flow.run(parameters={"x": 8, "y": 9})
    I wonder if there is a way we can do that with `flow.register` as well? Something similar to this:
    flow.register(project_name="My Project", parameters={"x": 8, "y": 9})
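For context, `flow.register` takes no `parameters` argument in Prefect 1.x; defaults live on the `Parameter` tasks baked into the flow, and per-run values are supplied when a run is created. A sketch of both (the project name is reused from the question; everything else is illustrative):

```python
from prefect import Flow, Parameter, task
from prefect.client import Client

@task
def add(x, y):
    return x + y

with Flow("params-sketch") as flow:
    x = Parameter("x", default=8)   # defaults are part of the registered flow
    y = Parameter("y", default=9)
    add(x, y)

flow_id = flow.register(project_name="My Project")

# Per-run overrides are passed when creating the run, not at registration:
Client().create_flow_run(flow_id=flow_id, parameters={"x": 1, "y": 2})
```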

    m

    11/03/2021, 11:49 PM
    I use MLflow with Prefect, and I run an MLflow experiment inside a Prefect task. How can I get logging from the MLflow experiment run into the Prefect UI? I only see in the documentation how to log from the task code itself, not from a function called inside the task. Here is my task:
    @task(log_stdout=True)
    def run_mlflow(project_path, experiment):
        mlflow.projects.run(
            project_path, experiment_name=experiment,
        )
    and here is my MLflow experiment code, where I would like to add Prefect logging:
    initialize(config_path="conf", job_name="gojob")
    cfg = compose(config_name="config")
    print(OmegaConf.to_yaml(cfg))
    traking = cfg["var"]["MLFLOW_TRACKING_URI"]
    params = {}
    project_path = cfg["project_path"]
    experiment = cfg["experiment"]
    mlflow.set_tracking_uri(traking)
    mlflow.set_experiment(experiment)
    
    # print(subprocess.run(["ls"]))
    with mlflow.start_run(nested=True):
    
        set_env(cfg)
    
        get_data = mlflow.run(project_path, "process_data", experiment_name=experiment)
    
        train = mlflow.run(project_path, "train", experiment_name=experiment)
    The logging from print() appears only in the task and not in the MLflow experiment. Thank you in advance.
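One thing that may help on the Python side (a sketch): `prefect.context` is populated for the duration of the task run, so a plain helper function called from inside the task can still grab the Prefect logger. This only covers code running in the task's own process; output from subprocesses launched by `mlflow.run` is not captured this way.

```python
import prefect
from prefect import task

def run_experiment(project_path, experiment):
    # Works because this is called while the task (and its context) is active.
    logger = prefect.context.get("logger")
    logger.info("starting MLflow runs for experiment %s", experiment)
    # ... mlflow.set_tracking_uri(...), mlflow.run(...) calls go here ...

@task
def run_mlflow(project_path, experiment):
    run_experiment(project_path, experiment)
```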

    Karol

    11/04/2021, 4:33 AM
    Hi, I want to ask if Orion still uses two types of storage: `script` and `pickle` based storage, as described here: https://docs.prefect.io/orchestration/flow_config/storage.html#pickle-vs-script-based-storage

    AsH

    11/04/2021, 8:18 AM
    Hi, I'm new to Prefect. I've been trying to set up Prefect on my local machine and I'm having difficulty understanding how to start the Prefect CLI. I'm trying to run basic commands like `prefect backend cloud`, but the `prefect` command is not recognised to begin with. Is there documentation I can refer to for setting up the Prefect CLI?

    Aqib Fayyaz

    11/04/2021, 9:30 AM
    Has anyone used prefect with feast on GKE?

    Martin T

    11/04/2021, 1:27 PM
    Hi! Trying to restrict the memory usage of each flow by passing `host_config` to our `DockerRun` config:
    flow.storage = Docker(
            registry_url=...,
            image_name=...,
            files={...},
            env_vars={...},
            python_dependencies=[...]
            )
    
    client = docker.APIClient()
    host_config = client.create_host_config(mem_limit=12345,
                                            mem_reservation=1234
                                            )
    flow.run_config = DockerRun(host_config=host_config)
    
    flow.executor = LocalDaskExecutor()
    When registered to Cloud, this seems to be OK, since starting a run shows the following default in Host Config:
    {
      "Memory": 12345,
      "MemoryReservation": 1234
    }
    However, this seems to have no effect on the newly created flow containers. `docker stats` shows that MEM USAGE keeps growing, with a LIMIT that equals the total server memory.
    `docker inspect <CONTAINER> | grep \"Memory[\"R]` gives
    "Memory": 0,
    "MemoryReservation": 0,
    What are we missing here?
    ✅ 1

    Louis Auneau

    11/04/2021, 1:31 PM
    Hello! We are trying to start building our data processing pipelines on Orion. However, I stumbled upon a weird behaviour with deployments which makes it impossible to use Orion at all.
    • My flow works perfectly and I can trigger it and see it in the UI.
    • When I define the `DeploymentSpec` with the location and name and register it, the output seems OK.
    • `prefect deployment ls` does list my deployment.
    But:
    • The UI does not show my deployment.
    • The command `prefect deployment inspect my_deployment` does nothing.
    Let me know if you need more details or logs to investigate 🙂! Thank you in advance and have an excellent day!

    Chris Arderne

    11/04/2021, 2:14 PM
    I'm trying to understand the best way to scale out our tasks on GCP. It seems like there are two paradigms, either an `Agent` or an `Executor` (or both). I read this SO answer already, along with this blog post on using an ECS Agent, which doesn't mention Dask. Background: our workflows are heterogeneous between tasks (some just moving some stuff around, some very parallel, some requiring GPU) and between runs (sometimes much more parallel), and very bursty. Some tasks are very long-lived and will spawn cloud containers of their own, but I assume Prefect doesn't need to care about that (unless we wanted those to be able to log back to Prefect).
    VertexAgent/VertexRun: Following the blog post, we could set up a Vertex `Agent` (just merged on GitHub, not on PyPI yet) as the agent. We could then set up a flow-of-flows (since `VertexRun` parameters can only be specified at the Flow level), specifying the image, CPU etc. for each flow. This would by default use a `LocalExecutor`, so any parallelism within that Flow would simply run sequentially? But each separate run of that Flow would automatically spin up the needed instances.
    DaskExecutor: If we wanted task-level parallelism, we'd need to use the `DaskExecutor`? So then we'd have a Vertex instance running the `Agent`, a bunch of Vertex instances running `Flow`s, and then a Dask cluster running parallelised `Task`s? Have I understood that correctly? Would there be a way to avoid Dask by e.g. parallelising at the flow-to-flow interface?

Kevin Kho

11/04/2021, 2:20 PM
Hey @Chris Arderne, so your thinking is completely right. You can set the resources on the Flow level upon registration. The default is indeed `LocalExecutor`, but we commonly see `LocalDaskExecutor` + `ECSRun` to utilize all of the cores of the machine that the flow is running on. It also helps to specify the `num_workers`. `LocalDaskExecutor` is already enough to utilize the cores of your machine. If you are using `Vertex` + `DaskExecutor`, I think it would normally be an external cluster like `DaskExecutor(cluster_address_here)`; `DaskExecutor` on the `Vertex` compute would not yield anything over the `LocalDaskExecutor`. On the last question of avoiding Dask to parallelize at the Flow level: when you do `StartFlowRun` or `create_flow_run`, it won't wait for the subflow to complete by default. So the behavior that you see should be kicking off the Flow run, and then moving on to kicking off the next Flow runs. So yes, this spins up a bunch of Vertex jobs running simultaneously.

Chris Arderne

11/04/2021, 3:01 PM
Great, thanks @Kevin Kho! Will see if I can get something running without Dask — the parallelism will be anywhere from 1 to ~100 between runs, so `LocalDask` wouldn't help much. I haven't played much with the flow-to-flow idiom, but it seems like it should be possible to do the following with four `Flow`s:
1. A “master” flow gets kicked off by the `Agent`.
2. It starts a simple sequential flow that prepares a bunch of things and determines how parallel subsequent steps will be.
3. The result from that is passed to a `StartFlowRun` with `.map()`, so a bunch of parallel `Flow` runs are kicked off on separate Vertex instances.
4. Results from those are reduced back down to a single sequential `Flow` that summarises and e.g. inserts results into our database.
So all of these would use `wait=True`, but the middle flow would be parallelised… Does that make sense/should that work?

Kevin Kho

11/04/2021, 3:58 PM
It's not straightforward to get results from subflows because the process doesn't happen on the same machine, so there is no in-memory passage of data like you normally have when everything is in the same script. In 0.15.0 and above, we have 3 tasks that give you a lot more granularity and some more flexibility: `create_flow_run`, `wait_for_flow_run` and `get_task_run_result`. You would use `get_task_run_result` to fetch the result from another flow, but you need to know the task slug of the task with the result you want to fetch ahead of time.
So for some you can use `wait_for_flow_run`, and others you can just create and not wait.
Instead of `get_task_run_result`, you can also have those subflows persist that data somewhere and then load it in the main flow after waiting for the completion.
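A sketch of that pattern, with placeholder flow/project names and task slug:

```python
from prefect import Flow
from prefect.tasks.prefect import (
    create_flow_run,
    wait_for_flow_run,
    get_task_run_result,
)

with Flow("parent") as parent:
    child_run_id = create_flow_run(flow_name="child-flow", project_name="my-project")
    finished = wait_for_flow_run(child_run_id, raise_final_state=True)
    result = get_task_run_result(
        child_run_id,
        task_slug="my_task-1",        # slug of the child task whose result you want
        upstream_tasks=[finished],    # fetch only after the child run completes
    )
```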

Chris Arderne

11/04/2021, 6:25 PM
Yeah, passing data back is not actually too important, just knowing they're done so the next tasks can kick off.
I saw those newer tasks; it's not very clear what the difference is, but I'll go play. Thanks for your help @Kevin Kho!

Kevin Kho

11/04/2021, 6:33 PM
StartFlowRun was too overloaded (sometimes it returns the child flow id, sometimes it returns the state if you wait)
The newer tasks are more consistent in their returns
👍 1