Powered by Linen
prefect-community
  • itamar

    05/31/2021, 9:51 AM
    Hello, I am new to Prefect and have a basic scenario: I run a flow on a schedule, and if the previous run is still running when the next scheduled run arrives, I want to skip the new run, mark it as successful, and log "old job is still running". (I don't want it to wait, just skip that scheduled run.) Usually we handle this in our ETL jobs by creating a file or saving the PID of the current job run; when the next run fires up, it checks whether the file or PID exists, and if it does, the ETL job finishes successfully and sends an email ("already running"). Is there a better way to do this in Prefect? Thanks
    1 reply · 2 participants
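A minimal sketch of the lock-file approach itamar describes, in plain Python and independent of any Prefect API. The helper names (single_instance, run_job) and the lock path are hypothetical. The lock file is created atomically, so two overlapping runs cannot both acquire it, and the run that finds it already present returns immediately instead of waiting:

```python
import os
import tempfile
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def single_instance(lock_path: str) -> Iterator[bool]:
    """Yield True if this run created the lock file, False if another run holds it."""
    try:
        # O_CREAT | O_EXCL makes creation atomic: only one run can create the file.
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        fd = None
    if fd is None:
        yield False  # an older run is still active
        return
    try:
        with os.fdopen(fd, "w") as lock_file:
            lock_file.write(str(os.getpid()))  # record the owner's PID
        yield True
    finally:
        os.remove(lock_path)  # only the run that created the lock removes it


def run_job(lock_path: str) -> str:
    """Hypothetical ETL entry point that skips instead of waiting."""
    with single_instance(lock_path) as acquired:
        if not acquired:
            return "old job is still running"
        # ... the real ETL work would go here ...
        return "job finished"


if __name__ == "__main__":
    lock = os.path.join(tempfile.gettempdir(), "my_etl.lock")  # hypothetical lock path
    print(run_job(lock))
```

If a run is killed hard, the lock file stays behind; the stored PID is what would let a later run check whether the owner is still alive before treating the lock as valid.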
  • Amit Gal

    05/31/2021, 10:43 AM
    Hi all, I’m building a dependent flow and want to verify whether my understanding of a limitation there is correct, or whether I'm missing something. Assume I have something like the following script (simplified from the example in the docs):
    from prefect import Flow
    from prefect.tasks.prefect import StartFlowRun
    
    flow_a = StartFlowRun(flow_name="A", project_name="examples", wait=True)
    flow_b = StartFlowRun(flow_name="B", project_name="examples", wait=True)
    
    ## Flow of flows
    
    with Flow("parent-flow") as flow:
        flow_b.set_upstream(flow_a)
    
    flow.run()
    As far as I can see, running this script waits to schedule flow_b until flow_a has completed. Is there a way to schedule them both, but hold off running flow_b until flow_a is completed? My use case: I use a VM to run the flows, but my local machine (or phone) to run a small script that schedules them ad hoc. flow_a is often long, while flow_b is short, so as it currently stands any issue with my local machine might interrupt the scheduling of flow_b. I can imagine a couple of ways to solve this, but I wanted to check whether my understanding is correct, or whether there is a way to schedule both at once and create a dependency for the running of the flows?
    5 replies · 2 participants
  • Jacob Blanco

    05/31/2021, 10:51 AM
    Is there any way in the Cloud UI to view flow runs by parameter value?
    9 replies · 5 participants
  • fabian wolfmann

    05/31/2021, 3:19 PM
    Hi! Is there a way to access the parameters given to a task during a run, at the moment its state changes from Pending to Running?
    5 replies · 2 participants
  • Nikola Lusic

    05/31/2021, 10:05 PM
    Hey guys, I'm having some issues with parallel execution. This is my flow:
    from prefect import Flow, task
    from prefect.executors import LocalDaskExecutor
    
    @task
    def process_key(key: str):
        import time
        time.sleep(3)
        return key
    
    with Flow("process_keys") as flow:
        processed_keys = process_key.map([1,2,3])
    
    if __name__ == '__main__':
        flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=3)
        flow.run()
    This is the output:
    [2021-06-01 00:01:36+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'process_keys'
    [2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key': Starting task run...
    [2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key': Finished task run for task with final state: 'Mapped'
    [2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Starting task run...
    [2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Starting task run...
    [2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Starting task run...
    [2021-06-01 00:01:39+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Finished task run for task with final state: 'Success'
    [2021-06-01 00:01:39+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Finished task run for task with final state: 'Success'
    [2021-06-01 00:01:39+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Finished task run for task with final state: 'Success'
    [2021-06-01 00:01:39+0200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    However, if I change the scheduler from threads to processes, the parallel execution seems to be replaced by serial execution:
    from prefect import Flow, task
    from prefect.executors import LocalDaskExecutor
    
    @task
    def process_key(key: str):
        import time
        time.sleep(3)
        return key
    
    with Flow("process_keys") as flow:
        processed_keys = process_key.map([1,2,3])
    
    if __name__ == '__main__':
        flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=3)
        flow.run()
    This is the new output:
    [2021-06-01 00:04:09+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'process_keys'
    [2021-06-01 00:04:09+0200] INFO - prefect.TaskRunner | Task 'process_key': Starting task run...
    [2021-06-01 00:04:09+0200] INFO - prefect.TaskRunner | Task 'process_key': Finished task run for task with final state: 'Mapped'
    [2021-06-01 00:04:09+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Starting task run...
    [2021-06-01 00:04:12+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Finished task run for task with final state: 'Success'
    [2021-06-01 00:04:12+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Starting task run...
    [2021-06-01 00:04:15+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Finished task run for task with final state: 'Success'
    [2021-06-01 00:04:15+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Starting task run...
    [2021-06-01 00:04:18+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Finished task run for task with final state: 'Success'
    [2021-06-01 00:04:18+0200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    I cannot get any scheduler=processes flow to actually run in parallel. Is there something I'm missing?
    4 replies · 2 participants
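One way to narrow this down outside Prefect: push the same sleep-and-return function through a thread pool and a process pool from the Python standard library and compare wall-clock times. This is a diagnostic sketch using concurrent.futures, not LocalDaskExecutor; if the process pool runs in parallel here but the flow does not, the cause is more likely in the executor setup than in the machine itself:

```python
import time
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from typing import List, Tuple


def process_key(key: int) -> int:
    # Same shape as the mapped task above: sleep, then return the key.
    time.sleep(1)
    return key


def timed_map(executor: Executor, keys: List[int]) -> Tuple[List[int], float]:
    """Map process_key over keys; return the results and the wall-clock seconds taken."""
    start = time.perf_counter()
    results = list(executor.map(process_key, keys))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    # The guard matters for process pools on platforms that start workers
    # by re-importing this module (Windows, macOS).
    for pool_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        with pool_cls(max_workers=3) as pool:
            results, elapsed = timed_map(pool, [1, 2, 3])
        # About 1 s means the three calls overlapped; about 3 s means they ran serially.
        print(f"{pool_cls.__name__}: {results} in {elapsed:.1f}s")
```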
  • Jason Prado

    05/31/2021, 10:57 PM
    I want to make sure I got this right. Does this schedule execute every morning at 9AM NY time?
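    import pendulum
    from datetime import timedelta
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import IntervalClock
    
    Schedule(
      clocks=[
        IntervalClock(
          start_date=pendulum.datetime(2021, 1, 1, 9, tz="America/New_York"),
          interval=timedelta(days=1),
        ),
      ],
    )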
    2 replies · 2 participants
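Whether a schedule like this stays at 9AM all year comes down to how a one-day interval is added across a daylight-saving change. The plain-Python sketch below (zoneinfo, not Prefect's IntervalClock) shows the two possible readings around the US DST start on March 14, 2021; which one IntervalClock applies is worth confirming in its documentation:

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

NY = ZoneInfo("America/New_York")


def next_wall_clock(dt: datetime, interval: timedelta) -> datetime:
    # Python's aware datetime + timedelta keeps the local wall-clock time.
    return dt + interval


def next_absolute(dt: datetime, interval: timedelta) -> datetime:
    # Add the interval as elapsed time in UTC, then convert back to New York.
    return (dt.astimezone(timezone.utc) + interval).astimezone(NY)


start = datetime(2021, 3, 13, 9, tzinfo=NY)  # the day before DST began in 2021
print(next_wall_clock(start, timedelta(days=1)))  # 2021-03-14 09:00:00-04:00
print(next_absolute(start, timedelta(days=1)))    # 2021-03-14 10:00:00-04:00
```

Only the wall-clock reading keeps the run at 9AM after the switch; the absolute reading drifts by one hour.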
  • Aurélien Vallée

    06/01/2021, 4:52 AM
    I am still a bit confused about dependent flows 😞 Basically I have a "data collection" flow that should run every minute, and a "generate analytics" flow that should run every hour. I need the "generate analytics" flow to run just after the "data collection" flow. Serially, that would expand to:
    13:58 -> Collect data
    13:59 -> Collect data
    14:00 -> Collect data + generate analytics
    14:01 -> Collect data
  • Aurélien Vallée

    06/01/2021, 4:54 AM
    I just don't understand how https://docs.prefect.io/core/idioms/flow-to-flow.html helps with this. If the schedule of "data collection" is every minute and the schedule of "generate analytics" is every hour, and I add a StartFlowRun of "collect data" before "generate analytics", the data would be collected twice, right?
  • Aurélien Vallée

    06/01/2021, 4:55 AM
    so currently I create a humongous flow that does everything possible, and I skip parts of it all over the place depending on the current time
  • Aurélien Vallée

    06/01/2021, 4:56 AM
    is that the only way to do it?
    2 replies · 2 participants
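The serial expansion above amounts to a single every-minute schedule in which the hourly step is gated on the tick. A plain-Python sketch of that gating decision, keyed on the scheduled time rather than the current clock so a late-starting run still decides correctly; the helper name is hypothetical, and inside a flow the check would become a conditional branch:

```python
from datetime import datetime
from typing import List


def steps_for(scheduled: datetime) -> List[str]:
    """Steps for one tick of a single every-minute schedule (hypothetical helper).

    Collection runs on every tick; analytics run only on the top-of-the-hour
    tick and always after collection, so no data is collected twice.
    """
    steps = ["collect data"]
    if scheduled.minute == 0:
        steps.append("generate analytics")
    return steps


for hour, minute in [(13, 58), (13, 59), (14, 0), (14, 1)]:
    tick = datetime(2021, 6, 1, hour, minute)
    # prints e.g. "14:00 -> collect data + generate analytics"
    print(tick.strftime("%H:%M"), "->", " + ".join(steps_for(tick)))
```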
  • Nikola Lusic

    06/01/2021, 10:25 AM
    Does the ECSRun configuration combined with LocalDaskExecutor(scheduler="processes", num_workers=4) support parallel execution of mapped tasks? Currently I'm unable to get the ECS task to spawn any additional processes; all mapped tasks run in sequence (first image). When I run the same flow in my local Prefect environment, the tasks all run in parallel (second image). If I use LocalDaskExecutor(scheduler="threads", num_workers=4), the flow's tasks do execute in parallel, but a threaded flow only covers part of our use cases.
    20 replies · 3 participants
  • Howard Cornwell

    06/01/2021, 1:45 PM
    Hey, I’m receiving this error when running a docker flow on a k8s cluster. The flow runs fine locally, but fails every time on the cluster:
    Failed to load and execute Flow's environment: TypeError("default() got an unexpected keyword argument 'default_scopes'")
    I tried out some already-deployed flows, and they run fine. But if I re-deploy them, they start raising the same error. I'm running 0.13.19 in the container and on the server. Any advice would be great!
    1 reply · 2 participants
  • Zach Schumacher

    06/01/2021, 3:18 PM
    Hey, I’m on a paid standard account and have been running into the below semi-frequently. Note this is from a task run from last week.
    1 reply · 2 participants
  • Carlos Gutierrez

    06/01/2021, 3:21 PM
    Hi all 🙂 I have a question about using flow.serialized_hash() for change detection in automated flow registration. I found that when a flow is registered with a particular task, let's say task_A, and I then update the values of the parameters passed to that task (for instance task_A(var='typo_string') --> task_A(var='correct_string')), serialized_hash() stays the same. As a result, no new version is bumped on the server, and the registered flow does not pick up my latest changes. Is there a better way to do this, or am I using the wrong approach?
    10 replies · 3 participants
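If serialized_hash() does not reflect changed argument values, one coarser alternative (an assumption here, not something Prefect provides) is to hash the flow's source file and re-register whenever that hash changes. A minimal sketch with hashlib; the file name and helper names are hypothetical:

```python
import hashlib
import tempfile
from pathlib import Path
from typing import Optional


def source_hash(path: Path) -> str:
    """SHA-256 of the flow file's bytes; any edit, including a changed argument, changes it."""
    return hashlib.sha256(path.read_bytes()).hexdigest()


def needs_registration(path: Path, last_hash: Optional[str]) -> bool:
    """Re-register when no hash was stored yet or the source has changed since."""
    return last_hash is None or source_hash(path) != last_hash


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        flow_file = Path(tmp) / "my_flow.py"  # hypothetical flow file
        flow_file.write_text("task_A(var='typo_string')\n")
        stored = source_hash(flow_file)  # e.g. saved after the last registration
        flow_file.write_text("task_A(var='correct_string')\n")
        print(needs_registration(flow_file, stored))  # True
```

This errs on the side of re-registering: comment or whitespace edits also change the hash.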
  • Raúl Mansilla

    06/01/2021, 4:11 PM
    Hello friends! I'm trying to run a flow on an ECS cluster. I can see the task go to pending and then running, but the flow never finishes because this error is raised:
    Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'ecs_test'")
    36 replies · 2 participants
  • Shea O'Rourke

    06/01/2021, 5:05 PM
    Hey all, I work at a Boston startup that uses Prefect very heavily, and I have a question about Prefect's API rate limiting. In the past we had no control over the number of concurrent calls we made to Prefect. We then started getting rate limited, because our requests came in large bursts that may have been overloading Prefect. To reduce the number of requests lost to rate limiting, we started allowing only one concurrent Prefect call at a time. That proved too low, so we raised it to five concurrent calls. Five is now also too low, and we are looking to raise it further. Are there specific limits we should be aware of and work around, so we can avoid the rate limiting that hurt us in the past?
    3 replies · 2 participants
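The concrete limits are for the Prefect team to confirm. Independently of the numbers, a common client-side pattern is to cap in-flight calls with a semaphore and retry rate-limited calls with exponential backoff and jitter, so bursts get smoothed out instead of dropped. A minimal asyncio sketch; RateLimited and fake_call are hypothetical stand-ins for your own API client:

```python
import asyncio
import random
from typing import Awaitable, Callable, List, Tuple, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Hypothetical error your API wrapper raises on a rate-limit (HTTP 429) response."""


async def call_with_limits(
    sem: asyncio.Semaphore,
    make_call: Callable[[], Awaitable[T]],
    max_retries: int = 5,
    base_delay: float = 0.5,
) -> T:
    """Run make_call with at most sem's capacity in flight, backing off on rate limits."""
    for attempt in range(max_retries + 1):
        async with sem:
            try:
                return await make_call()
            except RateLimited:
                if attempt == max_retries:
                    raise
        # Back off outside the semaphore so a waiting retry does not hold a slot.
        # Full jitter: a random delay up to base_delay * 2**attempt.
        await asyncio.sleep(random.uniform(0, base_delay * 2 ** attempt))
    raise RuntimeError("unreachable")


async def demo(n_calls: int = 20, limit: int = 5) -> Tuple[List[str], int]:
    """Fire n_calls fake requests and report the peak number in flight."""
    sem = asyncio.Semaphore(limit)
    in_flight = peak = 0

    async def fake_call() -> str:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # stand-in for network latency
        in_flight -= 1
        return "ok"

    results = await asyncio.gather(*(call_with_limits(sem, fake_call) for _ in range(n_calls)))
    return list(results), peak


if __name__ == "__main__":
    results, peak = asyncio.run(demo())
    print(len(results), "calls, peak in flight:", peak)
```

Raising the cap then becomes a single number to tune, while backoff keeps a burst from turning into a wave of failed requests.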
  • Alex Furrier

    06/01/2021, 8:13 PM
    I'm trying to understand how to create a Flow of Flows (dependent flows) from flows I've been running locally. Currently I can run the flows in succession using a Makefile where each file invoked by python contains a Flow and a flow.run() wrapped with CLI param inputs:
    test-flow-of-flows:
    	@echo 'Running flow A'
    	@python flows/flow_a.py \
    		param1=foo \
    		param2=bar
    	@python flows/flow_b.py \
    		param1=foo \
    		param2=bar
    	@python flows/flow_c.py \
    		param1=baz \
    		param2=bar
    I would like to combine those into a single Flow run with shared parameters passed to the flow runs. That seems to be what's described in this documentation. That mentions registering flows using the orchestration API to specific projects. So far I've been running flows without that. Is there any way to create a flow of flows with local importing of flows or do they have to be registered with the orchestration API to do so?
    3 replies · 2 participants
  • Nathan Atkins

    06/01/2021, 10:11 PM
    Catch Up CronClock: I want to use CronClock to run a flow at the same time every day. I also want it to catch up if the start date and the after date are before today. For example, with start_date=5/25/2021, after_date=5/30/2021 and today=6/2/2021, I would get a schedule to run on 5/31, 6/1 and 6/2. These would all be processed ASAP, and then the flow would wait until 6/3 to run again. I’m having two problems with this. 1. I don’t see how to pass after into the schedule as part of flow.run(); I have hacked flow._run() to support this. 2. Something beyond my Python knowledge is causing CronClock.events() to do something weird when the yield returns: execution drops directly out of the while loop and exits the method. Each new call to schedule.next() winds up creating a new croniter and running the same start_date event again. If I build the clock directly and call next on the iterable returned by events(), it works as I would expect.
    6 replies · 2 participants
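The second problem matches ordinary Python generator semantics: every call to a generator function returns a fresh generator that starts from the beginning, while repeated next() on one generator advances it. A plain-Python sketch of the daily catch-up logic from the example dates (a simple one-day step, not Prefect's CronClock or croniter; the helper names are hypothetical):

```python
from datetime import date, timedelta
from itertools import islice
from typing import Iterator, List


def daily_events(start_date: date, after: date) -> Iterator[date]:
    """Yield one event per day, starting the day after `after` but never before `start_date`."""
    current = max(start_date, after + timedelta(days=1))
    while True:
        yield current
        current += timedelta(days=1)


def catch_up(start_date: date, after: date, today: date) -> List[date]:
    """All events already due (on or before today); these would run ASAP."""
    due = []
    for event in daily_events(start_date, after):
        if event > today:
            break
        due.append(event)
    return due


start, after = date(2021, 5, 25), date(2021, 5, 30)
print(catch_up(start, after, date(2021, 6, 2)))  # the three due days: 5/31, 6/1, 6/2

# Calling next() on a *new* generator restarts from the first event every time...
print(next(daily_events(start, after)), next(daily_events(start, after)))
# ...while reusing one generator advances through the schedule.
events = daily_events(start, after)
print(list(islice(events, 3)))
```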
  • Ben Collier

    06/02/2021, 4:24 AM
    Hi there Prefect people! Just a quick question. We have our agent running behind a firewall, and have added the firewall’s root cert to the system CA bundle at /etc/ssl/certs/ca-certificates.crt. However, we’re still getting [SSL: CERTIFICATE_VERIFY_FAILED] when trying to authenticate with Prefect Cloud. I’ve set the env var SSL_CERT_FILE to this location, in case Python picks that up. Could you confirm where you pick up root certs from?
  • Ben Collier

    06/02/2021, 4:36 AM
    QUICK UPDATE: Realised that requests in Python uses its own CA bundle, so I set REQUESTS_CA_BUNDLE. All good now.
    👍 3
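For anyone hitting the same thing: Python's ssl module and requests look for CA certificates in different places. A short standard-library check of where ssl looks and which environment variables override it; the requests behavior in the comments (its own certifi bundle, overridable with REQUESTS_CA_BUNDLE) is how requests documents it, not something this snippet inspects:

```python
import os
import ssl

paths = ssl.get_default_verify_paths()
# Where Python's ssl module (via OpenSSL) looks for CA certificates,
# and the environment variables that override those locations.
print("cafile override env var:", paths.openssl_cafile_env)  # normally SSL_CERT_FILE
print("capath override env var:", paths.openssl_capath_env)  # normally SSL_CERT_DIR
print("effective cafile:", paths.cafile)
# requests ignores these by default: it verifies against certifi's bundle
# unless REQUESTS_CA_BUNDLE (or a verify= argument) points elsewhere.
print("REQUESTS_CA_BUNDLE:", os.environ.get("REQUESTS_CA_BUNDLE"))
```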
  • Amanda Wee

    06/02/2021, 9:19 AM
    Just thought I'd point out that this pull request introduced an empty file named "=" to the root of the prefect repository: https://github.com/PrefectHQ/prefect/pull/4499/files Is that intentional?
    1 reply · 2 participants
  • Snehotosh

    06/02/2021, 11:59 AM
    Hi Experts, we are evaluating Prefect and are new to it. Three questions: 1) Does Prefect have any support for incremental builds and for pipeline testing with Python testing frameworks? 2) We saw that Prefect supports Dask for distributed computing. Is there any support for PySpark (both Databricks and non-Databricks)? 3) Can Prefect be used with ML deployment libraries like Seldon Core to deploy on Kubernetes? Basically, we are looking to deploy a trained model on a Kubernetes cluster for prediction. Thanks, Snehotosh
    5 replies · 2 participants
  • ash

    06/02/2021, 12:31 PM
    Hi everyone, I am facing this error with a very simple Prefect flow. What could the cause be?
    prefect.utilities.exceptions.ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured
    10 replies · 3 participants
  • Romain

    06/02/2021, 1:07 PM
    Hi folks, I have a question about the Dask adaptive cluster and Prefect. We have a mapped task where each mapped run is quite long (~10 min) and resource-intensive, which makes the Dask adaptive cluster scale up. But because it takes some time for the new workers to become available (K8s has to allocate new nodes...), basically all the mapped tasks are given to the first existing worker, and they are not redistributed once the new workers are finally available. Has anyone encountered this issue, and how did you deal with it?
    19 replies · 3 participants
  • Tomasz Szuba

    06/02/2021, 2:20 PM
    Hi Everyone!
    👋 1
  • Tomasz Szuba

    06/02/2021, 2:21 PM
    I am facing an issue when I try to set both the state message and the result in my task
  • t

    Tomasz Szuba

    06/02/2021, 2:23 PM
    I tried:
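    from prefect import Task
    from prefect.engine import signals
    
    class MyTask(Task):  # hypothetical class name; the original snippet showed only run()
      def run(self, parameter):
        result = "hello"
        # Try to set both a custom state message and a result
        raise signals.SUCCESS(
          message=f'{parameter}',
          result=result
        )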
    But this makes the result None in another task that depends on it
    76 replies · 4 participants
  • Pedro Henrique

    06/02/2021, 2:38 PM
    Hi everyone, how do I configure a flow to run after the previous one finishes successfully, when the two flows run on different agents?
    9 replies · 4 participants
  • Peter Roelants

    06/02/2021, 2:57 PM
    Hi Prefect, if I understand correctly, there is currently no direct way to run a flow with StartFlowRun and return a result to be used by a downstream task. E.g.:
    from prefect import Flow, Parameter, task
    from prefect.tasks.prefect import StartFlowRun
    
    flow_a = StartFlowRun(flow_name='flow_a', wait=True)
    
    @task
    def task_b(param_b):
        ...
    
    with Flow('flow_c') as flow:
        param_a = Parameter('param_a')
        result_a = flow_a(parameters={'param_a': param_a})
        result_b = task_b(result_a)
    I noticed people creating some workarounds. However, I want to avoid persisting the result, and instead pass a serializable result directly to the next task. Is there a canonical way to add a result to a flow, get that result from a StartFlowRun, and use it in a following task?
    16 replies · 2 participants
  • itay livni

    06/02/2021, 3:19 PM
    Hi - I have a Python app with a tkinter UI for interactions. It's essentially one long script. Can Prefect still be used once the code is broken down into tasks? Will the flow pause and wait for the user to handle the interaction? Is pause-for-approval the way to go? Thanks https://docs.prefect.io/core/idioms/pause-for-approval.html#pause-for-approval
    2 replies · 2 participants

Kevin Kho

06/02/2021, 3:58 PM
Hey @itay livni, not 100% sure but it seems like it could work. I can see it working with Prefect cloud where the buttons hit the GraphQL API and orchestrate actions that way. Or were you thinking of doing this without Cloud?

itay livni

06/02/2021, 4:23 PM
@Kevin Kho - Cannot do it with cloud because of business constraints. No servers (on prem or off prem), go figure.
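Without a server, the pause-for-approval idea can be approximated locally by running the pipeline in a worker thread that blocks until the UI signals approval. A minimal sketch with threading.Event; ApprovalGate and pipeline are hypothetical names, and this is not Prefect's pause mechanism:

```python
import threading
from typing import List, Optional


class ApprovalGate:
    """Hypothetical helper: block a pipeline step until the UI approves it."""

    def __init__(self) -> None:
        self._approved = threading.Event()

    def approve(self) -> None:
        # Wire this to a button, e.g. tkinter.Button(..., command=gate.approve).
        self._approved.set()

    def wait(self, timeout: Optional[float] = None) -> bool:
        # True once approved; False if the timeout expires first.
        return self._approved.wait(timeout)


def pipeline(gate: ApprovalGate, log: List[str]) -> None:
    """Runs in a worker thread so the tkinter mainloop stays responsive."""
    log.append("step 1 done")
    if gate.wait(timeout=5):
        log.append("step 2 done")
    else:
        log.append("step 2 skipped: no approval")


if __name__ == "__main__":
    gate, log = ApprovalGate(), []
    worker = threading.Thread(target=pipeline, args=(gate, log))
    worker.start()
    gate.approve()  # stands in for the user clicking "Approve"
    worker.join()
    print(log)  # ['step 1 done', 'step 2 done']
```

The worker thread is the key design choice: tkinter callbacks run on the main thread, so blocking there would freeze the UI.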