Powered by Linen
prefect-community
  • a

    Akash Rai

    03/18/2021, 7:47 AM
    Hi all, I am looking for a feature. For a given flow run, I want to restart the flow from taskA to taskB only, where taskA is an ancestor of taskB in the flow. All the tasks have a target path. Is there a simple way to achieve this?
    j
    1 reply · 2 participants
  • h

    Hawkar Mahmod

    03/18/2021, 12:53 PM
    I’ve read through the docs and cannot figure out why my use of LocalResult on a task is not persisting results. There are definitely results, because I can print them out like so: print(state.result[baseline_fractions]._result). All that happens is that the location I provided via dir gets the directories created, followed by an empty folder. What am I missing here?
    j
    6 replies · 2 participants
  • m

    Marwan Sarieddine

    03/18/2021, 2:53 PM
    Hi folks, I am trying to understand how to make use of the not_all_skipped trigger. The naming seems a bit unintuitive to me, or I might be missing something. Please see the example in the thread (any help would be appreciated).
    c
    m
    24 replies · 3 participants
  • z

    Zach Hodowanec

    03/18/2021, 3:33 PM
    Hi Prefect Team! I am wondering if there is a way to set static run_config parameters on a Kubernetes Agent for subsequent flows to consume, rather than duplicating similar run_configs across various flows. We currently use PREFECT__CLOUD__AGENT__ENV_VARS to pass along storage configurations, but are not having much success updating the execution environment to use an internal custom image. So far I have tried passing the IMAGE and PREFECT__CONTEXT__IMAGE environment variables to my job spec, to no avail.
    j
    m
    +1
    17 replies · 4 participants
  • c

    Chris Jordan

    03/18/2021, 4:13 PM
    Hey all, I'm running into an issue with prefect cloud submitting jobs but not doing anything after that. The issue first presented with a new flow I was writing. I restarted my agent and I upgraded from 14.6 to 14.12. This only had the effect of spreading the issue to the rest of my flows. I was wondering if you could offer me some guidance in debugging this issue.
    m
    19 replies · 2 participants
  • l

    Luis Gallegos

    03/18/2021, 4:38 PM
    Hi all, I have this problem: when I use the datetime or pendulum library and then register a flow, it always uses the registration datetime, not the current time of the running flow. My code is below. Any suggestions?
    import pendulum
    from prefect import Flow, task
    from prefect.schedules import CronSchedule
    
    # These module-level values are evaluated once, when this script is executed
    cron_now = pendulum.now()
    str_date = cron_now.strftime('%Y%m%d_%H%M%S')
    custom_schedule = CronSchedule("0 9 * * 0", start_date=cron_now)
    
    
    def slack(text):
        # Build the Slack message body (sending it is omitted here)
        data = '{"channel":"XXX","text":"%s: %s"}' % (str_date, text)
    
    @task
    def notify():
        ## do something
        ## call_slack
        slack("Hello")
    
    with Flow("fact_czenk", schedule=custom_schedule) as flow:
        notify()
    
    flow.register()
    s
    4 replies · 2 participants
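One likely cause, offered as a sketch rather than a confirmed diagnosis: str_date is computed at module level, so it is fixed when the script is executed for registration and reused by every later run. The stdlib-only example below reads the clock when the function is called instead. The name build_slack_payload and its now parameter are hypothetical and not part of the original code.

```python
import json
from datetime import datetime
from typing import Optional


def build_slack_payload(text: str, now: Optional[datetime] = None) -> str:
    """Build the Slack message body, stamping it at call time.

    `now` is injectable for testing; by default the clock is read on each
    call, i.e. when the task runs rather than when the flow is registered.
    """
    current = now if now is not None else datetime.now()
    str_date = current.strftime("%Y%m%d_%H%M%S")
    return json.dumps({"channel": "XXX", "text": f"{str_date}: {text}"})
```

Calling build_slack_payload("Hello") from inside the task body would then produce a fresh timestamp on every run.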
  • s

    Samuel Hinton

    03/18/2021, 5:19 PM
    Hey team! Just wondering, does Prefect have Prometheus integration built in already? We’re looking for a way to set up alerting if a task fails, and our other services use Prometheus for this. If not, what other methods have people used, or would recommend, to keep track of failures? Can you set up Slack messages on any task failure?
    j
    3 replies · 2 participants
  • d

    dh

    03/18/2021, 7:53 PM
    Hello, Prefect team. I see there are two methods available for flow registration: 1. the prefect CLI [1], and 2. the functional API flow.register. Are both equally recommended? Context: we want to create a dynamically defined flow (e.g. Flow(result=S3Result(location=f'bucket/{args.my_result_key}', ...))) for flow reuse. To parse user-passed args, we instantiate the Flow behind __main__. We noticed we can’t use the prefect CLI to register because it can’t pass extra user args. Alternatively, we are considering flow.register and wonder if there are any risks we should be aware of. [1]: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/cli/register.py#L70
    j
    r
    10 replies · 3 participants
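A minimal sketch of the flow.register route described above, assuming Prefect 0.14.x. The argument names (--bucket, --my-result-key, --project), the flow name, and the helper functions are hypothetical. Prefect is imported lazily so that argument parsing can be exercised without it installed.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the user-supplied values that shape the flow (hypothetical names)."""
    parser = argparse.ArgumentParser(description="Register a dynamically defined flow")
    parser.add_argument("--bucket", required=True)
    parser.add_argument("--my-result-key", required=True)
    parser.add_argument("--project", required=True)
    return parser.parse_args(argv)


def build_flow(args: argparse.Namespace):
    """Construct the flow from parsed args; needs Prefect 0.14.x installed."""
    from prefect import Flow, task
    from prefect.engine.results import S3Result

    @task
    def say_hello():
        return "hello"

    result = S3Result(bucket=args.bucket, location=args.my_result_key)
    with Flow("dynamic-flow", result=result) as flow:
        say_hello()
    return flow


# Usage from a script entry point (requires a configured backend):
#   args = parse_args()
#   build_flow(args).register(project_name=args.project)
```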
  • m

    Massoud Mazar

    03/19/2021, 1:37 AM
    Hi everyone, I upgraded my Prefect server from 0.14.6 to 0.14.12 and since then I see that, even when idle, CPU hovers between 10% and 50%.
    docker stats --all --format "table {{.ID}}\t{{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}"
    shows the following:
    CONTAINER ID        NAME                CPU %               MEM USAGE / LIMIT
    c6eda45c6f59        tmp_ui_1            0.00%               4.953MiB / 7.432GiB
    2466edd82357        tmp_towel_1         0.00%               52.57MiB / 7.432GiB
    e9d0643d134e        tmp_apollo_1        2.69%               62.68MiB / 7.432GiB
    ad41dcf8646a        tmp_graphql_1       3.24%               69.5MiB / 7.432GiB
    a8f65bcb2efa        tmp_hasura_1        4.62%               153.8MiB / 7.432GiB
    bc9742482bd8        tmp_postgres_1      4.64%               27.86MiB / 7.432GiB
  • j

    Jay Sundaram

    03/19/2021, 1:55 AM
    Hi all, I'm looking for some help with StartFlowRun. Executed the following running on my local machine:
    prefect backend server
    prefect server start
    prefect create project etl-project
    prefect agent local start --label etl-label
    prefect register flow --file simple_flow.py --name a-simple-etl-flow -l etl-label -p etl-project
    In the UI, I can click on QUICK RUN and observe the flow execute. Next, in another simple script named start_flow_run.py:
    from prefect.tasks.prefect.flow_run import StartFlowRun
    
    kickoff_task = StartFlowRun(
        project_name='etl-project',
        flow_name='a-simple-etl-flow'
    )
    which I execute like this:
    python start_flow_run.py
    But nothing happens. The agent doesn't detect it; no activity in the UI. I was expecting my registered flow named 'a-simple-etl-flow' to execute. Please advise. Thanks.
    c
    18 replies · 2 participants
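A possible explanation, offered as a sketch rather than a confirmed diagnosis: the script only constructs the StartFlowRun task and never executes it. In Prefect 0.14.x a task does its work when its run() method is called, either directly or as part of a flow run. The wrapper name kickoff is hypothetical.

```python
def kickoff():
    """Ask the backend to create a run of the registered flow."""
    # Imported lazily; requires Prefect 0.14.x and a reachable backend.
    from prefect.tasks.prefect.flow_run import StartFlowRun

    kickoff_task = StartFlowRun(
        project_name="etl-project",
        flow_name="a-simple-etl-flow",
    )
    # Instantiating the task does nothing on its own;
    # run() is the call that requests the new flow run.
    return kickoff_task.run()


# Usage: call kickoff() at the end of start_flow_run.py.
```

An alternative with the same effect is to add the task to a parent Flow and run or register that flow.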
  • s

    Sven Teresniak

    03/19/2021, 8:22 AM
    Is it possible to fetch the entire log from a certain flow run as text or JSON without interacting with the UI? Maybe by using curl on a prefect-server UI link? EDIT: easy, using GraphQL 🙂
    🚀 2
  • j

    Jacopo Tagliabue

    03/19/2021, 10:11 AM
    Hi all, I just got started on Prefect (I mostly used AF before for orchestration) and it's great ❤️ Two quick questions:
    • I was able to manually write Python code that runs my Great Expectations checkpoints. The built-in GE task does not work with my LegacyCheckpoint, and I wasn't able to make even a simple flow run (below). Did anybody get the GE integration to work? Is it worth the trouble when the full "verbose" GE code to run validation is 4 lines of Python?
    • I rebuilt the dbt Cloud client from AF to work as a Prefect task (two tasks really: scheduling the job, and polling dbt Cloud for the job status), as I prefer dbt Cloud to a locally installed dbt shell. Is there any interest from the community in this sort of integration? If yes, happy to chat / share the simple POC I built.
    GE code that does not work:
    from prefect import task, Flow, Parameter
    from prefect.tasks.great_expectations import RunGreatExpectationsValidation
    
    # Define checkpoint task
    validation_task = RunGreatExpectationsValidation()
    
    with Flow("ge_test") as flow:
        validation_task(checkpoint_name='gitter_checkpoint')
    
    flow.run()
    👀 2
    n
    5 replies · 2 participants
  • s

    Samuel Hinton

    03/19/2021, 11:14 AM
    Hey team! I'm trying to use the on_failure callback of a flow to send a message to a Slack channel that ideally looks something like “OH MAN THE FLOW FAILED - Click here to see the flow” with a proper link. Does the flow object contain any information I can use to construct a useful URL, specifically the flow_id?
    a
    6 replies · 2 participants
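A rough sketch of one way to build such a link, assuming Prefect 0.14.x with a backend, where prefect.context carries a flow_run_id during a run. The URL layout, the tenant slug 'my-team', and both function names are assumptions for illustration; check them against your own UI links.

```python
def flow_run_url(
    flow_run_id: str,
    tenant_slug: str,
    base_url: str = "https://cloud.prefect.io",
) -> str:
    """Assumed layout: <base>/<tenant-slug>/flow-run/<flow-run-id>."""
    return f"{base_url.rstrip('/')}/{tenant_slug}/flow-run/{flow_run_id}"


def notify_on_failure(flow, state):
    """Candidate on_failure callback, called with the flow and its failed state."""
    import prefect  # lazy import; prefect.context holds run metadata during a run

    flow_run_id = prefect.context.get("flow_run_id")
    message = f"Flow {flow.name} failed: {flow_run_url(flow_run_id, 'my-team')}"
    print(message)  # placeholder: replace with a real Slack call
    return message


# Usage (hypothetical): Flow("my-flow", on_failure=notify_on_failure)
```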
  • t

    Tim Enders

    03/19/2021, 2:40 PM
    I am trying to raise a RETRY signal inside a mapped Task. The follow-up task is using flatten to gather all of the results back up. When RETRY is raised I am getting the following:
    TypeError: object of type 'RETRY' has no len()
    n
    m
    +1
    20 replies · 4 participants
  • t

    Tim Enders

    03/19/2021, 2:40 PM
    Does anyone have an idea?
  • m

    Marwan Sarieddine

    03/19/2021, 5:13 PM
    Hi folks, as it currently stands, one has to re-register a flow to modify its schedule. Is that correct?
    c
    a
    +1
    8 replies · 4 participants
  • z

    Zach Hodowanec

    03/19/2021, 5:59 PM
    Hi Team, I am currently facing an issue where my flow references a module within the same repository but is unable to load the module at execution time. I am using Prefect Cloud, GH Storage, and a Kubernetes Agent. It seems it could be a pathing issue, as I must set PYTHONPATH in my local environment, but I am not having much luck setting a similar ENV variable on the agent or job_spec. Any suggestions or documentation that might help get over this hump? Repository Structure:
    /src
    --/flows
    ----/flow.py
    --/tasks
    ----/task.py
    Error Message:
    Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'src.tasks'")
    n
    2 replies · 2 participants
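To illustrate the pathing theory (a sketch, not a confirmed diagnosis): a dotted import such as src.tasks only resolves if the directory containing the top-level package exists in the execution environment and is on sys.path. The stdlib-only example below recreates a similar layout in a temporary directory under the hypothetical package name demo_src and shows the import failing before the root is on the path and succeeding after.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def can_import(module_name: str) -> bool:
    """Return True if `module_name` resolves on the current sys.path."""
    importlib.invalidate_caches()
    try:
        importlib.import_module(module_name)
        return True
    except ModuleNotFoundError:
        return False


# Build a throwaway copy of the layout: <root>/demo_src/tasks/task.py
root = Path(tempfile.mkdtemp())
package_dir = root / "demo_src" / "tasks"
package_dir.mkdir(parents=True)
(root / "demo_src" / "__init__.py").write_text("")
(package_dir / "__init__.py").write_text("")
(package_dir / "task.py").write_text("VALUE = 42\n")

before = can_import("demo_src.tasks.task")  # root is not on sys.path yet
sys.path.insert(0, str(root))               # roughly what PYTHONPATH=<root> does at startup
after = can_import("demo_src.tasks.task")
print(before, after)
```

If the job's container does not contain the repository files at all, setting PYTHONPATH alone would not help; the modules also have to be present, for example by baking them into the image.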
  • s

    Samuel Hinton

    03/19/2021, 6:02 PM
    Hey team, is there a way to tell prefect “Retry all failed flows in the last day/hour/etc”?
    n
    3 replies · 2 participants
  • r

    Renzo Becerra

    03/19/2021, 6:44 PM
    Hi Team, I'm running into ECS permission issues while trying to deploy a flow. Though I'm specifying the ECS cluster on agent start, I'm getting a permission error on resource *. This user only has permissions on the ECS cluster listed. Any thoughts? Thanks.
    prefect agent ecs start --cluster my-cluster-arn --launch-type EC2
    
    botocore.errorfactory.AccessDeniedException: An error occurred (AccessDeniedException) when calling the RegisterTaskDefinition operation: User: arn:aws:iam::**********:user/********* is not authorized to perform: ecs:RegisterTaskDefinition on resource: *
    n
    m
    15 replies · 3 participants
  • t

    Tim Enders

    03/19/2021, 6:58 PM
    How would I share a variable between two tasks? I don't want to return it in the result object because the data grain is off, but I need it for an evaluation.
    n
    14 replies · 2 participants
  • j

    Julie Sturgeon

    03/19/2021, 7:42 PM
    Hello community! Is there a log that tracks when flow schedules were turned on or off on Prefect Cloud? Our flow schedules seem to magically turn off sometimes, and we’d like to be able to track why that happened. Additionally, we’ve noticed that the schedule slider can be a little finicky: it sometimes shows that a schedule is on when there are no upcoming scheduled runs (or vice versa), or takes a long time to enable/disable.
    👍 2
    n
    8 replies · 2 participants
  • c

    Chris White

    03/19/2021, 8:03 PM
    Hey everyone! Friendly reminder that @Laura Lorenz is about to get started live streaming about DevOps for Prefect Server — come learn about the Server helm chart and all the info you ever wanted about deploying and maintaining Server! Link here:

    https://youtu.be/EwsMecjSYEU

    📺 5
    :marvin: 6
    :prefect: 5
    l
    2 replies · 2 participants
  • t

    Trevor Kramer

    03/20/2021, 11:54 PM
    When using ECSRun how can I configure it to not assign a public ip to tasks it launches?
    d
    5 replies · 2 participants
  • e

    Espen Overbye

    03/21/2021, 3:32 PM
    Not sure where to post/report this, but we pretty often have problems accessing the docs:
    c
    s
    +1
    6 replies · 4 participants
  • e

    Espen Overbye

    03/22/2021, 6:51 AM
    Hey team, what is the best practice for accessing binary files from a flow? We're processing a fair amount of weather data, stored in a binary format (netCDF). We're running the Kubernetes agent on Azure, with volumes mounted on the pods running our flows via a custom YAML file. It does work but is slightly brittle.
    d
    10 replies · 2 participants
  • v

    Vincent Chéry

    03/22/2021, 9:46 AM
    Hi all 👋 I'm running prefect server on a single node private server and have a LocalAgent process running on the same machine and querying the API for flow runs. The use case is ETL. I just noticed that the same flow, when launched from the command line with flow.run(), takes 30s to execute, and when launched from the UI with "quick run", takes 5 minutes. The first task of the flow returns a list of 200 files, and the subsequent tasks are mapped to handle these files one by one. Any idea why this 10x difference in execution time occurs ? Many thanks!
    a
    4 replies · 2 participants
  • c

    Chris Bowdon

    03/22/2021, 3:53 PM
    Hi, I've noticed that the scheduler service sometimes stops scheduling new runs. Nothing is logged by the towel container; the last message is a standard "Sleeping for 300.0 seconds". Restarting the services resolves the issue. I have a slight suspicion that this occurs when one flow run overruns past the start time of the next run. But I see no relevant GH issues in either the prefect or server projects, so I wondered if maybe I'm just misunderstanding something about how it's supposed to work. Is anyone aware of this already?
    k
    4 replies · 2 participants
  • n

    Nathan Walker

    03/22/2021, 6:02 PM
    Hey folks, is it possible to dynamically add tasks to a flow while the flow is running? I'm reading through the flow_runner.py code and it seems like if it's possible, it's...involved.
    c
    5 replies · 2 participants
  • t

    Trevor Kramer

    03/22/2021, 7:22 PM
    I have a flow where the visualization is adding a bunch of List->List->Dict boxes, which makes it hard to read. How can I remove those?
    standardizer_task = submit_standardizer_job(files, bucket, 'mcule', version)
    standardizer_wait_task = AWSClientWait(client='batch', waiter_name='JobComplete', max_retries=2, retry_delay=datetime.timedelta(minutes=1))(waiter_kwargs={'jobs': [standardizer_task]})
    k
    4 replies · 2 participants
  • k

    Kelly Huang

    03/22/2021, 8:11 PM
    Hey! I'm looking to schedule a flow to happen daily, but I don't want the server to be running locally. What is the best way/best agent or executor to use to achieve this?
    k
    5 replies · 2 participants
k

Kyle Moon-Wright

03/22/2021, 8:29 PM
Hey @Kelly Huang, if you choose to forgo a backend API, you can run a flow in a local process with a schedule that will kick off Flow Runs at each scheduled interval, but this will be a long-running process that isn’t ideal for most users. Additionally, you won’t be using an Agent at all, since the Agent is part of the orchestration layer of your system: every 10 seconds it asks your backend API whether there are any Flow Runs to execute (and in this case there is no backend to ask). Overall, I would recommend signing up for a free Cloud account! That way you don’t have to worry about running a backend and you can run your Agents in your preferred environment (in your case, probably the LocalAgent). You can even orchestrate/schedule using solely the Cloud UI with a running Agent.
k

Kelly Huang

03/22/2021, 8:31 PM
I have a Cloud account! I am just wondering which agent I should be using, and how to run that agent in the cloud so that I do not need to start it locally.
k

Kyle Moon-Wright

03/22/2021, 8:33 PM
Great! Any Agent will work in that case, so this decision will be based on your preference of cloud platform / your execution environment.
Here’s an article on getting an Agent up and running on ECS, as an example.
k

Kelly Huang

03/23/2021, 12:09 AM
Thanks so much!