prefect-community
  • s

    Simon Gasse

    08/13/2021, 12:48 PM
    Hi! I am currently coding up a linear workflow which has only a few data-dependencies but many non-data dependencies. My current approach does not look too clean to me - maybe I am missing some other way to do it? Details in the thread.
    s
    • 2
    • 7
  • k

    Kien Nguyen

    08/13/2021, 1:22 PM
    Hi guys, is there any way I can prevent the flow from registering with the host name as a label?
    g
    • 2
    • 1
  • t

    Thomas Weatherston

    08/13/2021, 2:21 PM
    Hey everyone! I have a question relating to Prefect Cloud and authenticating with AWS. We're attempting to add an S3 results backend to our flows but we're running into:
    ClientError('An error occurred (AccessDenied) when calling the PutObject operation: Access Denied')
    b
    s
    k
    • 4
    • 34
  • k

    Kien Nguyen

    08/13/2021, 2:55 PM
    Hi guys, does anybody have this issue? Stack trace in the reply. I think it happens after I add S3Result :smiling_face_with_tear:
    k
    • 2
    • 26
  • k

    Kien Nguyen

    08/13/2021, 3:17 PM
    Hi guys, does anyone have experience building a CI/CD pipeline for flows, for example re-registering a flow on every git push? Right now, every time I push code I run register for all the flows again, even when the source code of the flows hasn't changed.
    k
    • 2
    • 12
  • a

    Anze Kravanja

    08/13/2021, 3:28 PM
    Hello! I have a quick question about signals.ENDRUN. Based on my testing, even if I raise ENDRUN in a task, other tasks still get executed. Other signals behave as I would expect, but with ENDRUN my expectation is that once it is raised the pipeline stops executing at that point. Am I missing something about ENDRUN? And is there a mechanism to stop the execution of the flow or task that gets around retrying? (Sometimes an error is of a sort where one knows retrying won’t help and it’s best to just stop.)
    k
    m
    • 3
    • 6
  • j

    Joshua S

    08/13/2021, 4:16 PM
    Is anyone else not able to accept invitations? I created an account and sent invites to my team, and they are being told the invitation is not valid.
    ✅ 1
    n
    • 2
    • 1
  • d

    Danny Vilela

    08/13/2021, 5:44 PM
    Hi! I have a somewhat flaky Prefect task that runs some ETL. I’ve already decorated the task with retry logic (
    max_retries
    ,
    retry_delay
    , etc), and there’s a task after the ETL task to send a notification to slack (not quite using the Slack integration, but querying the Slack API directly). That said, is it possible to hook into the retry logic such that whenever that ETL task gets retried, I can execute some logic? For example, “whenever this task is going to be put into the
    RETRY
    state, run this code”? Would this be a state handler like the
    terminal_state_handler
    on the
    Flow
    , but for the specific task? Or just a state handler passed to
    Flow(state_handlers=[send_slack_message_on_retry])
    ?
    k
    • 2
    • 7
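A minimal sketch of a task-level state handler for the retry question above. It assumes the Prefect 0.x handler signature `(task, old_state, new_state)` and the `is_retrying()` method on state objects. `make_retry_handler` and `notify` are hypothetical names introduced here for illustration, not something from the thread.

```python
from typing import Any, Callable


def make_retry_handler(notify: Callable[[str], None]) -> Callable[[Any, Any, Any], Any]:
    """Build a state handler that calls `notify` whenever a task enters a retry state."""

    def on_retry(task: Any, old_state: Any, new_state: Any) -> Any:
        # Assumption: the new state exposes is_retrying(), as Prefect 0.x states do.
        if new_state.is_retrying():
            notify(f"Task {getattr(task, 'name', task)!r} is about to be retried")
        # A state handler returns the state the task should end up in.
        return new_state

    return on_retry
```

Under these assumptions the handler would be attached to the specific task, for example via `state_handlers=[make_retry_handler(send_slack_message)]` in the task definition, where `send_slack_message` is a placeholder for whatever calls the Slack API.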
  • b

    Brandon

    08/13/2021, 7:17 PM
    Hey folks, I’ve run into a bit of an authentication bug. I had my Prefect Cloud account set up using GitHub integrated auth, and had since removed an old email from my git profile. The authentication appears to be getting hung up looking for that email which is no longer on my account.
    ✅ 1
    k
    n
    • 3
    • 6
  • a

    Anicet Tadonkemwa

    08/13/2021, 7:28 PM
    Hello everyone ! I am a new user here. Hope we will have a great experience together!
    👋 2
    b
    k
    • 3
    • 3
  • c

    Charles Leung

    08/13/2021, 8:37 PM
    Hello Prefect team! I just wanted to ask for a QOL improvement. Right now we have some flows that depend on other flows. In the UI, if the parent flow fails because one of the child flows failed and I click through to the parent flow's task, I cannot navigate to the child flow's page. Instead, I have to grab the flow ID and paste it into the /flow-runs/ URL. Another option is to click back to the main project page and click the child flow's name. Is it possible to link the two?
    k
    • 2
    • 2
  • n

    Nivi Mukka

    08/13/2021, 9:02 PM
    Hi Team, I need to apply these Dask Config settings to my DaskExecutor before it gets created. I’m using Prefect Server. Both Prefect and Dask are set up on a GKE cluster. Can this be done using the
    cluster_kwargs
    in DaskExecutor: https://github.com/PrefectHQ/prefect/blob/6b59d989dec33aad8c62ea2476fee519c32f5c63/src/prefect/executors/dask.py#L88-L89 https://docs.prefect.io/api/latest/executors.html#daskexecutor
    k
    • 2
    • 9
  • k

    Kathryn Klarich

    08/13/2021, 9:33 PM
    Can lists be passed as parameters through the UI to your flow? When I do this, they seem to get converted to a string, and I am not sure whether this is the intended behavior. However, when I set a Parameter value to a list (e.g.
    Parameter('abc', default=[1, 2, 3])
    ) in my python code it's fine.
    n
    • 2
    • 2
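One defensive option for the list question above, sketched under the assumption that the list arrives as a JSON-formatted string such as `"[1, 2, 3]"`. The message does not confirm the exact string format, and `coerce_to_list` is a hypothetical helper that would be called at the start of the task receiving the parameter.

```python
import json
from typing import Any, List


def coerce_to_list(value: Any) -> List[Any]:
    """Return `value` as a list, parsing it as JSON if it arrived as a string."""
    if isinstance(value, str):
        # json.loads raises a ValueError subclass on malformed input.
        parsed = json.loads(value)
        if not isinstance(parsed, list):
            raise TypeError(f"expected a JSON list, got {type(parsed).__name__}")
        return parsed
    # Lists, tuples and other iterables pass through as a new list.
    return list(value)
```

With this in place, the same task body works whether the value came from the Python default or from a string typed into the UI.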
  • h

    Hugo Shi

    08/13/2021, 10:10 PM
    With prefect cloud - is there a way to prevent overlapping flow runs?
    k
    b
    • 3
    • 6
  • c

    Cooper Marcus

    08/14/2021, 2:27 AM
    I can’t really understand the difference between
    LocalRun
    and
    UniversalRun
    - I’m configuring runs in Prefect Cloud interface, and they are running on one of a few agents, one per server. Each server has its own label for its Prefect agent (eg. Server1 has label
    server1
    ) - when I configure the run on Prefect Cloud, I think (but I’m not 110% certain, I’ve been doing this too long today 😉 ) that when I specify
    server1
    label, it doesn’t matter whether I select the “local” or “universal” option - huh?
    k
    • 2
    • 11
  • y

    YD

    08/15/2021, 2:15 AM
    A job has been "late" for a long time. What can be the cause of:
    No heartbeat detected from the remote task; retrying the run.This will be retry 1 of 2
    I also have a flow that I started manually, but it does not start, even though I have no other flows running. In general, if I have a flow that must run at an exact time (no more than a few seconds off), is Prefect a good fit, or is it better to use a cron job?
    k
    d
    y
    • 4
    • 52
  • l

    Lan Yao

    08/16/2021, 4:28 AM
    Hi everyone! I have a Prefect agent running in Kubernetes cluster as a deployment.
    prefect agent kubernetes install -t ${TOKEN} --rbac --label ${CLUSTER_NAME} | kubectl apply -f -
    I’m trying to use the Prefect Cloud Automations to do health monitoring of the agent. However, I can’t do it at the agent installation phase as per the Prefect Docs. And I can’t manage to add the
    agent_config_id
    argument to a running agent either. I tried GraphQL, but didn’t find a suitable mutation to do it. Is there any way to make Automations work for a Prefect agent running in a Kubernetes cluster in my case? 🙏
    🆙 1
    m
    m
    • 3
    • 7
  • m

    Marie

    08/16/2021, 10:46 AM
    Hi community, within one of my flows, data is sent over and stored in a Postgres database. I noticed that when run within a Prefect flow, the task is 20 times slower than it usually is. I use Docker storage and Docker agents. When I ran the comparison without Prefect, I used the exact same Docker image. I looked at the documentation but couldn't figure out whether Prefect applies throttling at some point to limit the amount of data sent or stored.
    k
    • 2
    • 10
  • n

    Nacho Rodriguez

    08/16/2021, 11:12 AM
    Hello everyone, I am trying to use PostgresExecute from the Task Library, but passing the user as a Parameter. The problem is that you can only pass the user when instantiating the task outside the flow object. I have tried to make it work by using .run() in the parameter and creating the task inside the Flow, but I don't know if this is the best way. Is there a better way to do it?
    k
    • 2
    • 5
  • i

    Ivan Indjic

    08/16/2021, 12:05 PM
    Hi guys, is there any way to set up custom Slack messages?
    k
    • 2
    • 1
  • t

    Tim Enders

    08/16/2021, 2:14 PM
    Is there a task limit within a flow? I have a large flow that maps into a large number of individual tasks. It has stopped before running the very last task, and I get no errors; it just sits there. Even with DEBUG logs, it never attempts to start the last task.
    k
    • 2
    • 22
  • i

    Italo Barros

    08/16/2021, 2:39 PM
    Hello everyone! Just another question here. I have one Flow running on Prefect Cloud, and the agent is running locally. The Flow is quite simple: it reads a .ini file and runs some data processing based on the .ini values (I'm using configparser). I noticed that if I change the values in the .ini file without registering the Flow again, it runs as if it had the "old" .ini file and does not detect the changes. So, to properly "detect" the changes, I need to re-run the script and register the Flow again on the Cloud. So here's the question: is it possible to detect changes in local files without registering the Flow again? I'm asking because I have some pipelines that need to read CSV and .txt files locally, and I'm afraid I would have to re-run and register the script every day to detect changes in the files.
    k
    • 2
    • 6
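One possible explanation for the stale .ini values, which the message does not confirm, is that the file is parsed at module level, so the values are captured when the flow is built and stored at registration. A sketch of reading the file at call time instead follows; `read_settings` and the `processing` section name are hypothetical.

```python
import configparser
from typing import Dict


def read_settings(path: str, section: str = "processing") -> Dict[str, str]:
    """Parse the .ini file on every call so edits are picked up on the next run."""
    parser = configparser.ConfigParser()
    # ConfigParser.read silently skips missing files, so check what was loaded.
    if not parser.read(path):
        raise FileNotFoundError(path)
    return dict(parser[section])
```

If this function is called from inside a task rather than at import time, each flow run would see the file as it exists when that task executes.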
  • r

    Robin Norgren

    08/16/2021, 4:54 PM
    Hi all, I am trying to establish a long running agent as a service in ECS as described here: https://docs.prefect.io/orchestration/agents/ecs.html#running-ecs-agent-in-production. But struggling to authenticate my agent. Details in thread
    k
    m
    • 3
    • 5
  • y

    YD

    08/16/2021, 6:29 PM
    What will happen if a flow starts but does not finish running before the next scheduled run of the same flow begins? Is there a way to get some kind of alert if this happens?
    k
    • 2
    • 3
  • k

    Kathryn Klarich

    08/16/2021, 9:33 PM
    Hello, I am trying to make a default run config by super classing the ECSRun class (e.g.
    class FargateECSRun(ECSRun)
    ) and filling in the parameters like
    run_task_kwargs
    with defaults so that they don't need to be duplicated in every flow. However, when I go to register the flow, I get an error
    ValueError: Flow could not be deserialized successfully. Error was: TypeError('not all arguments converted during string formatting')
    - is what I am trying to do not allowed? I'm not sure how to debug this because when I try to follow these directions,
    built_storage
    is just an empty dictionary.
    k
    • 2
    • 2
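An alternative to subclassing, sketched here as an assumption rather than a diagnosis of the deserialization error above: keep the shared defaults in a plain dictionary and build the keyword arguments for `ECSRun` with a small factory. `fargate_run_kwargs` and the default values are hypothetical placeholders.

```python
import copy
from typing import Any, Dict

# Placeholder defaults for illustration only; real values depend on your AWS setup.
DEFAULT_ECS_KWARGS: Dict[str, Any] = {
    "run_task_kwargs": {"cluster": "example-cluster"},
    "labels": ["fargate"],
}


def fargate_run_kwargs(**overrides: Any) -> Dict[str, Any]:
    """Return keyword arguments for ECSRun: shared defaults with per-flow overrides on top."""
    kwargs = copy.deepcopy(DEFAULT_ECS_KWARGS)  # never mutate the shared defaults
    kwargs.update(overrides)
    return kwargs
```

Each flow could then use something like `ECSRun(**fargate_run_kwargs(image="..."))`, so the object that gets serialized is a stock `ECSRun` rather than a custom subclass.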
  • m

    Michael Warnock

    08/16/2021, 9:41 PM
    I'm running into a version conflict between the latest versions of prefect and coiled:
    The conflict is caused by:                                                                                                                   
        prefect 0.15.3 depends on click<8.0 and >=7.0                                                                                            
        prefect[aws,github,jupyter] 0.15.3 depends on click<8.0 and >=7.0                                                                        
        coiled 0.0.47 depends on click>=7.1
    k
    c
    • 3
    • 6
  • t

    Tim Chklovski

    08/16/2021, 9:45 PM
    Hi all, wondering if anyone has looked into/considered Saturn Cloud offerings? my org is looking into using Saturn Enterprise, which provides Prefect + Dask over EKS (Kubernetes in AWS). The people behind it at SaturnCloud.io have been excellent so far — Anyone have experiences using the platform? Anyone considered it but decided on something else? Thanks!
    k
    • 2
    • 1
  • j

    Jacob Goldberg

    08/16/2021, 10:16 PM
    Hello! I have a question about the DateTimeParameter. My goal is to have a flow run on a schedule, with dynamic date inputs, e.g. run every Monday where
    start_date
    is last Monday and the
    end_date
    is this Monday. However I also want the ability to override the default date inputs (e.g. to backfill data). I thought the
    DateTimeParameter
    would be perfect for this but I am having trouble getting it working. Here is a sample snippet:
    import datetime as dt
    from prefect import Flow
    from prefect.core.parameter import DateTimeParameter
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock
    
    schedule = Schedule(clocks=[CronClock("0 9 * * 1")])
    with Flow(name="test", schedule=schedule) as test_flow:
        start_date = DateTimeParameter("start_date", required=False) or dt.date.today() - dt.timedelta(days=7)
        end_date = DateTimeParameter("end_date", required=False) or dt.date.today()
    
        my_process(start_date, end_date)
    This issue is that
    my_process()
    requires a datetime object but is receiving
    NoneType
    . It seems the
    or
    operator is not working as I would expect when defining
    start_date
    and
    end_date
    . Although
    DateTimeParameter
    "evaluates" to
    None
    in Prefect, the
    or
    operator sees them as a Prefect Parameters, so
    start_date
    and
    end_date
    both get defined as
    None
    . What is the proper way to structure the Flow so that it has default datetime objects that are run on a schedule, but I have the ability to override them via Prefect Cloud?
    k
    • 2
    • 2
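A sketch of why the `or` fallback in the snippet above never fires, and of one way to apply the defaults at run time instead. `FakeParameter` is a hypothetical stand-in used so the example runs without Prefect. It assumes, as the message's observation suggests, that the parameter object itself is truthy when the flow is being built. `resolve_window` is likewise a hypothetical helper.

```python
import datetime as dt
from typing import Optional, Tuple


class FakeParameter:
    """Hypothetical stand-in for a parameter object; like most objects, it is truthy."""


# `or` is evaluated immediately, so it keeps the (truthy) placeholder object
# and the fallback on the right-hand side is never used.
start_date = FakeParameter() or dt.date.today()
still_a_placeholder = isinstance(start_date, FakeParameter)


def resolve_window(
    start: Optional[dt.datetime],
    end: Optional[dt.datetime],
    now: Optional[dt.datetime] = None,
) -> Tuple[dt.datetime, dt.datetime]:
    """Fill in defaults at run time, once the real values (or None) are known."""
    now = now or dt.datetime.now()
    end = end if end is not None else now
    # Design choice in this sketch: a missing start defaults to 7 days before `end`.
    start = start if start is not None else end - dt.timedelta(days=7)
    return start, end
```

Wrapped in a task inside the flow, a function like this would receive the parameter values when the flow runs, so scheduled runs get the defaults and manual runs can still override either date.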
  • j

    Jose Daniel Posada Montoya

    08/16/2021, 11:35 PM
    Hi! I've been exploring Prefect quite a bit these days and I'm really impressed. I've been looking for a way to keep the containers where the Flows run (with
    DockerRun
    ) in sync with the code/package I have in the repository. One of the approaches I came up with was to develop a Storage (very similar to Git) that would do the following: 1. Clone the repository into a temp folder. 2. Extract the Flow (as it is currently done). 3. Search the repo for a specified
    Dockerfile
    . (the
    Dockerfile
    would do something like an
    ADD
    of the entire repo and a
    pip install -e .
    so that the image has the latest code in it). 4. Build the image with a certain tag (the same tag that
    DockerRun
    would use). (This is all assuming that the Storage code is executed in the Docker Agent process just before the container is started which would give time to create the image before the
    run_config
    create the container and run the flow inside) I was going to do a test by creating the custom Storage but I ran into things I didn't take into account like
    serialization
    . But before I explore this option further I wanted to ask: • Is there any way to achieve what i want, that I am not seeing, that doesn't involve a Container Registry or a CI pipeline? • Does the solution I propose make sense to you? Do you think it would work or am I missing something? • Would it be useful for more people and would it be worth some contribution? I would appreciate any guidance, advice or comments on the subject.
    k
    • 2
    • 1
  • d

    Dakota McCarty

    08/17/2021, 1:15 AM
    Hello! I’m new to Prefect tasks and flows. I’m using Prefect Core in Deepnote (similar to Saturn Cloud). My flows are running great and are registered in Prefect Cloud, but after the flows run in Deepnote the Cloud UI doesn’t seem to log them. Our plan is to use Deepnote as our orchestrator, run the flows from there, and have Prefect Cloud be the UI part of the puzzle… I felt like I saw that’s an option but may be mistaken? I feel like I’ve read every page in the documentation at this point but have no idea what the issue could be. Any troubleshooting suggestions or newbie advice?
    k
    a
    • 3
    • 10
k

Kevin Kho

08/17/2021, 1:29 AM
Hey @Dakota McCarty, I suspect you are running Flows with
flow.run()
. Is that right? The UI will only keep track of flows that the Prefect scheduler triggered.
flow.run()
is intended for local testing and that flow is not registered with Prefect Cloud. If you want to keep the notebook setup, you need to use
flow.register()
and then
create_flow_run()
d

Dakota McCarty

08/17/2021, 1:47 AM
Hey @Kevin Kho, thanks for the reply! I’ve tried create_flow_run() and it does successfully create the flow run in the Cloud UI. I guess I just don’t understand how to then run it via Deepnote? I thought just starting the agent would do it but then I get this and nothing changes.
k

Kevin Kho

08/17/2021, 1:52 AM
The agent is intended to be a long-running process that polls Prefect Cloud every 10 seconds. It is responsible for the execution of Flows. So normally for deployment you leave it as a background process on some box like EC2. Or maybe you have one running in your Kubernetes cluster. You would need either a long-running service outside of Deepnote to execute the flows or, if you wanted to execute them in Deepnote, a long-running process hosted there to pick up and execute the flows. Then from the notebook when you create the flow run, it would appear in the UI.
I think it’s hard to run an agent AND run the flow in the same notebook because the agent as a process will take over the notebook. When hosted as a process on a machine, it creates new subprocesses to run your flow
d

Dakota McCarty

08/17/2021, 2:14 AM
Ah, got it! Was a bit scared that’s the case haha. Unfortunately, I don’t think that’d be an option with our current setup
Found a cheat-y way to do it! but one that works!
a

Alexander Seifert

08/17/2021, 8:48 AM
@Dakota McCarty do tell! 🙂
d

Dakota McCarty

08/17/2021, 10:28 AM
@Alexander Seifert sure! I’m putting together a blog post but here’s the first draft of that https://deepnote.com/@Creative-Force/Prefect-demo-beEs0aYGQa-7oU_0UX1yyQ
šŸ‘ 2
k

Kevin Kho

08/17/2021, 2:01 PM
Nice! Thanks for sharing
client.create_flow_run(flow_id, labels='deepnote')
Your labels might need to be a list of strings, because a bare string is itself an iterable and might get chopped up per letter.
:thank-you: 1
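The point about strings being iterable can be shown without Prefect. In this sketch, `normalize_labels` is a hypothetical helper, not part of the Prefect client; it only illustrates wrapping a bare string before passing it as `labels`.

```python
from typing import Iterable, List, Union


def normalize_labels(labels: Union[str, Iterable[str]]) -> List[str]:
    """Wrap a bare string in a list; pass other iterables through as a list."""
    if isinstance(labels, str):
        return [labels]
    return list(labels)


# Iterating a bare string yields its characters, which is the risk described above.
per_letter = list("deepnote")
as_one_label = normalize_labels("deepnote")
```

Passing `labels=['deepnote']` directly avoids the ambiguity.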