prefect-community
  • d

    Daniel Caldeweyher

    03/03/2021, 10:35 PM
    Can someone tell me why my flow is getting picked up by 2 agents?
    j
    16 replies · 2 participants
  • d

    Daniel Caldeweyher

    03/03/2021, 11:26 PM
    Another issue I came across is when you combine nout > 1 and .map: The multiple return values from the nout>1 task are not iterated correctly when passed as args to the next task
    from typing import Tuple
    
    from prefect import Flow, Task, task
    
    
    @task(nout=2)
    def step1(num) -> Tuple[int, str]:
        chars = ['a','b','c']
        return num, chars[num - 1]
    
    @task
    def step2(arg1, arg2):
        print(f"arg1={arg1} arg2={arg2}")
    
    def unzip(nout: int) -> Task:
        @task(nout=nout)
        def _unzip(res) -> tuple:
            return tuple(zip(*res))
        return _unzip
    
    
    with Flow("nout test") as flow:
        numbers = [1, 2, 3]
        # bad
        a, b = step1.map(num=numbers)
        step2.map(arg1=a, arg2=b)
        # prints:
        # arg1=1 arg2=2
        # arg1=a arg2=b
    
        # good - with unzip workaround
        c, d = unzip(2)(step1.map(num=numbers))
        step2.map(arg1=c, arg2=d)
        # prints:
        # arg1=1 arg2=a
        # arg1=2 arg2=b
        # arg1=3 arg2=c
    
    
    if __name__ == "__main__":
        flow.run()
    c
    j
    5 replies · 3 participants
  • s

    Samuel Hinton

    03/04/2021, 2:01 PM
    Hi team! Is there an easy way (preferably in the UI) to cancel all running flows (anything in the blue state)?
    j
    3 replies · 2 participants
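    A minimal sketch of doing this through the API instead of the UI, assuming a Prefect Server/Cloud backend; the `flow_run` query shape follows the Hasura-generated schema and `Client.cancel_flow_run` may differ between versions, so verify both against your deployment.
    from prefect import Client

    client = Client()

    # Find every flow run currently in a Running state.
    result = client.graphql(
        """
        query {
          flow_run(where: {state: {_eq: "Running"}}) {
            id
            name
          }
        }
        """
    )

    # Ask the backend to cancel each one.
    for run in result.data.flow_run:
        client.cancel_flow_run(flow_run_id=run.id)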
  • b

    Billy McMonagle

    03/04/2021, 2:34 PM
    Good morning, I have a question about flow registration. I'd like to register my flows during CI/CD (specifically, AWS CodeBuild). My issue is that I have run into import errors because my flow dependencies are not installed in the build environment. I am using
    S3Storage
    and
    KubernetesRun
    . Thanks in advance for any guidance.
    j
    s
    +2
    48 replies · 5 participants
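    One workaround sketched below (not necessarily the only one): keep the heavy third-party imports inside the task bodies so the flow module itself only needs `prefect` installed at registration time. This assumes 0.14-style imports, and the bucket, image, and task names are placeholders rather than details from this thread.
    from prefect import Flow, task
    from prefect.storage import S3
    from prefect.run_configs import KubernetesRun


    @task
    def build_report():
        # Heavy dependencies are imported at run time, inside the task,
        # so `flow.register()` in CI only needs `prefect` to be importable.
        import pandas as pd

        return pd.DataFrame({"x": [1, 2, 3]}).describe().to_json()


    with Flow(
        "ci-registered-flow",
        storage=S3(bucket="my-flow-bucket"),
        run_config=KubernetesRun(image="my-registry/flow-image:latest"),
    ) as flow:
        build_report()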
  • h

    Hawkar Mahmod

    03/04/2021, 3:00 PM
    Hi folks, I've created a flow that uses Docker Storage and an ECS run config. The run config has a task definition defined locally in a YAML file. I am using Prefect Cloud as my backend. The flow registers OK, but when I attempt to run it the task definition cannot be registered because I have not specified an image in the ECS Run Config. However, in the API reference it says: • `image (str, optional)`: The image to use for this task. If not provided, will be either inferred from the flow's storage (if using 
    Docker
     storage), or use the default configured on the agent. I am using Docker Storage, which has worked in the past and does get stored in my image repository (ECR). It seems as though specifying an explicit task definition, either through the
    task_definition_path
    or
    task_definition
    arguments and using Docker Storage does not work, or there is a bug in the image being picked up from Storage. Any guidance would be welcome.
    j
    7 replies · 2 participants
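    For comparison, a stripped-down sketch that leans on the documented image inference by omitting `task_definition`, `task_definition_path`, and `image` entirely and letting the run config build its own task definition from the Docker storage image; the registry URL and names below are placeholders.
    from prefect import Flow
    from prefect.storage import Docker
    from prefect.run_configs import ECSRun

    # Docker storage builds and pushes the flow image to ECR at registration time.
    storage = Docker(
        registry_url="123456789012.dkr.ecr.us-east-1.amazonaws.com",
        image_name="my-flow",
    )

    # No image and no task_definition* arguments: the ECS task definition is
    # generated by Prefect and the image is taken from the Docker storage above.
    with Flow("ecs-docker-storage", storage=storage, run_config=ECSRun()) as flow:
        ...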
  • j

    Jack Sundberg

    03/04/2021, 4:01 PM
    Do Agents submit separate flow runs to the Executor in a depth-first or breadth-first pattern? I’m wondering if Prefect gives priority to task B of the 1st flow run (“1B”) over task A of the 2nd flow run (“2A”).
    m
    11 replies · 2 participants
  • m

    Marwan Sarieddine

    03/04/2021, 4:28 PM
    Hi folks, what is the current approach for specifying a custom Prefect context (to be used as a default) when registering a flow?
    m
    5 replies · 2 participants
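    One approach worth checking for your version, sketched below: ship default context values with the flow's run config as `PREFECT__CONTEXT__*` environment variables, which Prefect merges into `prefect.context` at run time. The key name and run config here are illustrative placeholders, and the exact merge behaviour is worth verifying on your release.
    import prefect
    from prefect import Flow, task
    from prefect.run_configs import KubernetesRun


    @task
    def log_env_name():
        # "env_name" comes from PREFECT__CONTEXT__ENV_NAME set on the run config.
        prefect.context.get("logger").info(prefect.context.get("env_name"))


    with Flow(
        "context-default-demo",
        run_config=KubernetesRun(env={"PREFECT__CONTEXT__ENV_NAME": "staging"}),
    ) as flow:
        log_env_name()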
  • m

    Marwan Sarieddine

    03/04/2021, 4:53 PM
    Hi folks, we spotted a change in
    0.14.11
    that made the Flow object no longer pickleable using
    pickle
    - I am wondering if others have bumped into this
    s
    j
    12 replies · 3 participants
  • j

    Jitesh Khandelwal

    03/04/2021, 5:52 PM
    Hello everyone, is there a way to set the state of a task to Skipped after all retries have failed?
    m
    2 replies · 2 participants
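    One pattern that should cover this is a state handler that turns the final Failed state into Skipped; Retrying states are not Failed states, so the conversion only happens once the retries are exhausted. A minimal sketch:
    from datetime import timedelta

    from prefect import Flow, task
    from prefect.engine.state import Failed, Skipped


    def skip_on_failure(task, old_state, new_state):
        # Retrying is not a Failed state, so this only fires after the last retry.
        if isinstance(new_state, Failed):
            return Skipped(f"Skipping after retries failed: {new_state.message}")
        return new_state


    @task(max_retries=3, retry_delay=timedelta(seconds=10), state_handlers=[skip_on_failure])
    def flaky_task():
        raise RuntimeError("boom")


    with Flow("skip-after-retries") as flow:
        flaky_task()
    Keep in mind that a Skipped state propagates by default, so downstream tasks are skipped too; set `skip_on_upstream_skip=False` on a downstream task that should still run.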
  • f

    Felipe Saldana

    03/04/2021, 7:40 PM
    Question about using secrets. I have my flow running successfully ... I would like to know if this is the best practice for using secrets: • creating them outside the flow • calling the .run() method on them
    aurora_user_val = EnvVarSecret("AURORA_USERNAME", raise_if_missing=True).run()
    aurora_pass_val = EnvVarSecret("AURORA_PASSWORD", raise_if_missing=True).run()
    aurora_host_val = EnvVarSecret("AURORA_HOST", raise_if_missing=True).run()
    
    with Flow("test_vars") as flow:
    m
    a
    14 replies · 3 participants
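    The usual recommendation is to call the secret tasks inside the `with Flow(...)` block instead of `.run()` at module scope, so they resolve on every flow run (on the agent's machine) rather than once when the flow file is imported. A sketch along those lines, with a placeholder task standing in for the real database logic:
    from prefect import Flow, task
    from prefect.tasks.secrets import EnvVarSecret


    @task
    def connect(user, password, host):
        # placeholder for the real connection logic
        return f"{user}@{host}"


    with Flow("test_vars") as flow:
        aurora_user = EnvVarSecret("AURORA_USERNAME", raise_if_missing=True)()
        aurora_pass = EnvVarSecret("AURORA_PASSWORD", raise_if_missing=True)()
        aurora_host = EnvVarSecret("AURORA_HOST", raise_if_missing=True)()
        connect(aurora_user, aurora_pass, aurora_host)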
  • s

    Samuel Hinton

    03/04/2021, 8:18 PM
    Hi all, looking for some help debugging here. I have a task which finishes fine when run locally; the logs indicate it finished fine when run through an agent, but the task itself is stuck in Running. It's a simple flow (it turns a tiny pandas dataframe into a JSON object), and I make a call to the logger directly before the return statement. I see my log statement that it all worked, but the task state doesn't change. Is there a good way of debugging what on earth might be going on? The odd thing is that the task that never finishes is a common task (I download 4 things and process them all in the same way). 2 of them finish, every time. 2 of them get stuck on Running, without fail. EDIT: I seem to have found a potential bug in Prefect. These hang-ups occur when I have
    @task(timeout=20)
    , but they just completed successfully with a normal
    @task
    annotation. Will update prefect now and check to see if that helps
    m
    19 replies · 2 participants
  • s

    Scott Moreland

    03/04/2021, 9:16 PM
    Trying to make a task decorator that subsamples and caches the output of a task to HDFS (each task returns a spark dataframe). The goal is to quickly iterate on and debug downstream tasks using subsampled data. Since these are not full blown checkpoints, I'm not sure if the Results API would be appropriate. I was thinking something like
    @cache(subsample=100, sdf_key='sdf_large')
    @task
    def some_large_spark_dataframe():
      "intensive ETL process here"
      ...
      return sdf
    
    
    @task
    def downstream_task(sdf_large):
      "some intensive computation on sdf_large"
      ...
      return sdf
    
    
    with Flow() as flow:
      sdf_large_sample = read_from_cache('sdf_large')
      downstream_task(sdf_large_sample)
    ...but I've had difficulty stacking decorators with the
    task
    decorator. Moreover, there are the usual challenges of the task result not being available until evaluation time. Any recipes you'd recommend?
    m
    2 replies · 2 participants
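    On the decorator-stacking part specifically, a sketch of one way to make it work: keep `@task` outermost so Prefect wraps an ordinary function, and let the caching decorator deal only in plain callables. The HDFS write and the `cache_to_hdfs` name are placeholders.
    import functools

    from prefect import Flow, task


    def cache_to_hdfs(subsample, key):
        def decorator(fn):
            @functools.wraps(fn)
            def wrapper(*args, **kwargs):
                sdf = fn(*args, **kwargs)
                # placeholder: subsample `sdf` and write it to HDFS under `key`
                return sdf
            return wrapper
        return decorator


    @task  # outermost, so Prefect sees a normal function with caching baked in
    @cache_to_hdfs(subsample=100, key="sdf_large")
    def some_large_spark_dataframe():
        "intensive ETL process here"
        return None  # placeholder for the real Spark dataframe


    with Flow("subsample-cache") as flow:
        some_large_spark_dataframe()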
  • d

    Daniel Ahn

    03/04/2021, 10:38 PM
    Hi all, I'm trying to have dependent flows, where the parent flow needs to run only once. Consider the following scenario: 1. The parent flow is an expensive process that preprocesses data and loads it into storage (let's say S3). Ideally, it shouldn't run more than once. 2. There can be multiple child flows that use the preprocessed data. I've taken a look at this. However, with this setup, in order to ensure the parent flow runs only once, I need to put all child flows in a single "flow of flows". However, I envision a use case where different teams bring in their own flow and use the common preprocessed data. My understanding is that if each team's flow defines the parent flow as a dependency using
    upstream_tasks
    , it would cause the parent flow to run multiple times. I think what I need is something like the factory pattern in the OO paradigm. In Airflow, I think I can use ExternalTaskSensor to achieve this. - Does my question/use case make sense? - Is there a preferred "Prefect" way to solve this? - Or should I keep the state of the parent job externally?
    m
    2 replies · 2 participants
  • t

    Trevor Kramer

    03/05/2021, 1:39 AM
    What happens if the agent dies in the middle of a flow? Will the flow continue when a new agent comes online or will the flow error out?
    c
    m
    6 replies · 3 participants
  • j

    Jack Sundberg

    03/05/2021, 2:22 AM
    Hey everyone, I'm really struggling to wrap a custom Executor into Prefect and could use some help! I know it's a little late in DC, so let me know if this should wait til tomorrow lol. So I have
    future = CustomExecutor(flow.run, *my_args, **my_kwargs)
    working just fine. The issue comes when I try
    flow.executor = CustomExecutor(); state = flow.run()
    . When I do this, I receive...
    Unexpected error occured in FlowRunner: AttributeError("'CustomFuture' object has no attribute 'is_mapped'")
    I read through the FlowRunner code and have a good guess as to what's happening, but I could use some insight from a dev. Has anyone seen this before?
    c
    7 replies · 2 participants
  • t

    Thomas Hoeck

    03/05/2021, 10:05 AM
    Hi. I have a flow where I get a token for an API that goes stale after some known time. Is there a way to indicate that an upstream task (the token task) should be rerun? An example of the flow could be as follows:
    from prefect import task, Flow, unmapped
    import requests

    @task
    def task_that_goes_stale():
        token = code_to_get_token_at_some_cost()
        return token

    @task
    def task_using_token(user, token):
        r = requests.get(f'https://api.github.com/user/{user}', headers={'TOKEN': token})

    with Flow("my_flow") as my_flow:
        all_users = [i for i in range(1000)]
        token = task_that_goes_stale()
        task_using_token.map(all_users, unmapped(token))
    I know I could just get a new token for each call, but this seems like the wrong way to do it. Is there a Prefect way to do it? I can see that there is output caching, but as I read it this is meant for sharing results between flow runs: https://docs.prefect.io/core/concepts/persistence.html#output-caching
    n
    2 replies · 2 participants
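    One workaround, sketched under assumptions (the token lifetime and fetch function are placeholders): fetch the token lazily inside the mapped task through a small module-level helper that only re-fetches when the cached token is near expiry, so each mapped call gets a valid token without paying the full fetch cost every time. Note the cache lives per worker process, so with a distributed executor each worker fetches its own token.
    import time

    import requests
    from prefect import Flow, task

    TOKEN_LIFETIME = 15 * 60  # seconds; assumed value, adjust to the real token TTL
    _token_cache = {"token": None, "fetched_at": 0.0}


    def code_to_get_token_at_some_cost():
        return "fresh-token"  # placeholder for the real (costly) token fetch


    def get_token():
        # Re-fetch only when the cached token is close to expiring.
        if time.time() - _token_cache["fetched_at"] > TOKEN_LIFETIME * 0.9:
            _token_cache["token"] = code_to_get_token_at_some_cost()
            _token_cache["fetched_at"] = time.time()
        return _token_cache["token"]


    @task
    def task_using_token(user):
        r = requests.get(f"https://api.github.com/user/{user}", headers={"TOKEN": get_token()})
        return r.status_code


    with Flow("my_flow") as my_flow:
        task_using_token.map([i for i in range(1000)])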
  • m

    Moritz Rocholl

    03/05/2021, 12:34 PM
    Hey everyone, I'm currently trying to get a better grasp on how best to interact with past results of flows. Something similar to what Metaflow has (https://docs.metaflow.org/metaflow/tagging) would be great. Is there some preferred way to do this? Something like:
    with Flow("A Flow") as my_flow:
        task_res = my_task()
    my_flow.run()
    followed by:
    latest_flow_state = my_flow.last_successful_run
    latest_flow_state.tasks[task_res]
    n
    5 replies · 2 participants
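    For a local `flow.run()`, at least, the returned state already exposes per-task results keyed by the task objects, which covers the "latest run" part of this without extra machinery; historical runs would have to go through the backend's API instead. A small sketch:
    from prefect import Flow, task


    @task
    def my_task():
        return 42


    with Flow("A Flow") as my_flow:
        task_res = my_task()

    flow_state = my_flow.run()
    # flow_state.result maps each task to its final State; .result holds the value
    print(flow_state.result[task_res].result)  # -> 42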
  • k

    Kieran

    03/05/2021, 2:45 PM
    Hey, I am using Prefect Cloud on version
    0.13.18
    . Has anyone had this before -- all of a sudden a large row of cancelled flows appears in the UI? (these flows run using the CronSchedule)
    n
    34 replies · 2 participants
  • b

    Berty

    03/05/2021, 3:24 PM
    After upgrading to v0.14.11 I can no longer start my local stack. Any guidance on resolving this?
    $ prefect server start
    ERROR: The Compose file './docker-compose.yml' is invalid because:
    services.ui.depends_on contains an invalid type, it should be an array
    services.apollo.depends_on contains an invalid type, it should be an array
    services.towel.depends_on contains an invalid type, it should be an array
    services.graphql.depends_on contains an invalid type, it should be an array
    services.hasura.depends_on contains an invalid type, it should be an array
    Exception caught; killing services (press ctrl-C to force)
    ERROR: The Compose file './docker-compose.yml' is invalid because:
    services.ui.depends_on contains an invalid type, it should be an array
    services.apollo.depends_on contains an invalid type, it should be an array
    services.towel.depends_on contains an invalid type, it should be an array
    services.graphql.depends_on contains an invalid type, it should be an array
    services.hasura.depends_on contains an invalid type, it should be an array
    n
    m
    6 replies · 3 participants
  • k

    Kao Phetchareun

    03/05/2021, 5:48 PM
    Does the EmailTask’s
    email_to_cc
    only allow for sending one email for cc?
    👀 1
    n
    j
    8 replies · 3 participants
  • d

    Daniel Ahn

    03/05/2021, 6:16 PM
    With a local agent and local executor, is there an easy way to keep track of the location of an external file? I'm trying to do spark-submit with the
    ShellTask
    , but it needs the location of the file I'm submitting. Right now, I've hardcoded the location in my file and switch to that location with the
    helper_script
    option on
    ShellTask
    👀 1
    n
    5 replies · 2 participants
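    A sketch of one way to avoid the hard-coded path: resolve the submitted file relative to the flow file itself and bake the absolute path into the ShellTask command. The `jobs/etl_job.py` path is a placeholder, and this assumes the flow is built and registered on the same machine the local agent runs on, so the resolved path stays valid.
    from pathlib import Path

    from prefect import Flow
    from prefect.tasks.shell import ShellTask

    # Resolve the job file relative to this flow file instead of hard-coding it.
    HERE = Path(__file__).resolve().parent
    spark_submit = ShellTask(name="spark-submit")

    with Flow("spark-submit-flow") as flow:
        spark_submit(command=f"spark-submit {HERE / 'jobs' / 'etl_job.py'}")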
  • a

    Alex Welch

    03/05/2021, 8:20 PM
    when using the
    GITHUB Storage
    if you are referencing an outside file that is in the same repository, is the flow able to find and run it?
    🙌 1
    m
    2 replies · 2 participants
  • c

    CA Lee

    03/06/2021, 9:18 AM
    Hi, I am attempting a KubernetesRun using DigitalOcean’s managed Kubernetes service. I am using S3 for flow storage and a public Docker image to run the flow. The Kubernetes agent has been registered, and S3 credentials specified as env vars following the docs:
    prefect agent kubernetes install -t token_value -l label -e aws_access_key_id=XXX -e aws_secret_access_key=YYY --rbac | kubectl apply -f -
    The agent is installed and running in the kube cluster on inspection using kubectl get pods
    NAME                             READY   STATUS    RESTARTS   AGE
    prefect-agent-58cf5b46d5-84hh6   1/1     Running   0          26m
    Running the flow in Prefect Server, the flow gets picked up by the agent, but I encounter the error:
    Failed to load and execute Flow's environment: NoCredentialsError('Unable to locate credentials')
    This is despite the agent having knowledge of the S3 creds via the -e flags. I have also tried passing the S3 creds as a dict to KubernetesRun inside my flow:
    KubernetesRun(
      image="my_image",
      labels=["my_labels"],
      env={
        "aws_access_key_id": "xxx",
        "aws_secret_access_key": "yyy"
      }
    )
    Am I missing something for my kube cluster to authenticate to S3?
    j
    5 replies · 2 participants
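    One thing to check: boto3 only reads the uppercase variable names, so lowercase `aws_access_key_id` / `aws_secret_access_key` are ignored by the flow pod. A sketch with the expected names (values are placeholders; in practice mount them from a Kubernetes secret rather than hard-coding them):
    from prefect.run_configs import KubernetesRun

    run_config = KubernetesRun(
        image="my_image",
        labels=["my_labels"],
        env={
            # boto3 looks for these exact (uppercase) names
            "AWS_ACCESS_KEY_ID": "xxx",
            "AWS_SECRET_ACCESS_KEY": "yyy",
        },
    )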
  • c

    CA Lee

    03/06/2021, 10:14 AM
    Does anyone know how to debug a Kubernetes run with the error:
    Pod prefect-job-7b9f3bf6-z4p27 failed.
    	Container 'flow' state: terminated
    		Exit Code:: 1
    		Reason: Error
    3 replies · 1 participant
  • a

    Alex Welch

    03/07/2021, 6:40 AM
    Hi all, I am really struggling to get this to work. I am trying to use
    Docker Storage
    with a
    ECSRun Config
    . What I am trying to make happen is to have the GitHub repo cloned into the Docker container so that my flow has access to various files (Jupyter notebooks, primarily). I have been trying solutions for a number of days and I am currently stuck on
    Error while fetching server API version: {0}'.format(e)
    docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or direct
    This would indicate that my Prefect image does not have access to the Docker daemon, but I can’t figure out what I am doing wrong. I have
    prefect backend cloud
    set. And below are my files.
    c
    33 replies · 2 participants
  • j

    Jay Vercellone

    03/07/2021, 11:40 PM
    Hello! I am setting up my Prefect flows CI/CD pipeline, and as part of that I need to register flows against my Prefect server instance. One thing I noticed is that there's a
    set_schedule_active
    parameter that can be specified to the
    .register()
    call in order to automatically enable or disable the scheduling of the new flow version. These values can be either
    True
    (enable the scheduling) or
    False
    (disable the scheduling), but there's no option to "leave it as it is right now", meaning that if the flow is active/inactive, the new registration should leave the new workflow as active/inactive as well. What's the best approach in this scenario? I want to avoid 2 potentially dangerous situations: - Accidentally enabling a flow that should remain disabled - Accidentally disabling a flow that should remain enabled Avoiding any specification in the code would be ideal, since we don't want to enable or disable flows using the code, but rather the UI/API. Thanks!
    c
    5 replies · 2 participants
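    A sketch of one way to approximate "leave it as it is": read the flow's current schedule status from the backend and feed it back into `register()`, so CI neither enables nor disables anything. The GraphQL field names (`is_schedule_active`, `archived`) follow the Server/Cloud schema and may vary by version; `flow` below is the flow object being registered, and the project name is a placeholder.
    from prefect import Client

    client = Client()


    def current_schedule_state(flow_name, project_name, default=False):
        # Look up the (non-archived) flow's current schedule status, if it exists.
        result = client.graphql(
            """
            query($name: String, $project: String) {
              flow(
                where: {
                  name: {_eq: $name},
                  project: {name: {_eq: $project}},
                  archived: {_eq: false}
                }
              ) {
                is_schedule_active
              }
            }
            """,
            variables={"name": flow_name, "project": project_name},
        )
        flows = result.data.flow
        return flows[0].is_schedule_active if flows else default


    flow.register(
        project_name="my-project",
        set_schedule_active=current_schedule_state(flow.name, "my-project"),
    )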
  • j

    John

    03/08/2021, 1:25 AM
    hey folks—I’m new to Prefect and am trying to understand how one might approach configuring a flow so that it has access to its own previous execution date (similar to Airflow’s
    prev_execution_date_success
    variable). In reading the docs, caching doesn’t quite seem like the right fit. Persisting output makes sense, but it’s not clear to me what the right way is to re-consume that output on a future flow run. So at this point my inclination is to simply read/write the last run timestamp to a local file. Anyone have a recipe, docs, or otherwise on a better approach?
    a
    j
    3 replies · 3 participants
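    A sketch of one alternative to the local file: ask the backend for the current flow's most recent successful run from inside a task, using the `flow_id` that Prefect puts in context at run time. The `flow_run` query shape follows the Server/Cloud schema, so the field names may need adjusting for your version.
    import prefect
    from prefect import Client, Flow, task


    @task
    def last_successful_run_time():
        client = Client()
        result = client.graphql(
            """
            query($flow_id: uuid) {
              flow_run(
                where: {flow_id: {_eq: $flow_id}, state: {_eq: "Success"}},
                order_by: {end_time: desc},
                limit: 1
              ) {
                end_time
              }
            }
            """,
            variables={"flow_id": prefect.context.flow_id},
        )
        runs = result.data.flow_run
        return runs[0].end_time if runs else None


    with Flow("incremental-load") as flow:
        last_successful_run_time()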
  • a

    Alfie

    03/08/2021, 8:33 AM
    Hi team, I’m using Prefect Core and would like to track the identity of the user who created/updated a flow. Any suggestions on how to do that properly?
    d
    7 replies · 2 participants
  • a

    Alfie

    03/08/2021, 8:33 AM
    Thanks
  • a

    Avi A

    03/08/2021, 10:28 AM
    Hey community, there’s a phenomenon I’ve encountered several times in which a task is executed twice within one flow run (after completing successfully the first time). It happens for regular tasks and mapped tasks as well. We suspect that this is an issue where the downstream task tries to fetch the task’s result, fails to do so, and that triggers the original task to run again, even though it completed successfully. Any ideas why this is happening and what we can do to fix it? Working on: • Kubernetes dask cluster (Kubernetes Executor) • Prefect 0.13.18 cc: @Vladimir Zoubritsky
    v
    d
    9 replies · 3 participants
v

Vladimir Zoubritsky

03/08/2021, 12:02 PM
This sounds similar to the issue that was fixed for LocalDaskExecutor in https://github.com/PrefectHQ/prefect/pull/3127. Could this be affecting us when using
DaskExecutor(cluster_class=KubeCluster)
?
🤔 1
d

Dylan

03/08/2021, 4:30 PM
Hi @Avi A! Could you share some example logs and states, as well as your Run Config and Executor configuration?
Also, we’ve made some big changes since the last 0.13.x release; you may find that upgrading to 0.14.x solves your issue.
a

Avi A

03/09/2021, 8:00 AM
Thanks @Dylan. We might upgrade to 0.14 soon. @Vladimir Zoubritsky is the engineer working with me on this; I hope he can share that config info. Currently we suspect that Dask workers are getting killed (possibly due to exceeding the memory limit), and then their output is getting lost. It’s pretty weird because we don’t see task failures, so we’re not sure why the workers are getting restarted. Does that sound related?
v

Vladimir Zoubritsky

03/09/2021, 11:30 AM
Looking at the worker logs from the time when the tasks started executing for the second time, the worker was killed. However, the worker was probably not executing any task at the time, so no task appears as failed:
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - INFO - Worker process 17 was killed by signal 15
distributed.nanny - WARNING - Restarting worker
distributed.worker - INFO -       Start worker at:    <tcp://10.244.19.5:33061>
distributed.worker - INFO -          Listening to:    <tcp://10.244.19.5:33061>
distributed.worker - INFO -          dashboard at:          10.244.19.5:39427
distributed.worker - INFO - Waiting to connect to:    <tcp://10.244.9.86:34195>
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                   57.00 GB
distributed.worker - INFO -       Local Directory: /srv/app/dask-worker-space/dask-worker-space/worker-utvrlpcv
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:    <tcp://10.244.9.86:34195>
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
d

Dylan

03/09/2021, 3:51 PM
Thanks for this info! Could you also share your Run Config and Executor configuration?
If you’re running without an orchestration layer (Prefect Cloud or Prefect Server), Dask worker restarts can cause some tasks to be re-run
You can also try giving the Dask workers / Scheduler more resources to try to prevent restarts
v

Vladimir Zoubritsky

03/11/2021, 9:18 AM
Thanks! We will try increasing the resources, since the restarts cause subsequent restarts in a fairly deterministic way. We are using Prefect Server for the orchestration layer (installed on Kubernetes using the Helm chart). The run config we are using is
run_config=KubernetesRun(image=image)
(using a custom image with all our dependencies), and for the executor:
executor = DaskExecutor(
            cluster_class=KubeCluster,
            cluster_kwargs={
                'n_workers': 3,
                'pod_template': make_pod_spec(
                    image=image,
                    cpu_request='2',
                    memory_request='57G',
                    memory_limit='57G',
                )
            },
        )