prefect-community

    John Muehlhausen

    09/27/2021, 6:27 PM
    Sorry about the noob question: if an agent dies or stops being responsive in the middle of running a task (not the same as a confirmed failure), will the scheduler schedule it to run again? And is this behavior configurable?

    Daniel Saxton

    09/27/2021, 6:35 PM
    Newbie / vague question: what is the recommended way to run multiple independent Prefect flows? Just looking for general ideas / best practices for going incrementally from a single flow defined in a Python script to arbitrarily many, with no dependencies between them and separate schedules.

    Daniel Saxton

    09/27/2021, 6:38 PM
    Currently I'm using a very basic setup of launching a container and using prefect run to kick off a single script.

    Jeremy Phelps

    09/27/2021, 6:45 PM
    Hello everyone, I was wondering if it's necessary to assign the return value of a task to a variable within a Flow's with-block. I have a flow like this:
    with Flow('my-flow') as f:
      things = get_things()
      process_thing.map(things)
    ...where each process_thing returns a value that we don't really need. When Prefect runs this flow, the second task shows that all the individual elements of things have been mapped, yet the flow run remains in the "Running" state. I'm using Prefect 0.14.22.

    Jeremy Phelps

    09/27/2021, 8:00 PM
    Hello everyone, Is there a way to get the Prefect Web console to be a little less animated? Every time I go to the dashboard, my browser consumes large amounts of CPU and X11 bandwidth on cute animations, and it often hangs the browser entirely.

    Hugo Slepicka

    09/27/2021, 11:09 PM
    Hi, I have a very newbie question related to objects returned by a task when composing a Flow. I have a couple of procedures that I want to execute and they return objects. I would like to pass to the subsequent procedure a parameter based on a property which is a member of the object returned by the first procedure. Something like the code below:
    import random
    from prefect import task, Flow
    
    
    class Generator:
        def __init__(self, data):
            self.particle = data
    
    
    @task
    def run_generator():
        return Generator(random.random())
    
    
    @task
    def compute(particle):
        print(particle**2)
    
    
    with Flow('flow1') as flow:
        gen = run_generator()
        compute(gen.particle)
    When I try to create the flow I get the following error:
    ---------------------------------------------------------------------------
    AttributeError                            Traceback (most recent call last)
    /var/folders/7g/2_wk8ddx7h5_4r12flp9031m0000gn/T/ipykernel_3073/2988601640.py in <module>
          1 with Flow('flow1') as flow:
          2     gen = run_generator()
    ----> 3     val = compute(gen.particle)
    
    AttributeError: 'FunctionTask' object has no attribute 'particle'
    Is there any chance I can delay evaluation of the parameter until the flow is executed? I know that changing compute to receive the whole object and doing the attribute access there works, but I don't always have the possibility to change the function, and it would be bad in my case to create many wrappers for users of my library.
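    A pattern that should work here (a hedged sketch, not from the thread itself; the get_particle task name is made up for illustration): wrap the attribute access in its own task, so it is deferred until the flow actually runs:
    import random
    from prefect import task, Flow


    class Generator:
        def __init__(self, data):
            self.particle = data


    @task
    def run_generator():
        return Generator(random.random())


    @task
    def get_particle(gen):
        # runs at flow-run time, when gen is a real Generator instance
        return gen.particle


    @task
    def compute(particle):
        print(particle**2)


    with Flow('flow1') as flow:
        gen = run_generator()
        compute(get_particle(gen))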

    Ben Muller

    09/28/2021, 1:08 AM
    Hello community, I am trying to map a task while holding one of the arguments constant. I have read the docs:
    Map a function that adds tasks to a flow elementwise across one or more tasks. Arguments that should not be mapped over should be wrapped with prefect.unmapped.
    but I feel like an example is needed. I'm trying to do something like this:
    users = ["ben", "kevin"]
    CONSTANT_DATA = "something"
    
    with Flow():
       apply_map(notify, CONSTANT_DATA)
    How do I hold this constant?
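    A minimal sketch of the documented pattern (hedged; the notify task body and flow name are assumptions for illustration): wrap the constant in prefect.unmapped and pass the iterable as the mapped argument:
    from prefect import Flow, task, unmapped

    users = ["ben", "kevin"]
    CONSTANT_DATA = "something"

    @task
    def notify(user, data):
        print(f"notifying {user} with {data}")

    with Flow("notify-users") as flow:
        # users is mapped element-wise; CONSTANT_DATA is passed whole to every call
        notify.map(users, unmapped(CONSTANT_DATA))
    The same unmapped wrapper should also work with apply_map when the mapped function adds multiple tasks.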

    Ryan Sattler

    09/28/2021, 5:05 AM
    For a KubernetesRun flow, is there a way to set the docker image to use as a parameter to the flow rather than hardcoding it? This might help with some chicken-and-egg type problems we’ve been having. (Basically we have a flow with dependencies complex enough to require a custom docker image, but that makes it painful for users to iterate on the flow as the image needs to be rebuilt each time - plus different users can effectively clobber each others’ images if the flow is set to use latest.)
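    One possible angle (hedged and version-dependent: this relies on the Client accepting a run_config override at flow-run creation, so check it against your Prefect version; the flow id and image tag below are placeholders): register with a generic run config and override the image per run:
    from prefect.client import Client
    from prefect.run_configs import KubernetesRun

    client = Client()
    client.create_flow_run(
        flow_id="<your-flow-id>",
        # override the registered run config for this run only
        run_config=KubernetesRun(image="registry.example.com/my-flow:dev-snapshot"),
    )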

    jake lee

    09/28/2021, 6:04 AM
    Hi, for a LocalRun is there a way to specify which virtualenv to run the task in? I'm using pyenv, but the task is not running in the local pyenv environment; it runs with the system default Python instead.

    Son Mai

    09/28/2021, 8:34 AM
    I want to change the email on a webhook to another email. How can I do it for all flows instead of editing every flow?

    Sridhar

    09/28/2021, 9:33 AM
    Hi folks, I used this tutorial (https://www.lejimmy.com/distributed-data-pipelines-with-aws-ecs-fargate-and-prefect-cloud/) to get started with Prefect. I've followed every step exactly as described, but when I start the flow on Prefect Cloud it is stuck at Submitted for execution: Task arn:aws:ecs:ap-southeast-2:037961805145:task/prefect-fargate-cluster/a4bced3ed09f43ef92177c492cdc0cf0 and fails after 3 attempts. Here's the agent code:
    from prefect.agent.ecs.agent import ECSAgent
    AGENT = ECSAgent(cluster="prefect-fargate-cluster", labels=['ecs', 'dev'])
    AGENT.start()
    flow.py code:
    import prefect
    from prefect.storage import Docker
    from prefect.run_configs import ECSRun
    from prefect import task, Flow, Parameter
    STORAGE = Docker(registry_url='037961805145.dkr.ecr.ap-southeast-2.amazonaws.com/',
                     image_name='prefect-etl-flow',
                     image_tag='latest',
                     dockerfile='Dockerfile.txt')

    RUN_CONFIG = ECSRun(run_task_kwargs={'cluster': 'prefect-fargate-cluster'},
                        execution_role_arn='arn:aws:iam::037961805145:role/prefect-ecs',
                        labels=['ecs', 'dev'])

    @task
    def testfunc():
        print("Hello prefect!")

    with Flow('prefect-etl-flow', storage=STORAGE, run_config=RUN_CONFIG) as flow:
        testfunc()

    flow.register('emmi-etl')
    Really appreciate the help. Thanks in advance.

    Haseeb Ahmad

    09/28/2021, 11:33 AM
    Hi All, I have a Prefect job scheduled every day, and it has a default_date parameter.
    with Flow("impact partners",schedule=schedule) as flow:
        
        #setting default date to today - 1 to get the records from previous day
        default_date = (datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d')
    
        start_date = Parameter('start_date', default=default_date)
        end_date = Parameter('end_date', default=default_date)
    
        records = get_impact_records(start_date,end_date)
        impact_partners = rename_columns(records)
        upload(data=impact_partners)
        
    if __name__ == "__main__":
        main(flow)
    I think the default_date section gets evaluated at registration time rather than run time, so that is why it is static and the job runs for 2021-09-23 every day. What is the best way to make default_date dynamic? Thanks
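    A common fix (a sketch under the assumption that computing the default inside a task is acceptable; resolve_date is a made-up name): give the Parameters a None default and resolve the actual date at run time inside a task, so it is computed on every run rather than once at registration:
    from datetime import datetime, timedelta
    from prefect import task, Flow, Parameter

    @task
    def resolve_date(date):
        # evaluated at flow-run time, not at registration time
        return date or (datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d')

    with Flow("impact partners") as flow:
        start_date = resolve_date(Parameter('start_date', default=None))
        end_date = resolve_date(Parameter('end_date', default=None))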

    Matthew Seligson

    09/28/2021, 12:24 PM
    What are the best practices around creating “filewatcher” or “polling” type tasks? I believe Airflow has something like this called “file_sensor”.

    Alejandro A

    09/28/2021, 12:24 PM
    Hey all, I have a question regarding Prefect in general after reading the docs.

    Alejandro A

    09/28/2021, 12:26 PM
    I am not sure whether the UI is something that can only be used by creating a Cloud account, or whether it can be run locally and connected to Prefect Core using the code hosted in the PrefectHQ/ui repo.

    Sanil Khurana

    09/28/2021, 1:08 PM
    Hi guys, I am an SDE and my organization is trying to use Prefect along with ECS to run tasks more effectively. I have been facing an error for quite some time now and, while trying to debug it, was wondering if someone could point me to something I may be missing. I am running a Prefect agent on an EC2 instance, which polls the Prefect server running on the same instance. I use a simple Python script to send flow runs to the server from my local machine; the agent should pick up these runs and send them to an ECS cluster, where Fargate instances pick them up and start running them. Most of the flow works fine, except the Fargate instance is not able to load the flow metadata from S3 storage. The error I get is
    Failed to load and execute Flow's environment: UnpicklingError("invalid load key, '{'.")
    I have configured the roles to have full S3 and ECS access. I have checked the file on S3 as well, and it gets uploaded fine:
    {"flow": "gASVPgk....aFtdlHViLg==", "versions": {"cloudpickle": "2.0.0", "prefect": "0.15.6", "python": "3.7.10"}}
    I think the only place it is going wrong is that the Fargate instance is not able to load the flow properly. Any idea what I may be missing? Really appreciate the help. Thanks in advance.

    Alejandro A

    09/28/2021, 1:48 PM
    Is there anything in the docs on how to transition from a "local" flow, where I read a file that sits in the same path as the Python file containing the flow, to using the orchestrator and local agents?

    Kevin Weiler

    09/28/2021, 1:53 PM
    Hi there - I have a flow with a few root nodes (nodes with no dependencies) and noticed that they didn’t all kick off when the flow started - a few of them waited several hours to start. It looks suspiciously like they waited for other root nodes to complete even though they don’t depend on them. I’m using the LocalDaskExecutor with threads. Is there a max parallelization parameter of some kind that I’m unaware of?
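    Possibly relevant (hedged): LocalDaskExecutor accepts a num_workers argument, which defaults to the number of local CPUs, so root tasks beyond that limit will queue behind the others. Something like:
    from prefect import Flow
    from prefect.executors import LocalDaskExecutor

    with Flow("my-flow") as flow:
        ...  # tasks here

    # raise the worker cap so more root tasks can run concurrently;
    # num_workers defaults to the number of local CPUs
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=16)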

    Samuel Hinton

    09/28/2021, 4:12 PM
    Hey all! Just wondering if anyone more familiar with the client or API knows how to check and request how many flows have failed within the last X hours?
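    One way to ask the GraphQL API (a hedged sketch; the field names follow the Prefect Server schema as I understand it, and the timestamp is a placeholder you would compute for your window):
    from prefect.client import Client

    client = Client()
    result = client.graphql(
        """
        query {
          flow_run(where: {
            state: {_eq: "Failed"},
            start_time: {_gte: "2021-09-28T00:00:00+00:00"}
          }) {
            id
            name
            start_time
          }
        }
        """
    )
    print(result)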

    Constantino Schillebeeckx

    09/28/2021, 5:54 PM
    I've got a task that I'm calling from another task like:
    mount_glue_catalog.run(schema_name=schema_name, glue_db=glue_db_name, upstream_tasks=[create_catalog])
    where create_catalog is the return from another task. When I execute this I'm getting
    TypeError: mount_glue_catalog() got an unexpected keyword argument 'upstream_tasks'
    When I call this from a flow context, the invocation of mount_glue_catalog doesn't have any issues. Does the interface to running this task somehow change based on my calling it with run() vs from within a flow context?
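    For context, my understanding of Prefect 1.x behavior (hedged, and the task bodies below are made up): .run() invokes the underlying Python function directly, while keywords like upstream_tasks are only interpreted when the task is called inside a Flow context:
    from prefect import task, Flow

    @task
    def create_catalog():
        return "catalog"

    @task
    def mount_glue_catalog(schema_name):
        print(f"mounting {schema_name}")

    with Flow("glue-demo") as flow:
        cat = create_catalog()
        # Task.__call__ in a flow context understands upstream_tasks...
        mount_glue_catalog(schema_name="analytics", upstream_tasks=[cat])

    # ...but .run() is just the plain function, so passing upstream_tasks
    # to it raises the TypeError seen above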

    Jeremy Phelps

    09/28/2021, 6:28 PM
    Hi again, I was wondering if there's anything special about the way intermediate values from mapped tasks are stored. My tasks are all defined with task(result=OurGCSResult(bucket='our-bucket')), where OurGCSResult is a copied-and-modified version of the GCSResult class found in Prefect. The difference is that OurGCSResult is compatible with the old version of the Google Cloud Storage library that our code uses; we would not be able to use the standard GCSResult without first rewriting a significant portion of our code. This class works fine for tasks that are not mapped, but something goes wrong for mapped tasks. I dug around using the GraphQL client and noticed that all task runs with a non-negative map_index seem to be missing their storage information:
    {
                  "id": "eba36675-c15a-43ea-ad5b-2540936477b5",
                  "map_index": 0,
                  "name": null,
                  "serialized_state": {
                    "type": "Success",
                    "_result": {
                      "type": "Result",
                      "location": null,  // WAT?!
                      "__version__": "0.14.22+9.g61192a3ee"
                    },
                    "context": {
                      "tags": []
                    },
                    "message": "Task run succeeded.",
                    "__version__": "0.14.22+9.g61192a3ee",
                    "cached_inputs": {}
                  }
                }
    As far as I can tell from the logs, no errors are happening, and the only possible result of running the task in question is a possibly-empty array being returned, or an exception being thrown (which would be logged).

    Tony Yun

    09/28/2021, 7:06 PM
    Hi! I'm seeing a very weird issue where I can fetch the Secret locally just fine, but when I register the flow, it fails with a secret-not-found-locally error:
    ValueError: Local Secret "CRITEO_SECRET_KEEPS" was not found.
    But running locally is totally fine:
    >>> prefect.client.Secret('CRITEO_SECRET_KEEPS').get()
    '1stxxxxg'
    Are there any possible reasons? I clearly defined the secret in my local config file.

    Abhas P

    09/29/2021, 12:46 AM
    Hi, I am trying to test flow composition as described in this article. I declare my tasks with a decorator in the flow file (instead of a class-style definition as shown in the link), like:
    #flows/sample_flow.py

    @task
    def load():
        ...
    I try to test the flow composition like this:
    from flows.sample_flow import load
    from flows import sample_flow as flow_file

    def test_flow_composition():
        load_task = load
        flow_tasks = flow_file.flow.tasks

        assert flow_file.flow.terminal_tasks() == set([load_task])

        # assert load_task in flow_tasks  - same thing happens here: load_task is
        # clearly an element of the flow_tasks set, but it asserts false
    I get this error, which is a little weird (I suspect object signatures here):
    >       assert flow_file.flow.terminal_tasks() == set([load_task])
    E       assert {<Task: load>} == {<Task: load>}
    E         Extra items in the left set:
    E         <Task: load>
    E         Extra items in the right set:
    E         <Task: load>
    E         Full diff:
    E           {<Task: load>}
    Can you help me with this?
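    A guess at what is happening (hedged, based on how Prefect 1.x binds tasks): calling a task inside a Flow context binds a copy of the task object, so the imported load and the task stored in flow.tasks are different objects, and identity-based set comparison fails. Comparing by name instead might look like:
    from flows import sample_flow as flow_file

    def test_flow_composition():
        # compare task names rather than object identity, since the flow
        # holds a bound copy of the imported task, not the same object
        terminal_names = {t.name for t in flow_file.flow.terminal_tasks()}
        assert terminal_names == {"load"}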

    Antti Tupamäki

    09/29/2021, 7:27 AM
    Hi all, I noticed that the following did not work, because it creates capitalised keys and adds some additional keywords like networkmode:
    import docker

    client = docker.from_env()
    host_config = client.api.create_host_config(binds=[
        '/psql/data/:/data'])
    print(host_config)

    container_id = CreateContainer(command="cat /data/test",
                          host_config=host_config)
    I ended up doing
    container_id = CreateContainer(command="cat /data/test", host_config={'binds': ['/psql/data/:/data']})
    which works. I think that part of the documentation needs updating.

    Ben Collier

    09/29/2021, 7:51 AM
    Hi all, can someone tell me what the default value of max_retries is if it's not set on a task?

    Varun Joshi

    09/29/2021, 8:34 AM
    Hi, is there a way to turn off all the Prefect flows at once using the CLI?

    Hugo Polloli

    09/29/2021, 8:40 AM
    Hi, I can't seem to find anything about this, so I'm asking here: is it possible to set a number of threads per task instead of globally? I have a task that could be heavily parallelized with no memory constraint, and I could easily allocate 16 threads to it; on the other hand, I have a heavily memory-intensive downstream task that fills up memory at 2 or 3 threads. Do I need 2 flows for that? Thanks, and sorry if this has already been addressed somewhere.

    Bruno Murino

    09/29/2021, 9:47 AM
    Hi everyone, we've been running a local Prefect server for a while now and I'm assessing whether we should move to Prefect Cloud or not. I want to know how many tasks have run successfully, so that I can get an idea of cost. Is there any way to find that out? Maybe an API query?
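    A sketch of one way to count successful task runs through the GraphQL API (hedged; the aggregate fields come from the Hasura-generated schema that Prefect Server exposes, so double-check them against your server):
    from prefect.client import Client

    client = Client()
    result = client.graphql(
        """
        query {
          task_run_aggregate(where: {state: {_eq: "Success"}}) {
            aggregate {
              count
            }
          }
        }
        """
    )
    print(result)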

    Chris L.

    09/29/2021, 9:48 AM
    Hello Prefect community, just a quick question regarding the process the flow source goes through after being downloaded from storage (but before the tasks execute). My setup is as follows: 1. KubernetesAgent 2. KubernetesRun with a custom job_template.yaml. Is the flow download process: 1. run on the Kubernetes job spun up by the agent (and hence picking up the environment variables specified in job_template.yaml); or 2. run on the agent, which then passes the flow metadata to the spun-up job?

    Will

    09/29/2021, 10:57 AM
    Hi, I've got a question about running flows using the ECS Agent. I've read through the documentation and can't seem to find a way to run different tasks within a single flow using different docker images / task definitions (or with different installed dependencies). Is this not possible? My use case would be eg. a basic ETL flow where you pull from multiple services (using different installed libraries), combine the data, and write to a data warehouse or similar. Is the only option to include all required dependencies for all tasks in the flow?

Kevin Kho

09/29/2021, 2:37 PM
Hey @Will, I don’t think this is possible unless you use the CreateContainer / StartContainer family of tasks in the task library. In short, you would need to start containers yourself, but I think the limitation here is getting the data in and out of these various containers. They are isolated from each other, so if you have a DataFrame in one container, I don’t think another container will readily be able to use it (they are like different machines). I would imagine packaging all dependencies would be easier, but you can do something like StartContainer1 -> Persist Results1 -> Start Container2 -> Persist Results2 -> Start Container3 to load and combine Results1 and Results2. You can then split this up into subflows orchestrated by a main flow. Just persist to a location like S3 for other flows to grab.
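To make that concrete, a rough sketch using the Docker tasks from the task library (hedged: the image name and command are placeholders, and the exact keyword arguments are worth checking against the prefect.tasks.docker docs):
from prefect import Flow
from prefect.tasks.docker import CreateContainer, StartContainer, WaitOnContainer

create = CreateContainer(image_name="my-etl-image:latest",
                         command="python extract.py")
start = StartContainer()
wait = WaitOnContainer()

with Flow("multi-image-step") as flow:
    container_id = create()
    started = start(container_id=container_id)
    # block until the container exits before downstream work runs
    status_code = wait(container_id=container_id, upstream_tasks=[started])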

Will

09/29/2021, 2:40 PM
Ok, great, thanks Kevin. One question for my own understanding then - between Prefect tasks within a workflow, are ECS tasks reused? E.g. if I initiated a map step, how would that work? I'm guessing multiple ECS tasks would be started? Or would Prefect attempt to share the work across a smaller number of tasks / a single task? (I probably need to go and read the code for the ECS agent!)

Kevin Kho

09/29/2021, 2:43 PM
The ECS agent makes one “ECS Task” per Flow Run. The “Prefect Mapped Task” is a subset of the “ECS Task”. The word “task” is overloaded here lol