prefect-community
  • xyzz

    04/12/2021, 11:38 AM
    Is there a way to set the project_name parameter of the StartFlowRun task automatically to the current project of the flow, or to None if it's run with Core's flow.run()?
    15 replies · 4 participants
  • Jérémy Trudel

    04/12/2021, 12:52 PM
    Hi! I'm trying to register a project using the CLI with the following command:
    prefect register --project "my project" --path myflow.py
    All it returns is:
    Project 'my project' does not exist
    I know it does not exist since I'm trying to register it. Is there something I'm missing?
    1 reply · 2 participants
  • Philip MacMenamin

    04/12/2021, 1:23 PM
    I can't seem to find the docs on how to exit a flow if a specific task fails. Currently I'm raising `signals.FAIL` within the task, but for some tasks I just want to exit the flow if they don't work, without having to mark every other task as dependent on that task succeeding. Is there anything I can look at?
    8 replies · 2 participants
  • ciaran

    04/12/2021, 2:25 PM
    cc @Kevin Kho I'm trying to lock down our CDK deployment so that our Prefect Agent, Flow Runner, Executor and workers are in security groups, i.e. Agent & Runner in Sec1, Executor and Workers in Sec2. Sec1 can talk to Sec2 and both can talk to the outside world, but neither allows external ingress. The issue I'm having is that Prefect seems to connect to the public IP of the Dask Scheduler task in ECS rather than its local (within the VPC) IP. Is there a way to get Prefect/the Dask Executor to use the local (AWS private) IP of the scheduler? It would really help us make our infrastructure a bit more secure.
    12 replies · 3 participants
  • Matthew Blau

    04/12/2021, 2:44 PM
    Hello all, today I have a conceptual question. Currently, I have flows that create and start a Docker container from an already built image. This lets us develop the image without rebuilding it every time we make minor changes to the code, and it quickly gives visibility into task failures. However, we are not taking advantage of everything Prefect offers. So my question is: how can I have Prefect register tasks to Prefect Server from within a Docker container without having to rebuild the Docker image for every flow run? I want to leverage more of Prefect's features, but based on the docs I am unsure how to execute tasks within a Docker image without rebuilding it every time the flow is scheduled to run. Any help or direction would be appreciated. Thank you in advance!
    8 replies · 2 participants
  • Gabe Grand

    04/12/2021, 4:40 PM
    Hi team, I have a question about structuring flow-of-flows. I have a linear ML pipeline with multiple steps, each of which has been written as a separate flow:
    flow_1 (preprocessing) -> flow_2 (training) -> flow_3 (deployment)
    46 replies · 5 participants
  • Carlos Gutierrez

    04/12/2021, 9:32 PM
    Hi everyone! Sorry to bother you with this, but I'm having a really hard time running my unpickled flow from S3 storage, and I don't think I fully understand how pickling/unpickling works behind the scenes. More details in the thread:
    32 replies · 4 participants
  • Hugo Shi

    04/12/2021, 11:54 PM
    Occasionally I have an agent that stops responding for some reason. After I bounce (restart) it, it seems to take a while (around 20 minutes) before it starts picking up flows. Has anyone seen this?
    18 replies · 3 participants
  • S K

    04/13/2021, 4:57 AM
    Hello, I want to schedule a flow 7 days a week, but on weekends I want to reduce the run frequency to once every 4 hours instead of once every hour. This is the piece of code that runs the flow every hour from 6 AM to 6 PM on all 7 days; it needs modifying for the weekend schedule:
    from datetime import time, timedelta

    import pendulum
    from prefect.schedules import Schedule, filters
    from prefect.schedules.clocks import IntervalClock

    schedule = Schedule(
        # emit an event every hour
        clocks=[
            IntervalClock(
                start_date=pendulum.datetime(2021, 4, 1, tz="America/Los_Angeles"),
                interval=timedelta(hours=1),
            )
        ],
        # only include weekdays
        # filters=[filters.is_weekday],
        # only include times between 6 AM and 6 PM
        filters=[filters.between_times(time(6), time(18))],
    )
    1 reply · 2 participants
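    A minimal stdlib sketch of the intended rule (plain Python, not Prefect's filter API): with an hourly clock, keep every run from 6 AM to 6 PM on weekdays, but on weekends keep only the 6, 10, 14 and 18 o'clock runs. It assumes the datetimes are already in the flow's local time zone, and `keep_run`/`runs_on` are hypothetical helpers. If I remember the Prefect 0.x `Schedule` signature correctly, a similar effect might be reachable by combining `filters=[filters.between_times(...)]` with `or_filters` (e.g. `filters.is_weekday` plus one `filters.at_time(...)` per weekend slot); treat that mapping as an assumption to verify against the `prefect.schedules.filters` docs.

```python
from datetime import datetime, time, timedelta
from typing import List

# Weekend slots: every 4 hours inside the 6 AM-6 PM window.
WEEKEND_HOURS = {6, 10, 14, 18}


def keep_run(dt: datetime) -> bool:
    """Return True if a candidate hourly run at `dt` should be kept (hypothetical helper)."""
    # Window shared by all days: 6 AM to 6 PM, both ends inclusive.
    if not time(6) <= dt.time() <= time(18):
        return False
    # Monday=0 ... Friday=4: keep every hourly run.
    if dt.weekday() < 5:
        return True
    # Saturday/Sunday: keep only the 4-hourly slots, on the hour.
    return dt.hour in WEEKEND_HOURS and dt.minute == 0


def runs_on(day: datetime) -> List[datetime]:
    """Expand an hourly clock over one calendar day and apply keep_run."""
    midnight = day.replace(hour=0, minute=0, second=0, microsecond=0)
    candidates = [midnight + timedelta(hours=h) for h in range(24)]
    return [dt for dt in candidates if keep_run(dt)]


if __name__ == "__main__":
    # 2021-04-16 is a Friday, 2021-04-17 a Saturday.
    for day in (datetime(2021, 4, 16), datetime(2021, 4, 17)):
        print(day.strftime("%A"), [dt.strftime("%H:%M") for dt in runs_on(day)])
```

    With this rule a weekday keeps 13 hourly runs (06:00 through 18:00), while a weekend day keeps only four.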
  • Domantas

    04/13/2021, 10:55 AM
    Hello guys! Recently I ran into a problem running my Prefect flow from the Server UI:
    Failed to load and execute Flow's environment: StorageError('An error occurred while unpickling the flow:\n AttributeError("type object \'Members\' has no attribute \'ids\'")\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n - prefect: (flow built with \'0.14.15\', currently running with \'0.14.12\')')
    Funnily enough, this error does not appear when running locally (flow.run()); it only occurs when executing the flow via the Server UI (I will attach a screenshot with the error log). For context, I'm running Rust code from the Python script, which seems to be the main problem when using the UI (running locally works perfectly fine, so the code itself seems to be good). Does anyone know how to solve this? If some relevant information is missing, or a thread about this problem already exists, let me know!
    13 replies · 3 participants
  • Igor Bondartsov

    04/13/2021, 11:22 AM
    Hello! I am using AWS Fargate. I added env_vars to my agent. How can I make secrets from the agent available to a flow that will run in the container?
    11 replies · 3 participants
  • Sven Teresniak

    04/13/2021, 11:45 AM
    I have 2 questions regarding the new `prefect register`:
    1. What is the purpose of `prefect build`? I think I understand what it does, but when would I ever want to use `prefect build` + `prefect register --json`?
    2. Is there any difference between running `prefect register -p flow.py` on a flow file and calling `flow.register(…)` as part of a `python flow.py` call? The latter seems much more dynamic (e.g. I can do some initialization or behaviour changes in `__main__`).
    4 replies · 3 participants
  • Rowen

    04/13/2021, 12:54 PM
    Hi all, I am trying to run a very simple flow. It succeeds when I run it locally by adding `flow.run()` at the end of my Python file. However, when I trigger the flow in the Prefect Cloud UI, it fails at the `extract` stage with the error `At least one upstream state has an unmappable result`. Below is the code; I will elaborate more in the thread.
    from prefect import Flow, task
    from prefect.storage import S3


    @task
    def transform(x):
        return x + 30


    @task
    def extract():
        return [200, 400, 500]


    with Flow("flow-name", storage=S3(bucket="bucket_name")) as flow:
        e = extract()
        t = transform.map(e)  # fails when I trigger the flow in the UI
    16 replies · 2 participants
  • Stéphan Taljaard

    04/13/2021, 2:15 PM
    Hi. How would you recommend limiting concurrency of a mapped task (900+ DB queries) using Prefect Server?
    8 replies · 2 participants
  • Jonas

    04/13/2021, 2:58 PM
    Hello, I am learning Prefect and I am trying to run a flow inside another flow, passing it a value from the parent flow. I have a database file with a flow that fetches a dataframe, and I want to pass that data to a new flow in another file to work with it. I came across this doc https://docs.prefect.io/core/idioms/flow-to-flow.html, but I am still not sure how to share data between flows.
    from prefect import Flow
    from prefect.tasks.prefect import StartFlowRun

    flow_database = StartFlowRun(flow_name="database", project_name="test", wait=True)
    flow_transform = StartFlowRun(flow_name="transform", project_name="test", wait=True)

    with Flow("parent-flow") as flow:
        data = flow_database()  # get a dataframe from the "database" flow
        flow_transform(data)  # use the dataframe in the "transform" flow
    How should I go about this?
    3 replies · 3 participants
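    As far as I understand, `StartFlowRun` starts a separate flow run through the backend, so a dataframe produced inside the "database" flow is not handed back to the parent flow as a Python object. A common workaround is to pass a reference instead: the parent flow picks a location that both child flows can reach and passes it to each of them, for example as a flow parameter (`StartFlowRun` accepts a `parameters` dict); the first flow writes there and the second reads from there. The stdlib sketch below shows only that hand-off. JSON files in a temporary directory stand in for S3, a database table, etc., and `write_result`, `read_result` and the `data_location` parameter name are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def write_result(records, location: str) -> None:
    """'database' flow side: persist its output at a location chosen by the parent."""
    Path(location).write_text(json.dumps(records))


def read_result(location: str):
    """'transform' flow side: load the data from the same location."""
    return json.loads(Path(location).read_text())


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        # The parent flow chooses the location and would pass it to both child
        # flows, e.g. as parameters={"data_location": location} (hypothetical name).
        location = str(Path(tmp) / "database_output.json")
        write_result([{"id": 1, "value": 10}], location)
        print(read_result(location))
```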
  • Charles Liu

    04/13/2021, 3:52 PM
    Is it possible for the S3Upload feature to return an s3 path to an unpickled file?
    27 replies · 3 participants
  • Adam Lewis

    04/13/2021, 4:11 PM
    Hi everyone, does Prefect have the concept of flow priority if more flows are scheduled to run than there are computational resources to run them all simultaneously? I've been looking in the docs but not able to discover it yet. Thanks!
    1 reply · 2 participants
  • Matthew Blau

    04/13/2021, 6:47 PM
    Hello all, we are using a .env file for secret management with Prefect instead of config.toml, and we are running into issues injecting updated .env values at runtime with `LocalRun(env=env_dict)`. The flow only picks up the new .env contents when the flow is registered, which is not ideal: the .env file is written to during runtime, and the new values need to be read from the same file. Any pointers on how to have Prefect pick up the new values? The flow creates a container and injects env values into it with `environment=env_vars` in the `CreateContainer()` task, but we need it to inject the updated values at runtime. I have had success with
    flow.run_config = DockerRun(env={f"PREFECT__CONTEXT__SECRETS__{k}": v for k, v in config.context.secrets.items()})
    but in this project we are not using Prefect's config.toml, and the flow is not executed within a Docker container.
    4 replies · 3 participants
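    A likely cause (an assumption, not confirmed in the post) is that `env_dict` is built when the flow file is imported at registration time, so the values are frozen into the serialized `LocalRun` run config. Reading the .env file inside a task defers the read until the task actually runs, so whatever was most recently written to the file is used. A stdlib sketch with a deliberately simple, hypothetical `load_env_file` parser (`KEY=VALUE` lines and `#` comments only; no quoting, `export` prefixes, or variable expansion). In the flow, its return value could be wired into `CreateContainer`'s `environment` argument as an upstream task result.

```python
import tempfile
from pathlib import Path
from typing import Dict


def load_env_file(path: Path) -> Dict[str, str]:
    """Parse simple KEY=VALUE lines; blank lines and '#' comments are skipped."""
    env: Dict[str, str] = {}
    for raw in path.read_text().splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        env[key.strip()] = value.strip()
    return env


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        env_path = Path(tmp) / ".env"
        env_path.write_text("API_TOKEN=old\n")
        print(load_env_file(env_path))  # the file is read at call time...
        env_path.write_text("API_TOKEN=new\n# rotated\n")
        print(load_env_file(env_path))  # ...so the updated value is picked up
```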
  • Ricardo Portela

    04/13/2021, 7:26 PM
    Hello people! How are you doing? I would like to know if there is a way with Prefect to prevent a flow from executing if that flow already ran successfully earlier that day. Please feel free to share GitHub samples as well. Thanks in advance.
    15 replies · 3 participants
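    A minimal stdlib sketch of one approach, assuming a marker location that every run can reach: the flow checks for a success marker keyed by today's date before doing any work, and writes the marker only after the work succeeds. In Prefect this check would typically be the first task, raising a skip signal such as `signals.SKIP` when the marker exists. `marker_path`, `already_succeeded`, `mark_succeeded`, `run_once_per_day` and the marker directory are all hypothetical. An alternative that needs no shared storage would be to query the backend API for today's successful runs of the flow, which is not shown here.

```python
import tempfile
from datetime import date
from pathlib import Path
from typing import Callable, Optional


def marker_path(marker_dir: Path, flow_name: str, day: date) -> Path:
    """One marker file per flow per calendar day."""
    return marker_dir / f"{flow_name}-{day.isoformat()}.success"


def already_succeeded(marker_dir: Path, flow_name: str, day: Optional[date] = None) -> bool:
    return marker_path(marker_dir, flow_name, day or date.today()).exists()


def mark_succeeded(marker_dir: Path, flow_name: str, day: Optional[date] = None) -> None:
    marker_path(marker_dir, flow_name, day or date.today()).touch()


def run_once_per_day(marker_dir: Path, flow_name: str, work: Callable[[], None]) -> bool:
    """Run `work()` unless today's run already succeeded; return True if it ran."""
    if already_succeeded(marker_dir, flow_name):
        return False
    work()  # if this raises, no marker is written and a later run tries again
    mark_succeeded(marker_dir, flow_name)
    return True


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        d = Path(tmp)
        print(run_once_per_day(d, "daily-etl", lambda: None))  # first run executes
        print(run_once_per_day(d, "daily-etl", lambda: None))  # second run is skipped
```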
  • Will Milner

    04/13/2021, 8:28 PM
    What's the best way to get a URL to a flow run when running locally? For context, I'm trying to include this URL in an email I send on flow failure.
    12 replies · 2 participants
  • Ismail Cenik

    04/13/2021, 8:31 PM
    Hello everyone, I hope you are well. We have an AWS Kinesis Data Analytics (KDA) application. A task in one of my flows is responsible for starting the KDA application via the Boto3 library (start_application). My next task should start only when the KDA application stops, so I need to know the app status; another API, describe_application, returns it. If the app status is "READY", I can continue with the next task. What is the best way to design this flow?
    1. Use a "waiter" to check the status. However, I could not find waiters for KDA, although there are good examples for AWS Batch.
    2. Create a separate task that calls describe_application on a schedule. As far as I understand, though, schedules are generally used for flows.
    Could you please advise me on this?
    10 replies · 3 participants
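    Without a boto3 waiter, one option is a plain polling loop inside a single task, rather than a separately scheduled flow. A minimal sketch with the status lookup injected so it runs without AWS; with boto3 it would presumably be something like `client.describe_application(ApplicationName=...)["ApplicationDetail"]["ApplicationStatus"]` on a `kinesisanalyticsv2` client, which is an assumption to check against the boto3 docs. `wait_for_status` is a hypothetical helper, and the 30-second interval and 1-hour timeout are arbitrary defaults.

```python
import time
from typing import Callable


def wait_for_status(
    get_status: Callable[[], str],
    target: str = "READY",
    poll_seconds: float = 30.0,
    timeout_seconds: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until it returns `target`; raise TimeoutError otherwise."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if status == target:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"last status {status!r}, wanted {target!r}")
        sleep(poll_seconds)  # injectable so tests don't actually wait


if __name__ == "__main__":
    # Simulated status sequence standing in for describe_application calls.
    statuses = iter(["STARTING", "RUNNING", "STOPPING", "READY"])
    print(wait_for_status(lambda: next(statuses), poll_seconds=0))
```

    Keeping the wait inside the task means the downstream task only starts once this one succeeds, and a timeout surfaces as an ordinary task failure.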
  • Jacob Blanco

    04/14/2021, 5:42 AM
    Is it possible to create new schedules for existing flows from GraphQL with default parameters? I know about `set_flow_group_schedule`. Edit: never mind, I found it here: https://github.com/PrefectHQ/server/blob/4a9887091ffabe1a8c7098d6603663779d63e8eb/tests/graphql/test_flow_groups.py#L155 (leaving this here for posterity)
    1 reply · 2 participants
  • Hawkar Mahmod

    04/14/2021, 7:02 AM
    Morning all. I'm a bit confused about registering my flows in our CI/CD process, and I think it boils down to this simple question: do I need Prefect AND all my flow's dependencies installed when I call `flow.register()` or `prefect register`? I'm using `S3` storage and an `ECS` run config. I was previously using `Docker` storage but didn't like rebuilding the image each time the flow code changed. I am using AWS CodePipeline/CodeBuild, so I have a Docker daemon at my disposal, and I have a base image that contains the whole repo the flow sits in. Ideally, in my build environment I can build the base image if necessary and register my flow with Prefect Cloud. However, it seems I can't register unless I have all my dependencies installed, and they happen to sit in the base image anyway. So perhaps I should `docker run` the image and register that way? I've read the GitHub discussion on flow deployment patterns, but I am not fully clear on this particular question.
    7 replies · 2 participants
  • Varun Joshi

    04/14/2021, 8:33 AM
    Hey Prefecters, I don't want my flows to run between 2 AM and 10 AM from Monday to Thursday, or between 2 AM and 8 AM from Friday to Sunday. I'm using the filters module provided by Prefect for this. Assuming it can be done this way, I made the change below in my code:
    import datetime

    import pendulum
    from prefect.schedules import Schedule, filters
    from prefect.schedules.clocks import IntervalClock

    ist = pendulum.timezone('Asia/Calcutta')
    schedule = Schedule(
        clocks=[IntervalClock(interval=datetime.timedelta(minutes=1))],
        filters=[filters.between_times(pendulum.time(6), pendulum.time(10))],
    )
    However, this doesn't seem to be working. Could someone please guide me on how to import filters and apply them to my schedule? Thank you, Varun
    11 replies · 2 participants
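    The rule is easier to reason about as a per-weekday blocked window than as an inclusion filter. Below is a minimal stdlib sketch of that rule, useful for checking whatever Prefect schedule you end up with, e.g. against the dates returned by `schedule.next(n)` (an assumption about the 0.x `Schedule` API). It assumes datetimes are already in IST and treats each window as start-inclusive, end-exclusive; `BLOCKED` and `is_allowed` are hypothetical names. As far as I know, Prefect's built-in filters are referenced by name when a schedule is serialized, so a custom function like this may not be registrable as a filter itself.

```python
from datetime import datetime, time

# Blocked windows per weekday (Monday=0 ... Sunday=6): start inclusive, end exclusive.
BLOCKED = {day: (time(2), time(10)) for day in range(0, 4)}  # Monday-Thursday
BLOCKED.update({day: (time(2), time(8)) for day in range(4, 7)})  # Friday-Sunday


def is_allowed(dt: datetime) -> bool:
    """Return False if `dt` falls inside that weekday's blocked window."""
    start, end = BLOCKED[dt.weekday()]
    return not (start <= dt.time() < end)


if __name__ == "__main__":
    checks = [
        datetime(2021, 4, 12, 9, 30),  # Monday 09:30
        datetime(2021, 4, 16, 9, 0),  # Friday 09:00
        datetime(2021, 4, 18, 1, 59),  # Sunday 01:59
    ]
    for dt in checks:
        print(dt.strftime("%a %H:%M"), "allowed" if is_allowed(dt) else "blocked")
```

    Note that in the snippet above `ist` is defined but never passed to the clock (for example via `start_date`), so the filter times may be evaluated in UTC rather than IST.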
  • tash lai

    04/14/2021, 10:06 AM
    Morning. I'm looking for a way to manually trigger a Cached state from within a task, is it at all possible?
    10 replies · 4 participants
  • Aurélien Vallée

    04/14/2021, 11:41 AM
    Hmm, the concept of "building" flows when registering is a bit vague, and I cannot find documentation on it. What does it actually mean to provide `build=True`? The doc states: "if `True`, the flow's environment is built prior to serialization; defaults to `True`". Basically, I'm trying to automate the registration of flows in a CI environment, and I get warnings such as:
    UserWarning: A flow with the same name is already contained in storage; if you changed your Flow since the last build, you might experience unexpected issues and should re-create your storage object.
      serialized_flow = flow.serialize(build=build)
    Well, basically, since I'm automating the registration of flows, I do not really know which ones are new, which ones are identical, and which ones changed.
    3 replies · 2 participants
  • Aurélien Vallée

    04/14/2021, 11:42 AM
    Providing `build=False` does not trigger the warning, though I don't really understand the impact it has. Should I somehow manage to provide `build=True` the first time a flow is registered and then `build=False` during updates?
  • Robin

    04/14/2021, 12:27 PM
    Dear prefect folks, I get an error when trying to schedule one prefect flow from the UI, while another one works fine. 😮 Setting a schedule for the same flow during flow registration also works fine. 🤔
    There was a problem creating your schedule, please try again shortly. Error message: Error: GraphQL error: Invalid clock provided for schedule: {'type': 'CronClock', 'cron': '0 * * * *', 'parameter_defaults': {...}, 'start_date': {'dt': '2021-04-14T14:21:57.622602', 'tz': 'Europe/Amsterdam'}}
    6 replies · 3 participants
  • tash lai

    04/14/2021, 1:42 PM
    Hey Prefectians, I have yet another question regarding memory usage. I'm not sure if it's a Prefect problem, a Dask problem, or just my ignorance. Here is the most useless flow humanity ever created:
    from prefect import Flow, task
    from prefect.executors import DaskExecutor


    @task
    def donothing(x):
        pass


    with Flow('useless') as flow:
        lst = list(range(4000))
        donothing.map(lst)

    flow.executor = DaskExecutor('tcp://localhost:8786')
    Worker started with:
    dask-worker --nthreads=50
    The thing is, the worker quickly eats up a lot of memory with each mapped task run, up to a gigabyte by the end of the flow, and that memory is not released when the flow finishes. The project I'm working on involves running up to ~100,000 IO-heavy tasks, so seeing this I'm a little worried that Prefect might not be the right tool for the job. But maybe I'm doing something wrong?
    6 replies · 4 participants
  • Yuliy Kuroki

    04/14/2021, 1:54 PM
    Hello everyone! Is there a way to have a prefect local agent wait for all tasks it’s running to finish before starting to work on a new task? It’s running a very slow and intensive process and I want it to finish before starting work on a new one.
    4 replies · 2 participants

Kevin Kho

04/14/2021, 2:09 PM
Hi @Yuliy Kuroki! I think the only suggestion I have right now is task concurrency limiting. This is only available in the paid tiers of Cloud, though. Otherwise, I think what you have to do is just set the tasks as dependencies of each other so they run sequentially.

Yuliy Kuroki

04/14/2021, 2:15 PM
Can I get the currently running flows dynamically at launch and set it as a dependency of the flow?

Kevin Kho

04/14/2021, 2:19 PM
Not quite, but you can make a new flow that just calls other flows with `StartFlowRun` as a task and then chain them together as dependencies. If you want something more dynamic, I would say use the `Client` to query for the flow run state; you may want `prefect.client.client.Client.get_flow_run_state`. Then you can start the more resource-intensive tasks if the state is SUCCESS.

Yuliy Kuroki

04/14/2021, 6:19 PM
Thanks! I’ll try that