Powered by Linen
prefect-server
  • v

    Vaibhav Ariyur

    01/05/2022, 4:26 PM
    Hi all, in Prefect is the following kind of dynamic task generation possible: user passes in the following parameter: [0,1,[2,3],0,[1,2,3]] and in doing so dynamically creates the following flow: task0 --> task1 --> parallel(task2,task3) --> task0 --> parallel(task1,task2,task3) I know dynamic mapping can take the user input and dynamically create tasks that are in parallel, but is this possible too?
    k
    • 2
    • 1
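The nested-list parameter in the question above can be read as a list of stages, where a bare number is a stage with one task and an inner list is a stage whose tasks run in parallel. The following is a minimal plain-Python sketch of that reading, not Prefect API code; `to_stages`, `run_stages`, and the `tasks` lookup table are hypothetical names introduced for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Union

Spec = List[Union[int, List[int]]]


def to_stages(spec: Spec) -> List[List[int]]:
    """Normalize the spec so every stage is a list of task indices."""
    return [list(item) if isinstance(item, list) else [item] for item in spec]


def run_stages(spec: Spec, tasks: Dict[int, Callable[[], str]]) -> List[List[str]]:
    """Run stages in order; tasks inside one stage are submitted together."""
    results: List[List[str]] = []
    with ThreadPoolExecutor() as pool:
        for stage in to_stages(spec):
            futures = [pool.submit(tasks[index]) for index in stage]
            # Waiting on every future here keeps the stages sequential.
            results.append([future.result() for future in futures])
    return results


# Hypothetical task table: task0 .. task3 each return their own name.
tasks = {i: (lambda i=i: f"task{i}") for i in range(4)}
stages = to_stages([0, 1, [2, 3], 0, [1, 2, 3]])
results = run_stages([0, 1, [2, 3], 0, [1, 2, 3]], tasks)
```

Whether the same shape can be built inside a Prefect flow at run time is the open question in the message; the sketch only pins down the intended execution order.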
  • m

    Margaret Walter

    01/06/2022, 5:01 PM
    Hey guys! Is there a way to register a flow via a call to the server/graphql endpoint without installing prefect on the local machine? I'm trying to register flows during the build step of a code deployment pipeline (we have one pipeline for the server on EKS and one for flow/model deployment, so the server should already be set up by the time pipeline #2 is registering the flow), and I'd like to minimize the packages that need to be installed during build 😅.
    k
    • 2
    • 5
  • c

    Corris Randall

    01/06/2022, 6:26 PM
    So, I have prefect server running on some dev instances, and a uat/qa instance, and then I have production using prefect cloud. I want ALL logging to go to a graylog server via graypy. I've found that when I run the flow via the command line in the docker container, it sends all the logging to my graylog server. however, when I run the flow from the UI ( server or cloud ), it doesn't set my handler. Where's the appropriate place to put this:
    k
    • 2
    • 7
  • e

    Elliot Oram

    01/07/2022, 12:21 PM
    Hi all, I am trying to set up a system where I can have prefect agents on different machines from the prefect server. Right now I have a prefect server on machine A and a prefect agent on machine B. I can register the agent and it shows up in the UI no problem. When it comes to running flows, however, there is an issue. Before any of the tasks start to execute I get the error in the screenshot:
    Failed to load and execute Flow's environment: ModuleNotFoundError("No module named '/home/ubuntu/'")
    The agent is running in a python virtual environment (I have also tried this without the venv and the result is the same) the agent is started with the following command:
    prefect agent local start --api http://my-server.com:4200
    Any ideas on what might be the cause, or how to start debugging this?
    a
    • 2
    • 3
  • p

    Pedro Martins

    01/07/2022, 1:30 PM
    Hey there! What is the difference between using the
    client.create_flow_run(flow_id, parameters)
    and
    flow.run(parameters)
    . Are they equivalent? Would the
    flow.run
    connect to the client and run the latest version of the flow?
    a
    • 2
    • 2
  • c

    Carrie Bohinc

    01/07/2022, 5:44 PM
    Hi all. I am trying to install the prefect server helm chart and am running into an issue. I am following along with the tutorial here:

    https://www.youtube.com/watch?v=EwsMecjSYEU&t=485s

    and have made it to the point of installing the chart (which completes without error), however my graphql pod cannot start because it can't connect to the postgres db. I am not trying to connect to an external db (just testing atm). When I look at my pods I do not see that a postgres pod was started, but I can see the
    prefect-server-postgres
    and the
    prefect-server-postgres-headless
    services running. I was under the impression that the helm chart would automatically start a postgres instance to use if you weren't connecting to an outside db. Did I miss a step somewhere? Any guidance is appreciated.
    m
    • 2
    • 2
  • c

    Cristian Toma

    01/11/2022, 8:14 AM
    Hi all, I'm running a flow in a k8s environment and created custom
    state_handler
    for a flow. When running locally in the
    new_state
    object the
    result
    property has values in it, but when I run it on the cluster, the
    result
    property is empty. Does anyone know why it is empty, and how I can get the errors of a flow run in the state handler in a cluster environment? Thanks.
    a
    • 2
    • 2
  • t

    Tony Liberato

    01/11/2022, 3:03 PM
    We do a lot of downloading and un-zipping of zip files. Does anyone have any tricks or methods for doing this in Prefect server? I think the challenge is the unzip process.
    k
    • 2
    • 3
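For the unzip step mentioned above, one possible starting point is the standard-library `zipfile` module. This is a minimal sketch that assumes the archive has already been downloaded into memory as bytes; `extract_zip_bytes` is a hypothetical helper name, and nothing here is Prefect-specific.

```python
import io
import zipfile
from pathlib import Path
from typing import List


def extract_zip_bytes(payload: bytes, target_dir: Path) -> List[Path]:
    """Extract an in-memory zip archive and return the extracted file paths."""
    target_dir.mkdir(parents=True, exist_ok=True)
    root = target_dir.resolve()
    extracted: List[Path] = []
    with zipfile.ZipFile(io.BytesIO(payload)) as archive:
        for member in archive.infolist():
            destination = (root / member.filename).resolve()
            # Refuse entries that would land outside the target directory.
            if root != destination and root not in destination.parents:
                raise ValueError(f"unsafe path in archive: {member.filename}")
            archive.extract(member, root)
            if not member.is_dir():
                extracted.append(destination)
    return extracted
```

A function like this could be wrapped in a task so that downloading and extracting are retried independently, though that split is a design assumption and not something stated in the thread.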
  • w

    Will Skelton

    01/12/2022, 4:30 PM
    I'm trying to get the UI running locally to enable a better view into what is happening in prefect during my POC/development. When running "prefect server start" I get the following error. I do have a local install of postgres already that could be causing this. Any thoughts here? I didn't see anything in the troubleshooting docs or the github discussions area.
    k
    • 2
    • 10
  • s

    Stéphan Taljaard

    01/13/2022, 5:17 AM
    Hi. Why does it sometimes happen that I have flows that have been scheduled (or something), but they have no state?
    k
    • 2
    • 10
  • s

    Sam Werbalowsky

    01/13/2022, 9:59 PM
    Anyone have issues with Git storage and a bunch of gobbledygook being appended to the repo? This is a new one. I'm fairly sure that we haven't updated anything on registration on our end; I checked by importing the same version of our helper library:
    https://data:XXXXXXXXXXXX@https://<my-repo>/data/scratch-data.git
    It's unexpected to see that
    https://data…@
    portion
    k
    • 2
    • 31
  • d

    Denys Volokh

    01/14/2022, 2:25 PM
    Hi, I am not able to start the server because of a "No space left on device" issue for the postgres service.
    k
    • 2
    • 2
  • b

    Bradley Hurley

    01/14/2022, 3:22 PM
    Hi - I've encountered an issue, but I'm not sure it's a bug because my situation is unique. Because of the upcoming license changes to Docker Desktop, I have configured my local machine to use the Docker daemon on a VM that's already running on my local machine to support microk8s. When attempting to run a docker agent I encountered issues related to
    host.docker.internal
    . I dug into the source and realize the IP only gets set properly if the
    sys.platform
    starts with
    linux
    . My host machine is OSX, but docker is running on Linux. Here is my prefect agent start command
    prefect agent docker start -b tcp://192.168.64.2:2375 -l bh-docker --network prefect-server --log-level DEBUG --show-flow-logs --no-pull
    k
    • 2
    • 3
  • a

    Ahmed Ezzat

    01/16/2022, 4:34 AM
    Hi - I'm using Prefect with Dask on Kubernetes and I'm seeing unusual restarts without any clear reason. That said, this usually happens when some of the Dask workers are not ready or are stuck in a pending state, e.g. 4 workers where only 2 are active and the other 2 are pending due to CPU, memory, etc. Is there something I can do? I don't mind tasks being restarted, but most of them are time-expensive. I know about input/output caching, but I'm thinking more of caching per task run?
    task = mytask(...)
    other1 = other1.map(task)
    other2.map(other1)
    k
    • 2
    • 6
  • p

    Pierre Monico

    01/17/2022, 10:09 AM
    Hi. For some reason, for the past week I have been getting task runs that stay stuck for several days. Even hitting the cancel button in the UI doesn't help, and I have to set the state to "cancelled" manually.
    • Is there any way to set a timeout for tasks globally?
    • Is there a way to also avoid retrying task runs that are late?
    Basically none of my flows are too critical and I simply want to cancel them if they stay stuck too long and not retry. (The problem now is that after a few tasks get stuck, at some point I run out of memory and the server crashes.)
    a
    • 2
    • 10
  • s

    Samuel James

    01/17/2022, 11:24 AM
    Hi, looking to install Prefect backend server locally. Is there a release for M1 chips as I am currently having some installation issues
    a
    • 2
    • 14
  • s

    Santiago Gonzalez

    01/17/2022, 4:54 PM
    Hi, I have some trouble running a flow. This is the context. I have a repo that contains all the flows, and another repo that keeps shared code between them (
    ec2_instances
    and others). In
    ec2_instances.py
    there are 3 task methods, and in this way it works. However, since I refactored that file to add some behavior I needed, it's failing. I added some non-task methods that are invoked by the existing task methods. The error being thrown is:
    Failed to load and execute Flow's environment: StorageError('An error occurred while unpickling the flow:\n  AttributeError("Can\'t get attribute \'get_boto_client_with_auth\' on <module \'...ec2_instances\' from \'/usr/local/lib/python3.7/site-packages/automation_library/ec2_instances.py\'>")\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n  - cloudpickle: (flow built with \'1.6.0\', currently running with \'2.0.0\')\n  - python: (flow built with \'3.7.9\', currently running with \'3.7.12\')')
    Why is it failing only when I add non-task methods? Does it make any sense?
    a
    • 2
    • 9
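One possible reading of the traceback above, offered as an assumption and not a confirmed diagnosis: pickle-style serializers store a module-level function as a reference (module name plus attribute name), so loading fails if the installed copy of the module in the execution environment does not define that attribute. The sketch below reproduces this with the standard-library `pickle` and a hypothetical in-memory module named `shared_lib_demo`; it does not use cloudpickle or Prefect.

```python
import pickle
import sys
import types


def build_demo_module(with_helper: bool) -> types.ModuleType:
    """Register a hypothetical module, optionally including helper()."""
    module = types.ModuleType("shared_lib_demo")
    source = "def task_body():\n    return 'ran'\n"
    if with_helper:
        source += "def helper():\n    return 'ok'\n"
    exec(source, module.__dict__)
    sys.modules["shared_lib_demo"] = module
    return module


# "Build" environment: the refactored library defines helper().
build_env = build_demo_module(with_helper=True)
payload = pickle.dumps(build_env.helper)  # stores a reference, not the code

# "Execution" environment: an older copy of the library without helper().
build_demo_module(with_helper=False)
try:
    pickle.loads(payload)
    outcome = "loaded"
except AttributeError as exc:
    outcome = f"AttributeError: {exc}"
print(outcome)
```

If this is what is happening, the execution image would need the same refactored version of the shared library that was present when the flow was built.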
  • m

    Michael Moscater

    01/17/2022, 7:37 PM
    Hello, I'm trying to set up prefect server (single node) on an amazon ec2 t4g instance, and I'm running into an issue getting the server to start. I'm running into "standard_init_linux.go:228: exec user process caused: exec format error", Which I assume the docker image is trying to deploy from a different architecture, anyone come across this or recommend how to resolve?
    a
    • 2
    • 2
  • c

    Côme Arvis

    01/18/2022, 11:57 AM
    Hello! We regularly, and for an hour constantly, have the following unexpected error: (full logs in the thread)
    Unexpected error: AttributeError("partially initialized module 'prefect' has no attribute 'schedules' (most likely due to a circular import)")
    during our flows executions. This seems to occur randomly between tasks executions. Is there some known issue about it? Thanks!
    a
    • 2
    • 9
  • p

    Pierre Monico

    01/18/2022, 1:02 PM
    Does anyone have experience with securing the apollo API (ingress) when deploying Server with the Helm chart? I could easily secure it with basic auth or similar, but then I don't get how you're supposed to authenticate from the UI since you can only pass a URL. Do I need to implement some sort of API key mechanism?
    a
    d
    t
    • 4
    • 11
  • t

    Thomas Mignon

    01/18/2022, 1:42 PM
    Hi guys, I've installed docker and docker-compose:
    (base) thomas@Ifremer ~ $ docker -v
    Docker version 20.10.8, build 3967b7d
    (base) thomas@Ifremer ~ $ docker-compose --version
    docker-compose version 1.25.0, build unknown
    I'm trying to start prefect with the command
    prefect server start
    but I get this error:
    ERROR: Couldn't connect to Docker daemon at http+docker://localhost - is it running?
    
    If it's at a non-standard location, specify the URL with the DOCKER_HOST environment variable.
    Exception caught; killing services (press ctrl-C to force)
    ERROR: Couldn't connect to Docker daemon at http+docker://localhost - is it running?
    
    If it's at a non-standard location, specify the URL with the DOCKER_HOST environment variable.
    Traceback (most recent call last):
      File "/home/thomas/miniconda3/lib/python3.8/site-packages/prefect/cli/server.py", line 623, in start
        subprocess.check_call(
      File "/home/thomas/miniconda3/lib/python3.8/subprocess.py", line 364, in check_call
        raise CalledProcessError(retcode, cmd)
    subprocess.CalledProcessError: Command '['docker-compose', 'pull']' returned non-zero exit status 1.
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/home/thomas/miniconda3/bin/prefect", line 10, in <module>
        sys.exit(cli())
      File "/home/thomas/miniconda3/lib/python3.8/site-packages/click/core.py", line 829, in __call__
        return self.main(*args, **kwargs)
      File "/home/thomas/miniconda3/lib/python3.8/site-packages/click/core.py", line 782, in main
        rv = self.invoke(ctx)
      File "/home/thomas/miniconda3/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/home/thomas/miniconda3/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/home/thomas/miniconda3/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/home/thomas/miniconda3/lib/python3.8/site-packages/click/core.py", line 610, in invoke
        return callback(*args, **kwargs)
      File "/home/thomas/miniconda3/lib/python3.8/site-packages/prefect/cli/server.py", line 664, in start
        subprocess.check_output(
      File "/home/thomas/miniconda3/lib/python3.8/subprocess.py", line 411, in check_output
        return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
      File "/home/thomas/miniconda3/lib/python3.8/subprocess.py", line 512, in run
        raise CalledProcessError(retcode, process.args,
    subprocess.CalledProcessError: Command '['docker-compose', 'down']' returned non-zero exit status 1.
    a
    p
    +2
    • 5
    • 33
  • b

    Boggdan Barrientos

    01/18/2022, 7:10 PM
    Hi all! I'm new to Prefect. I want to use it to orchestrate Airbyte. I'm running Airbyte with docker-compose and following the guide from the Airbyte website. I want to install Prefect on an ECS Ubuntu 20.04.3 t3.large instance. I'm using Python 3.8.10. https://airbyte.com/recipes/elt-pipeline-prefect-airbyte-dbt I followed the guide and read the getting-started section of the Prefect documentation. I ran the following command:
    pip install prefect
    Inside python idle I was able to run successfully:
    import prefect
    from prefect import task, Flow
    
    @task
    def hello_task():
        logger = prefect.context.get("logger")
        logger.info("Hello world!")
    
    with Flow("hello-flow") as flow:
        hello_task()
    
    flow.run()
    But in the CLI, nothing happens when I try to run it. I tried running
    prefect version
    and got this result:
    ubuntu@ip-20-0-0-33:~$ prefect version
    prefect: command not found
    I don't know what I'm doing wrong. I would appreciate your help. Thanks.
    k
    a
    a
    • 4
    • 10
  • w

    Will Skelton

    01/19/2022, 2:19 AM
    Can someone point me towards some additional documentation (examples are great!) on how to use the prefect.tasks.azure.blobstorage.BlobStorageUpload Task? I feel like I'm fairly close, but I'm not sure exactly what I need to try and pass into it credentials wise. I have a connection string from Azure and a Key, but I don't know where to put them exactly. Many Thanks!
    k
    • 2
    • 6
  • p

    Paulo Benatto

    01/19/2022, 3:04 PM
    Hi guys I have a newbie question. I want to inject my token in my LocalAgent.
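    from prefect import Client, config
    from prefect.agent.local import LocalAgent

    # token, flow, project, and name are defined elsewhere
    client = Client(api_token=token)
    client.register(flow, project_name=project)

    # this looks wrong
    config.cloud.agent.auth_token = token

    LocalAgent(name=name).start()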
    Using the command line to start the agent works fine:
    • prefect auth login -t TOKEN
    • prefect agent local start
    I would like to run the agent from Python and inject the token into my agent 🤔 Thanks
    k
    • 2
    • 8
  • m

    Maximilian Rausch

    01/20/2022, 3:26 AM
    I can't find any documentation for what happens with late/missed runs. Say for example a flow is scheduled for 1x per day at 9am and the system crashes. Eventually I get the machine and agent back online 3 days later. What happens to all the previously scheduled runs?
    k
    • 2
    • 5
  • m

    Moss Ebeling

    01/20/2022, 6:33 AM
    Hi there, We've recently begun using Prefect Core and Prefect Server and have had positive initial results with writing flows but have hit some pain points when coming to deployments out to Prefect. When writing our flows, we make quite heavy usage of shared functions and tasks between different flows. For example helpers that fetch some common datasets or utilities for data munging. These are imported from a package that contains the common tasks and the flow definitions.
    from prefect import task
    from common_etl.data_sources import fetch_sales
    from common_etl.util import enrich_with_user_details
    
    # in flows/agg_sales.py
    @task
    def aggregate_sales():
        sales = fetch_sales()
        enriched_sales = enrich_with_user_details(sales)
    
    # in flows/publish_sales_report.py
    @task
    def publish_to_wiki():
        sales = fetch_sales()
        sales.to_wiki()
    We are using docker storage when deploying flows and copy the relevant package into the docker image and pip install it using the final commands options. Doing this means that we're able to deploy and run using a
    LocalExecutor
    that has the correct version of our package in its execution environment. However, since our jobs are large and can exhaust the resources on a single host, we're trying to use the
    DaskExecutor
    instead, and point to a multi-host cluster. At this point we see failures because the dask workers in that cluster do not have this package installed in their execution environments. If we were to try to install it in their environments it would require stopping dask workers, installing the package in their environment, then restarting them. This would also seem to restrict all deployed flows to be using the exact same release of the common package (since the dask workers can only have one version installed?). Has anyone had similar experience dealing with docker storage and dask executors or suggestions on this design?
    k
    • 2
    • 4
  • r

    Raviraj Dixit

    01/20/2022, 7:15 AM
    Hi, I am trying to run prefect using docker-Kubernetes. Can we pass toml file instead of the package list to docker storage?
    k
    • 2
    • 7
  • v

    Vaibhav Shetye

    01/20/2022, 12:22 PM
    Is there a way to load balance workflow execution between agents? Assume I have multiple agents (one per server) and I wish to direct execution based on server load.
    a
    • 2
    • 2
  • g

    Gian Piero Izzo

    01/20/2022, 4:22 PM
    Hello, I would like to use an open git repository, with no password. I just configure it in this way:
    storage = Git(
        repo="org/repo",
        flow_path="flows/my_flow.py"
    )
    The problem is that the docker agent is not able to get the repository, it gives the error: Failed to load and execute Flow's environment: NotGitRepository('No git repository was found at https://@github.com/org/repo.git') is there any way to avoid this "@" ? Thanks in advance
    k
    • 2
    • 10
  • c

    Christopher Chong Tau Teng

    01/21/2022, 2:41 AM
    Hi there, apart from GUI, how do I remove/delete one/multiple flows? e.g. via CLI, Python etc.
    k
    a
    • 3
    • 6
k

Kevin Kho

01/21/2022, 2:44 AM
You need to use the GraphQL API like:
mutation {
  delete_flow(input:{flow_id: "171bb895-f94e-4d9d-9e86-7370b743a285"}) {
    success
  }
}
c

Christopher Chong Tau Teng

01/21/2022, 6:43 AM
@Kevin Kho apart from GraphQL, is there any other way? specifically way that can be scripted
I saw that we can use Python script to interact with GraphQL https://docs.prefect.io/orchestration/concepts/api.html#client, is there a way to delete flow just by specifying its name but not its id?
a

Anna Geller

01/21/2022, 11:41 AM
You could first query for flows and their IDs, and use it as input to the delete_flow mutation. You can definitely script that in Python but there is no way to bulk delete things from the CLI or UI, other than perhaps delete an entire project and all flows within a project.
c

Christopher Chong Tau Teng

01/24/2022, 4:06 AM
@Anna Geller so there is no CLI way to delete a flow?
k

Kevin Kho

01/24/2022, 4:11 AM
There is none. Just projects. We'd definitely welcome a PR for that though
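The approach suggested in this thread (query for flows by name, then pass each ID to the `delete_flow` mutation) could be scripted roughly as follows. This is a sketch under assumptions: `run_graphql` is a hypothetical callable that sends a GraphQL document and returns the decoded response as a dict, and the `flow(where: {name: {_eq: ...}})` filter is assumed from the server's GraphQL schema and is not confirmed by the thread. Only the `delete_flow(input: {flow_id: ...})` shape comes from the messages above.

```python
import json
from typing import Callable, Dict, List

GraphQLRunner = Callable[[str], Dict]


def find_flow_ids(run_graphql: GraphQLRunner, flow_name: str) -> List[str]:
    """Return the IDs of every flow (all versions) with the given name."""
    # json.dumps quotes and escapes the name as a string literal.
    query = "query { flow(where: {name: {_eq: %s}}) { id } }" % json.dumps(flow_name)
    result = run_graphql(query)
    return [row["id"] for row in result["data"]["flow"]]


def delete_flows_by_name(run_graphql: GraphQLRunner, flow_name: str) -> List[str]:
    """Delete each matching flow and return the IDs reported as deleted."""
    deleted: List[str] = []
    for flow_id in find_flow_ids(run_graphql, flow_name):
        mutation = (
            "mutation { delete_flow(input: {flow_id: %s}) { success } }"
            % json.dumps(flow_id)
        )
        result = run_graphql(mutation)
        if result["data"]["delete_flow"]["success"]:
            deleted.append(flow_id)
    return deleted
```

With the Python client linked earlier in the thread, the client's GraphQL method could presumably be passed as `run_graphql`, provided its result supports the same key lookups; that is worth verifying before running this against a real server.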