• Luis Gallegos

    1 year ago
    Hi all, I have this problem. When I use the datetime or pendulum library and then register a flow, the flow always uses the registration datetime, not the current time of the running flow. My code is below. Any suggestions?
    import prefect
    import pendulum
    from prefect import Flow, task
    from prefect.schedules import CronSchedule
    
    cron_now = pendulum.now()
    str_date = cron_now.strftime('%Y%m%d_%H%M%S')
    custom_schedule = CronSchedule("0 9 * * 0", start_date=cron_now)
    
    
    def slack(text):
    	data = '{"channel":"XXX","text":"%s: %s"}' % (str_date, text)
    
    @task
    def task():
    	## do something
    	##call_slack
    	slack("Hello")
    	
    with Flow("fact_czenk", schedule=custom_schedule) as flow:
    	task = task()
    	
    flow.register()
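A likely cause, sketched here without Prefect so it runs anywhere: `str_date` is computed at module level, so it is evaluated once when the script is loaded for registration and then reused by every run. The sketch below contrasts that with computing the timestamp inside the function that the task calls. The names `IMPORT_TIME_STAMP`, `slack_payload_frozen`, and `slack_payload_fresh` are hypothetical and only illustrate the idea. Inside a real Prefect 0.x task, `prefect.context.get("scheduled_start_time")` may be another option, assuming that context key is available in your version.

```python
from datetime import datetime

# Evaluated once, when the module is loaded (for a registered flow, that is
# registration time, not run time).
IMPORT_TIME_STAMP = datetime.now().strftime("%Y%m%d_%H%M%S")


def slack_payload_frozen(text):
    """Mirrors the original code: reuses the stamp captured at load time."""
    return '{"channel":"XXX","text":"%s: %s"}' % (IMPORT_TIME_STAMP, text)


def slack_payload_fresh(text, now=None):
    """Computes the stamp on every call, so each run reports its own time."""
    now = now or datetime.now()
    stamp = now.strftime("%Y%m%d_%H%M%S")
    return '{"channel":"XXX","text":"%s: %s"}' % (stamp, text)
```

Calling `slack_payload_fresh("Hello")` from inside the task body gives each flow run its own timestamp; the optional `now` argument exists only to make the function easy to test.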
    4 replies (Luis Gallegos, Samuel Hinton)
  • Samuel Hinton

    1 year ago
    Hey team! Just wondering, does Prefect have a Prometheus integration built in already? We’re looking for a way to set up alerting when a task fails, and our other services use Prometheus for this. If not, what other methods have people used, or would recommend, to keep track of failures? Can you set up Slack messages on any task failure?
    3 replies (Samuel Hinton, Jenny)
  • dh

    1 year ago
    Hello, Prefect team. I see there are two methods available for flow registration: 1. the Prefect CLI [1] and 2. the functional API, flow.register. Are both equally recommended?
    Context: we want to create a dynamically defined flow (e.g. Flow(result=S3Result(location=f'bucket/{args.my_result_key}', ...))) for flow reuse. To parse user-passed args, we instantiate the Flow behind __main__. Now we noticed we can’t use the Prefect CLI to register because it can’t pass extra user args. Alternatively, we are considering flow.register and wonder if there would be any risks we should be aware of.
    [1]: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/cli/register.py#L70
    10 replies (dh, Jim Crist-Harif, +1)
  • Massoud Mazar

    1 year ago
    Hi everyone, I upgraded my Prefect server from 0.14.6 to 0.14.12, and since then I see that even when idle, CPU hovers between 10% and 50%.
    docker stats --all --format "table {{.ID}}\t{{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}"
    shows the following:
    CONTAINER ID        NAME                CPU %               MEM USAGE / LIMIT
    c6eda45c6f59        tmp_ui_1            0.00%               4.953MiB / 7.432GiB
    2466edd82357        tmp_towel_1         0.00%               52.57MiB / 7.432GiB
    e9d0643d134e        tmp_apollo_1        2.69%               62.68MiB / 7.432GiB
    ad41dcf8646a        tmp_graphql_1       3.24%               69.5MiB / 7.432GiB
    a8f65bcb2efa        tmp_hasura_1        4.62%               153.8MiB / 7.432GiB
    bc9742482bd8        tmp_postgres_1      4.64%               27.86MiB / 7.432GiB
  • Jay Sundaram

    1 year ago
    Hi all, I'm looking for some help with StartFlowRun. I executed the following on my local machine:
    prefect backend server
    prefect server start
    prefect create project etl-project
    prefect agent local start --label etl-label
    prefect register flow --file simple_flow.py --name a-simple-etl-flow -l etl-label -p etl-project
    In the UI, I can click on QUICK RUN and observe the flow execute. Next, in another simple script named start_flow_run.py:
    from prefect.tasks.prefect.flow_run import StartFlowRun
    
    kickoff_task = StartFlowRun(
        project_name='etl-project',
        flow_name='a-simple-etl-flow'
    )
    which I execute like this:
    python start_flow_run.py
    But nothing happens. The agent doesn't detect it; no activity in the UI. I was expecting my registered flow named 'a-simple-etl-flow' to execute. Please advise. Thanks.
    18 replies (Jay Sundaram, Chris White)
  • Sven Teresniak

    1 year ago
    Is it possible to fetch the entire log of a certain flow run as text or JSON without interacting with the UI? Maybe by using curl on a Prefect Server UI link? EDIT: easy, using GraphQL 🙂
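For readers wondering what the GraphQL route might look like, here is a minimal sketch using only the standard library. It rests on several assumptions not stated in the thread: that the Prefect Server GraphQL endpoint is reachable at `http://localhost:4200/graphql`, that `flow_run` exposes a `logs` relationship with `timestamp`, `level`, and `message` fields, and that the ID variable is typed `uuid`. The helper names `build_payload` and `fetch_logs` are hypothetical. Check the schema in the UI's Interactive API tab before relying on it.

```python
import json
import urllib.request

# Assumption: default local Prefect Server GraphQL endpoint.
GRAPHQL_URL = "http://localhost:4200/graphql"

# Assumption: flow_run has a `logs` relationship with these fields.
LOGS_QUERY = """
query FlowRunLogs($id: uuid!) {
  flow_run(where: {id: {_eq: $id}}) {
    logs(order_by: {timestamp: asc}) {
      timestamp
      level
      message
    }
  }
}
"""


def build_payload(flow_run_id):
    """Build the JSON body for a GraphQL POST request."""
    return {"query": LOGS_QUERY, "variables": {"id": flow_run_id}}


def fetch_logs(flow_run_id, url=GRAPHQL_URL):
    """POST the query and return the list of log records (possibly empty)."""
    body = json.dumps(build_payload(flow_run_id)).encode("utf-8")
    request = urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}
    )
    with urllib.request.urlopen(request) as response:
        result = json.load(response)
    runs = result["data"]["flow_run"]
    return runs[0]["logs"] if runs else []
```

The same JSON body can be sent with curl; `fetch_logs` simply wraps the request so the records can be dumped as text or JSON from a script.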
  • Jacopo Tagliabue

    1 year ago
    Hi all, I just got started on Prefect (I mostly used AF before for orchestration) and it's great ❤️ Two quick questions:
    • I was able to manually write Python code that runs my Great Expectations checkpoints, but the built-in GE task does not work with my LegacyCheckpoint, and I wasn't able to make even a simple flow run (below). Did anybody get the GE integration to work? Is it worth the trouble, when the full "verbose" GE code to run validation is 4 lines of Python?
    • I rebuilt the dbt Cloud client from AF to work as a Prefect task (two tasks really: scheduling the job, and polling dbt Cloud for the job status), as I prefer dbt Cloud to a locally installed dbt shell. Is there any interest from the community in this sort of integration? If yes, happy to chat / share the simple POC I built.
    GE code that does not work:
    from prefect import task, Flow, Parameter
    from prefect.tasks.great_expectations import RunGreatExpectationsValidation
    
    # Define checkpoint task
    validation_task = RunGreatExpectationsValidation()
    
    with Flow("ge_test") as flow:
        validation_task(checkpoint_name='gitter_checkpoint')
    
    flow.run()
    5 replies (Jacopo Tagliabue, nicholas)
  • Samuel Hinton

    1 year ago
    Hey team! I'm trying to use the on_failure callback of a flow to send a message to a Slack channel that ideally looks something like “OH MAN THE FLOW FAILED - Click here to see the flow” with a proper link. Does the flow object contain any information I can use to construct a useful URL, specifically the flow_id?
    6 replies (Samuel Hinton, Amanda Wee)
  • Tim Enders

    1 year ago
    I am trying to raise a RETRY signal inside a mapped Task. The follow-up task uses flatten to gather all of the results back up. When RETRY is raised, I get the following:
    TypeError: object of type 'RETRY' has no len()
    20 replies (Tim Enders, nicholas, +2)