prefect-server
  • Tomás Emilio Silva Ebensperger

    06/11/2021, 1:32 AM
I changed to the new API key for Cloud, but I get this error when registering flows:
    prefect.utilities.exceptions.ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured.
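    A possible cause, offered as an assumption rather than a confirmed fix: API keys are configured differently from the old API tokens, so a leftover token setting can trigger exactly this message. Re-authenticating with the key (and clearing any old token configuration) is worth trying:

    prefect auth login --key <YOUR_API_KEY>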
    9 replies · 2 participants
  • Michael Hadorn

    06/11/2021, 9:06 AM
    Hi all, I cannot use the full CPU with mapped tasks on Prefect Server using the local Dask executor (threads). I have a machine running Prefect Server with 8 CPUs (on Ubuntu). A flow is executed as a docker run with mapped CPU-intensive tasks using the local Dask executor with threads. But whatever I try, the maximum CPU peak is 100% in this flow container (I got this from docker stats). I know from other containers running on the same server that the maximum is 100% per core, so in my case it would be 800%. It looks like Dask (or the container execution) is limited to only one CPU. Also, if I look in top, only one CPU is really in use. Is there any limit on Prefect Server? Or is it Linux not using different CPUs for threads?
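    One possibility worth ruling out (an assumption, since the thread isn't shown): with scheduler="threads", CPU-bound pure-Python tasks hold the GIL and are effectively pinned to one core no matter how many threads Dask starts; a process-based scheduler sidesteps that. A minimal sketch:

    from prefect.executors import LocalDaskExecutor

    # processes avoid the GIL for CPU-bound Python work; num_workers sets the pool size
    flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=8)

    It is also worth checking that the docker run isn't started with a --cpus limit on the container.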
    17 replies · 2 participants
  • Lukas N.

    06/11/2021, 9:30 AM
    Hi all, I've observed different restart behaviour from the ZombieKiller and the UI. I have a task that uses the LOOP signal and persists output with an S3Result. My expectation is that if I do 4 of 10 iterations of the task, the result contains the output of the 4th iteration, which is also the input to the 5th. If the 5th one fails (in my case the process dies and stops sending 💓), the ZombieKiller restarts the task, but it ignores the result and starts from the 1st iteration! This doesn't happen if I restart it from the UI; there it correctly picks up the result and continues with the 5th iteration.
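    For reference, a minimal LOOP-with-result pattern like the one described (bucket name and payload shape are illustrative):

    import prefect
    from prefect import task
    from prefect.engine.signals import LOOP
    from prefect.engine.results import S3Result

    @task(result=S3Result(bucket="my-bucket"))  # hypothetical bucket
    def iterate():
        # on a restart, the last persisted LOOP result should be picked up here
        payload = prefect.context.get("task_loop_result", {"n": 0})
        n = payload["n"]
        if n >= 10:
            return n
        raise LOOP(message=f"iteration {n}", result={"n": n + 1})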
    2 replies · 2 participants
  • Andrew Moist

    06/11/2021, 4:04 PM
    Hi, I'd like to register a flow multiple times, with a different schedule and parameters for each one. Is this possible? I can see from this page in the docs that it's possible to set a start time, but not a schedule. The reason is that we're investigating creating a single generic flow that can handle different source data based on input parameters.
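    If it helps while waiting on the thread: rather than registering the same flow several times, one registration can carry several clocks, each with its own parameter defaults (a sketch; the cron strings and parameter names are assumptions):

    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock

    flow.schedule = Schedule(clocks=[
        CronClock("0 6 * * *", parameter_defaults={"source": "orders"}),
        CronClock("0 18 * * *", parameter_defaults={"source": "invoices"}),
    ])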
    4 replies · 3 participants
  • CM

    06/11/2021, 8:29 PM
    Hi all, I think I am missing something obvious, but when I re-register a flow after making a minor change, the changes don't get picked up.
    34 replies · 2 participants
  • Ayla Khan

    06/14/2021, 6:18 PM
    Has anyone gotten Dask debug logging working with LocalDaskExecutor? I tried running this code but haven't seen any output from Dask in Prefect Cloud:
    with Flow(
            "simple_aggregate_flow",
            executor=LocalDaskExecutor(
                scheduler="processes", **{
                    "distributed.logging.distributed": "debug",
                }),
    ) as simple_aggregate_flow:
    ...
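    A hunch, not a verified answer: LocalDaskExecutor drives Dask's local schedulers rather than dask.distributed, so a distributed.logging.* setting may never be read there; DaskExecutor is the distributed one. If distributed is actually in play, setting the config globally before the run is one thing to try:

    import dask

    # only meaningful where dask.distributed is used, e.g. with DaskExecutor
    dask.config.set({"logging": {"distributed": "debug"}})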
    6 replies · 2 participants
  • Hugo Kitano

    06/14/2021, 6:46 PM
    Hi, this is a general question, but I'm just looking for some advice. I'm trying to kick off a run of an already-registered flow from a Python Lambda function that is triggered by an S3 file upload (the bucket and key of the file are parameters). What's the most lightweight way to do this? Any example code would be great.
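    One lightweight pattern (a sketch against Prefect Cloud's documented GraphQL API; the FLOW_ID/PREFECT_API_KEY env vars and the bucket/key parameter names are assumptions): call the create_flow_run mutation directly with urllib, so the Lambda needs no extra dependencies:

    import json
    import os
    import urllib.request

    def lambda_handler(event, context):
        record = event["Records"][0]["s3"]  # S3 put event
        query = """
        mutation($input: create_flow_run_input!) {
          create_flow_run(input: $input) { id }
        }
        """
        variables = {"input": {
            "flow_id": os.environ["FLOW_ID"],
            "parameters": {"bucket": record["bucket"]["name"],
                           "key": record["object"]["key"]},
        }}
        req = urllib.request.Request(
            "https://api.prefect.io",
            data=json.dumps({"query": query, "variables": variables}).encode(),
            headers={"Authorization": f"Bearer {os.environ['PREFECT_API_KEY']}",
                     "Content-Type": "application/json"},
        )
        return json.loads(urllib.request.urlopen(req).read())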
    23 replies · 3 participants
  • Pawel

    06/14/2021, 7:17 PM
    Hi folks, I have a task decorated with GCSResult
    from prefect.engine.results import GCSResult
    from prefect import task, Flow
    
    @task(log_stdout=True, result=GCSResult(bucket='prefect-cloud-results', location="{flow_name}/{flow_run_id}/{task_slug}/{task_run_id}_dummybenchmark.pkl"))
    def compute_benchmark() -> float:
        return 0.753
    
    with Flow("test_gcs") as flow:
        r = compute_benchmark()
    
    x=flow.run()
    When running in Prefect Cloud, the result is written to the specified GCS location and logs show as much, but when running the same flow on my local machine, the flow runs successfully but no attempt is made to write to GCS. It's not an access issue as I can manually call GCSResult.write(...) with success. Is the GCSResult decorator only expected to work from Prefect Cloud runs or am I missing some setting?
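    For anyone hitting the same thing: Cloud-orchestrated runs checkpoint results by default, but a bare local flow.run() does not, so the GCSResult is never written locally unless checkpointing is switched on, e.g.:

    import os

    # must be set before the flow runs; Cloud runs have this on by default
    os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"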
    8 replies · 2 participants
  • ash

    06/14/2021, 9:56 PM
    Hi everyone, I deployed Prefect on a single node and am now aiming to deploy the same on Kubernetes using the Helm chart. While learning from live sessions on YouTube, I came across this piece of code.
    from prefect import task, Flow
    from prefect.storage import Docker

    @task
    def say_hello():
        print("hello world !!!")

    with Flow("hello world") as flow:
        say_hello()

    flow.storage = Docker(registry_url="", image_name="hello-world-flow")
    flow.register("demo")
    Can someone please explain: 1) why we used a container registry for storing a flow, and what difference it makes compared to script-based storage like GitHub; and 2) when we execute a flow whose server is on K8s but no executor is defined, as in the snippet above, where the flow would be executed: on the local machine or in a pod on the cluster?
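    On 1), the usual rationale (stated as my understanding, since the thread isn't shown): Docker storage bakes the flow and its dependencies into an image, so the container that runs the flow already has everything installed, whereas script-based storage such as GitHub ships only the flow file and expects the run image to provide the dependencies. Docker storage can even install extra packages at build time:

    from prefect.storage import Docker

    flow.storage = Docker(
        registry_url="registry.example.com",  # hypothetical registry
        image_name="hello-world-flow",
        python_dependencies=["pandas"],       # installed into the image when it is built
    )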
    24 replies · 3 participants
  • Mehdi Nazari

    06/14/2021, 10:47 PM
    Hi everyone, I was wondering if there is a how-to guide I can read on bringing up a prefect-server instance on Docker? No strict limitation on how many containers are needed at this point. I'm more interested in an approach that is maintainable.
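    One maintainable route, if I remember the CLI correctly (worth confirming with prefect server --help): prefect server start already drives everything through docker-compose, and prefect server config prints the rendered compose file, which can be checked into version control and run directly:

    prefect server config > docker-compose.yml
    docker-compose up -d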
    29 replies · 3 participants
  • Emma Rizzi

    06/15/2021, 8:04 AM
    Hello, I have an issue with storage: it's getting hard to configure the TLS config with Docker storage, as our registry is not properly administered yet (another part of my team is handling this). A few questions: • Is there a way to use Docker storage without pushing the image to a repository? • Can't we authenticate to the Docker repository with login/password instead of a TLS config? • Is there a way to specify the files to upload when using S3/GCS? I first tried these options but got many file-not-found errors because only the flow file is uploaded. I switched to Docker storage because I saw it had the 'files' option.
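    On the first question, a sketch of what I believe Docker storage does when no registry is given (worth verifying): the image is built on the local daemon and not pushed, which works when the agent talks to that same daemon; the files option maps local paths into the image:

    from prefect.storage import Docker

    flow.storage = Docker(
        image_name="my-flow",  # no registry_url: image stays on the local daemon
        files={"/local/helpers.py": "/pipeline/helpers.py"},  # hypothetical paths
        env_vars={"PYTHONPATH": "/pipeline"},
    )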
    16 replies · 2 participants
  • Tom Forbes

    06/15/2021, 1:11 PM
    How can I customise the flow executor depending on whether the flow is being run locally or being serialized into a Docker container? i.e. I want to do:
    if running_locally:
        flow.executor = LocalDaskExecutor()
    else:
        flow.executor = DaskExecutor(cluster_class=lambda: KubeCluster(pod_template=....))
    obviously the second DaskExecutor won't work locally, which is a shame. I can't find any supported way of checking whether we're serializing this for production rather than debugging it locally
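    A workaround sketch, not an official API: the executor lives on the Flow object and is not sent to the backend at registration, so an environment variable that only the production build sets can make the choice when the flow script is executed:

    import os
    from prefect.executors import DaskExecutor, LocalDaskExecutor

    def make_kube_cluster():
        # hypothetical factory wrapping dask_kubernetes.KubeCluster(pod_template=...)
        from dask_kubernetes import KubeCluster
        return KubeCluster()

    # PREFECT_TARGET is a made-up variable, set only in the production image build
    if os.environ.get("PREFECT_TARGET") == "production":
        flow.executor = DaskExecutor(cluster_class=make_kube_cluster)
    else:
        flow.executor = LocalDaskExecutor()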
    10 replies · 3 participants
  • Tom Forbes

    06/15/2021, 2:54 PM
    Is there a way to get the currently defined Result class/instance for a given task, from within the task itself?
    6 replies · 2 participants
  • Tom Forbes

    06/15/2021, 5:16 PM
    When using flow.run() is it possible to override the result class/type? Doing:
    flow.run(result=LocalResult(...))
    fails with:
    TypeError: run() got an unexpected keyword argument 'result'
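    A sketch of the usual workaround, assuming a flow-level default result is the goal: set it on the Flow object instead of passing it to run():

    from prefect.engine.results import LocalResult

    flow.result = LocalResult(dir="/tmp/prefect-results")  # flow-level default
    state = flow.run()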
    5 replies · 2 participants
  • Raúl Mansilla

    06/16/2021, 4:32 PM
    Hello all, I'm trying to get a flow running using a Docker agent that's itself already running in Docker… I get the agent connected to Prefect Server, and the flow reaches the agent when I run it, but it never finishes…
    31 replies · 2 participants
  • Paulo Benatto

    06/17/2021, 8:59 AM
    Hi guys, how are you doing? I'm new to Prefect and I was reading https://docs.prefect.io/core/idioms/parallel.html and I think I'm missing something 😏. I would like to build a list of tasks to be executed in parallel. See this dummy example:
    from prefect import task, Flow
    from prefect.executors import LocalDaskExecutor

    @task
    def create_task_list(ids):
        # ReferralUID is one of our Task classes
        return [ReferralUID() for _ in ids]

    @task
    def generate_ids():
        return ["id1", "id2", "id3"]


    with Flow("parallel-execution") as flow:

        ids = generate_ids()

        list_of_tasks = create_task_list(ids)

        request_referral = RequestReferralDetail()

        # list_of_tasks should be a list, but i'm returning a task. how to inject a list on bind?
        request_referral.bind(request_details=list_of_tasks, flow=flow)

    flow.visualize()
    flow.run(executor=LocalDaskExecutor())
    Thanks
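    In case it helps others reading along, the idiomatic parallel pattern here is usually mapping: apply one task per element of the upstream list and let the executor run the copies concurrently (RequestReferralDetail is their class, so this is only a sketch):

    with Flow("parallel-execution") as flow:
        ids = generate_ids()
        # .map() creates one task run per element of ids; LocalDaskExecutor
        # then executes those runs in parallel
        results = RequestReferralDetail().map(request_details=ids)

    flow.run(executor=LocalDaskExecutor())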
    7 replies · 2 participants
  • Chris Bowen

    06/17/2021, 4:22 PM
    Hello, I'm getting a Prefect server set up on a RHEL server for the first time. I have a question related to the local agent: is there any way to run the agent "detached", like the server command? With the server, I can run prefect server start --detach so it doesn't run interactively. Is there a comparable solution for agents? When my ssh session with the server ends, so does my agent. I might just be missing something obvious. Appreciate any help.
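    A common stopgap, assuming a systemd-based host like RHEL: detach the agent from the ssh session yourself, e.g. nohup prefect agent local start > agent.log 2>&1 &, with a systemd unit or another process supervisor being the sturdier long-term option.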
    2 replies · 2 participants
  • YD

    06/17/2021, 7:25 PM
    Trying to set up a rootless Docker server on CentOS 7, following https://docs.docker.com/engine/security/rootless/. It's not clear to me how to install newuidmap and newgidmap, and also not clear what exactly I should add to /etc/subuid and /etc/subgid. If someone can help with a step-by-step example, it would be very helpful. Thanks
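    For the subordinate ID files, the conventional layout (general rootless-Docker guidance, not CentOS-specific advice) is one line per user granting a block of IDs; newuidmap/newgidmap themselves should come from the shadow-utils package on CentOS:

    # /etc/subuid and /etc/subgid: user, first subordinate ID, range size
    youruser:100000:65536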
    5 replies · 3 participants
  • YD

    06/17/2021, 7:42 PM
    Another question on installation, following https://docs.prefect.io/core/getting_started/installation.html: if I want to change the port from 8080 to some other port, how do I do this? This is for a case where something else is installed on the server and uses port 8080.
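    If I recall the CLI correctly (verify with prefect server start --help), the ports can be remapped at start time, e.g. prefect server start --ui-port 8081 to serve the UI somewhere other than 8080.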
    2 replies · 2 participants
  • Shaoyi Zhang

    06/17/2021, 8:07 PM
    Hi everyone, what’s the best way to report a potential security bug with Prefect?
    1 reply · 2 participants
  • YD

    06/17/2021, 8:54 PM
    How do I keep the UI running on the server? If I run prefect server start, it stops when I exit the terminal. I tried sudo systemctl prefect server start, but that doesn't look like the proper way.
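    One option, echoing an earlier message in this channel: prefect server start --detach runs the stack in the background so it survives the terminal closing; wrapping that command in a proper systemd unit would be the longer-term approach.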
    1 reply · 2 participants
  • YD

    06/17/2021, 9:28 PM
    General question... I had some sample workflows running on my local machine (on my laptop) at http://localhost:8080/default. I've installed Prefect Server on a VM. When I go to <VM ip address>:8080, without running anything, it somehow shows the workflows and projects I had on my local machine. How can this be?
    6 replies · 2 participants
  • ash

    06/18/2021, 12:08 PM
    Hello everyone, I am a little confused about the part below:
    import pymongo
    import pandas
    from reports.config import mongo_config
    import sklearn
    from prefect import task, Flow
    from prefect.storage import Docker

    @task
    def say_hello():
        print("hello world !!!")

    with Flow("hello world") as flow:
        say_hello()

    flow.storage = Docker(registry_url="", image_name="hello-world-flow")
    flow.register("demo")
    In the above code I am importing three external libraries (pandas, sklearn, pymongo) plus mongo_config, which contains the configuration for connecting to Mongo.
    When I register a flow, let's say for the code above:

    A.) step (1): A Docker image containing everything (the external libraries, the Mongo config, and the flow code) is built and pushed to the container registry.
    step (2): Its metadata, including any schedule and the path to its dependencies in the container registry, is saved to Postgres.
    step (3): When the Kubernetes agent polls and has to run the above flow, it creates a pod, the dependencies are installed, and after the task completes the pod is terminated.

    That's my understanding of how things are working; please correct me if I'm wrong on any of the above.
    Now, one thing here: what if the Mongo config changes? Whenever we build pipelines for reporting, all we want to do is change the config in one place and have the change incorporated into every other report. With the approach above, I might need to re-register every flow for it to pick up the updated config. Can you suggest some way to change the config in one place so that all the flows know it and I don't have to re-register all my flows?

    B.) One way I think could solve this is using GitHub as storage: since the code is read from GitHub, the updated config would presumably be taken into consideration too. But there is one issue with this approach:
    when the pod is created to run the script, how will the dependencies be installed on the pod, since we don't have a Docker image this time?
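    One pattern that addresses A) without re-registering (a sketch, assuming a Prefect Secret, or an environment variable, is an acceptable home for the Mongo settings): read the config inside a task at run time instead of importing it at build time, so the value is resolved fresh on every flow run:

    from prefect import task
    from prefect.client import Secret

    @task
    def get_mongo_config():
        # "MONGO_CONFIG" is a hypothetical secret name; it is fetched when the
        # task runs, so updating the secret needs no re-registration
        return Secret("MONGO_CONFIG").get()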
    22 replies · 3 participants
  • Kamil Gorszczyk

    06/20/2021, 3:23 PM
    Hi everyone! For a few days now I've been getting some strange failures and raised exceptions that hadn't happened before. I'm using the ShellTask to run a Java application on our Kubernetes cluster. The ShellTask has a custom StateHandler to send notifications in case the shell fails and/or retries. It also uses a lambda function to generate the task_run_name from the executed command:
    executeShell = ShellTask(task_run_name=lambda **kwargs: f"{kwargs['command'].split('-file ')[1].split(' -')[0].rpartition('/')[-1]}",
        stream_output=True, max_retries=RETRY_MAX, retry_delay=timedelta(minutes=RETRY_TIMEOUT), timeout=TIMEOUT, state_handlers=[taskStateHandler])
    def taskStateHandler(obj: Task, old_state: State, new_state: State):
        try:
            if new_state.is_retrying():
                sendTelegramNotification("Task {0} failed and is retrying at {1}".format(obj.task_run_name, new_state.start_time))
            return new_state
        except:
            sendTelegramNotification("Task {0} failed with an exception".format(obj.name))
            return new_state
    Since the task_run_name is a lambda function, I just called task.task_run_name() to get the resolved name. But somehow, since a week or so ago, Prefect raises the following exception:
    Exception raised while calling state handlers: KeyError('command')
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/cloud/task_runner.py", line 64, in call_runner_target_handlers
        new_state = super().call_runner_target_handlers(
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/task_runner.py", line 114, in call_runner_target_handlers
        new_state = handler(self.task, old_state, new_state) or new_state
      File "prefect-master.py", line 200, in taskStateHandler
      File "prefect-master.py", line 222, in <lambda>
    KeyError: 'command'
    I just can't figure out what I'm missing or doing wrong… I even reverted to 0.14.19, where I thought it worked, but I get the same exception.
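    A guess at the mechanism, unverified: obj.task_run_name inside the handler is the raw lambda, and invoking it without the command kwarg is exactly a KeyError('command'). A defensive variant of the lambda tolerates the missing key (retry/notification settings elided):

    from prefect.tasks.shell import ShellTask

    executeShell = ShellTask(
        task_run_name=lambda **kwargs: (
            kwargs["command"].split("-file ")[1].split(" -")[0].rpartition("/")[-1]
            if "command" in kwargs
            else "shell-task"  # fallback when the kwarg isn't supplied
        ),
        stream_output=True,
    )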
    2 replies · 3 participants
  • Robert Hales

    06/21/2021, 1:54 PM
    Hi there, is there any way to control how flows get versioned? We use calver for our releases, and having the flow versions align with our deploy versions would be super helpful.
    2 replies · 2 participants
  • CM

    06/21/2021, 3:57 PM
    Hi everyone, I am trying to figure out how to work with a mapped flow where some of the tasks have multiple return values, and I'm running into some issues (details in the thread).
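    For context while the thread isn't visible, a generic sketch of multiple return values in 0.14-era Prefect (not necessarily their exact case): a task can declare nout so its result unpacks inside the flow block:

    from prefect import Flow, task

    @task(nout=2)
    def split(x):
        return x, x * 2

    with Flow("multi-output") as flow:
        a, b = split(1)  # unpacking works because nout=2 is declared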
    13 replies · 2 participants
  • Matheus Cruz

    06/21/2021, 8:28 PM
    Hi everyone! I'm trying to run a flow on an ECS Agent with GitHub Storage and LocalExecutor and I'm getting this error.
    State Message:
    Failed to load and execute Flow's environment: UnknownObjectException(404, {'message': 'Not Found', 'documentation_url': 'https://docs.github.com/rest/reference/repos#get-repository-content'}, {'server': 'GitHub.com', 'date': 'Mon, 21 Jun 2021 20:19:24 GMT', 'content-type': 'application/json;charset=utf-8', 'transfer-encoding': 'chunked', 'x-oauth-scopes': 'admin:enterprise, admin:gpg_key, admin:org, admin:org_hook, admin:public_key, admin:repo_hook, delete:packages, delete_repo, gist, notifications, repo, user, workflow, write:discussion, write:packages', 'x-accepted-oauth-scopes': '', 'x-github-media-type': 'github.v3; format=json', 'x-ratelimit-limit': '5000', 'x-ratelimit-remaining': '4988', 'x-ratelimit-reset': '1624309007', 'x-ratelimit-used': '12', 'x-ratelimit-resource': 'core', 'access-control-expose-headers': 'ETag, Link, Location, Retry-After, X-GitHub-OTP, X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Used, X-RateLimit-Resource, X-RateLimit-Reset, X-OAuth-Scopes, X-Accepted-OAuth-Scopes, X-Poll-Interval, X-GitHub-Media-Type, Deprecation, Sunset', 'access-control-allow-origin': '*', 'strict-transport-security': 'max-age=31536000; includeSubdomains; preload', 'x-frame-options': 'deny', 'x-content-type-options': 'nosniff', 'x-xss-protection': '0',.....
    I've tried a lot of things and nothing seems to work; do you know what it could be?
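    In my experience this particular 404 (a guess, since the resolution is in the thread) usually means the token can't see the repo, or the repo/path/ref don't line up exactly with what's on GitHub, e.g.:

    from prefect.storage import GitHub

    flow.storage = GitHub(
        repo="org/repo",          # owner/name, exactly as on GitHub
        path="flows/my_flow.py",  # path from the repo root to the flow file
        ref="main",               # a wrong branch or commit also 404s
    )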
    22 replies · 2 participants
  • Aiden Price

    06/22/2021, 2:48 AM
    Hi folks, I'm running Prefect Server in my Kubernetes cluster with a Kubernetes agent. I'm also running a permanent Dask cluster in Kubernetes which I use for running parallelised flows. But each job passes through the Kubernetes Agent on its way to the Dask cluster necessitating the creation of a Kubernetes Job object before the Dask cluster work is scheduled. How should I get around this? Can I run a local agent on the GraphQL server? Thanks all!
    9 replies · 3 participants
  • Roey Brecher

    06/22/2021, 3:42 PM
    apollo:core-0.14.16 Docker memory error: we are occasionally seeing failures with the Docker container; it will sometimes restart itself successfully and sometimes it won't. The hosting machine has enough memory. Is there any setting we can change for how much memory this container is allocated? I couldn't find it in the server docker-compose file. The error message:
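    If the container is being OOM-killed, one avenue (an assumption about the setup, and a Compose setting rather than a Prefect one) is an explicit memory limit on the apollo service; mem_limit as below applies to compose file v2, while v3 puts the equivalent under deploy.resources.limits:

    services:
      apollo:
        mem_limit: 1g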
    10 replies · 2 participants
  • Devin McCabe

    06/22/2021, 3:59 PM
    Anyone know how to kill a Cloud flow run that just won't die? I exhausted my free plan quota in the middle of a large mapped task and started getting thousands of "usage exceeded" messages. We'll obviously upgrade our plan, but that flow run won't seem to die. I've tried canceling it and manually setting its state, but the "duration" continues to advance. (run ID is 42036d0b-6443-4648-8c93-50c2ac6a360c)
    8 replies · 2 participants
nicholas

06/22/2021, 4:20 PM
Hi @Devin McCabe - if Prefect isn't able to cancel your run the easiest way to stop it would be to tear down the job/process that's running your flow, which will depend on your setup
Devin McCabe

06/22/2021, 4:25 PM
Yeah, it was ECS+Dask and everything is already stopped.
I can try stopping the agent, too
nicholas

06/22/2021, 4:27 PM
Sorry just to check, after tearing down the container your flow is still trying to set states? 🤔
If it's just the duration still climbing in the UI, that's fine; the duration will cap when the run fails its heartbeat check or enters a finished state. It's a calculated duration based on the run's start_time and end_time - if the run is still in a Cancelling state, it hasn't finished from what Cloud can tell, even if nothing is running. You can manually set the state to Finished or Failed to cap this
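For reference, a sketch of setting that state by hand with the Python client (assuming Client.set_flow_run_state in a 0.14-era client; Failed is one of the terminal states mentioned above):

from prefect import Client
from prefect.engine.state import Failed

Client().set_flow_run_state(
    flow_run_id="42036d0b-6443-4648-8c93-50c2ac6a360c",
    state=Failed("marked failed manually; infrastructure already torn down"),
)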
Devin McCabe

06/22/2021, 4:29 PM
No, it's effectively not doing anything anymore (the log has stopped, and all tasks failed or were canceled). The only indication that it's not fully stopped is that the duration isn't stopping, as seen here:
Ok, got it. Won't worry about it then
nicholas

06/22/2021, 4:30 PM
Ok let me know if you're still having trouble with it and we can try to triage further.