• jspeis

    1 year ago
    Hello! I’ve been experimenting with Prefect locally, and I'm interested in setting up a simple Docker/Kubernetes configuration that boots up the Prefect server and an agent in the cloud and points them at an external database. Before I put something together, I just wanted to check: has anyone already done something similar?
    3 replies (Michael Adkins)
  • Andrey Tatarinov

    1 year ago
    Hi! I have an external API with pagination: each call returns some results plus a link to the next page of results. The total results (all pages combined) are huge and do not fit in memory. Ideally I would make the task that pulls from the API a generator, so that I can start mapping over its results before the pull is complete. Is it possible to achieve that in Prefect?
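    A plain-Python sketch of the lazy pagination pattern described above: a generator follows each page's "next" link and yields items one at a time, so only the current page is held in memory, and a second helper regroups the stream into bounded batches. The fetch function and the page layout ({"results": [...], "next": url or None}) are hypothetical stand-ins for the real API, and whether a Prefect task can hand such a generator to a mapped task is the open question of this thread; the sketch does not answer it.

```python
from itertools import islice
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional

# Hypothetical page shape: {"results": [...], "next": <url or None>}
Page = Dict[str, Any]


def iter_results(fetch_page: Callable[[str], Page], first_url: str) -> Iterator[Any]:
    """Yield items lazily, following each page's "next" link until it is None."""
    url: Optional[str] = first_url
    while url is not None:
        page = fetch_page(url)      # only the current page is held in memory
        yield from page["results"]  # items go to the consumer one at a time
        url = page["next"]          # None on the last page ends the loop


def iter_chunks(items: Iterable[Any], size: int) -> Iterator[List[Any]]:
    """Regroup a (possibly lazy) stream into lists of at most `size` items."""
    it = iter(items)  # make sure islice consumes one shared iterator
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


# Usage with a fake three-page API; the keys stand in for real page URLs.
FAKE_API: Dict[str, Page] = {
    "page1": {"results": [1, 2, 3], "next": "page2"},
    "page2": {"results": [4, 5], "next": "page3"},
    "page3": {"results": [6], "next": None},
}

for batch in iter_chunks(iter_results(FAKE_API.__getitem__, "page1"), size=2):
    print(batch)  # prints [1, 2], then [3, 4], then [5, 6]
```

    Each batch is produced before later pages are requested, which is the property the question is after; the batch size bounds memory use on the consumer side.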
    17 replies (nicholas)
  • Sanjay Patel

    1 year ago
    Hi, I'm trying to understand how to use StartFlowRun as described in "Running dependent flows", but I can't get my head around passing parameters from one flow into the next. The "Running a parametrized flow" example there doesn't use a flow as an input. I've done a few searches but can't find exactly what I'm looking for. This is one example I was looking at, but it doesn't give enough detail:
    with Flow('first') as flow:
        a = task_a()
        b = task_b(a)
        c = task_c(b)
        StartFlowRun(flow_name='second', project_name=...)(parameters={"input": c})
    This is the closest to what I'm trying to do, but the response ended up using state_result.
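    from prefect import Flow, Parameter
    from prefect.tasks.prefect import StartFlowRun

    # task_a ... task_d are my own @task functions (not shown)
    with Flow('first') as flow:
        a = task_a()
        b = task_b(a)
        c = task_c(b)

    with Flow('second') as flow:
        param = Parameter('input')
        d = task_d(param)

    # How to do something like this:
    flow_a = StartFlowRun(flow_name="first", project_name="examples", wait=True)
    flow_b = StartFlowRun(flow_name="second", project_name="examples", wait=True)
    with Flow('total') as flow:
        a = flow_a()
        # call the task once, passing both arguments;
        # 'a.d' is a placeholder for "the output of the first flow run", not valid Prefect
        b = flow_b(upstream_tasks=[a], parameters={'input': a.d})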
    I have the option of changing one of my flows so that it produces the output I need for the next flow. So, two questions:
      • How does the output of flow_a get referenced in flow_b?
      • How do you access the parameter input that comes from flow_a in flow_b?
    Thank you!
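    Stripped of the orchestration layer, both questions come down to one hand-off: take the value produced by the last task of the first flow (c) and supply it as the second flow's "input" parameter. The plain functions below are hypothetical stand-ins for task_a to task_d and for the two flows, with invented bodies, and no Prefect API is used; how to retrieve that final value from a flow run started with StartFlowRun is exactly the part the thread asks about.

```python
from typing import Any, Dict


# Hypothetical stand-ins for the poster's tasks; the bodies are invented.
def task_a() -> int:
    return 1


def task_b(a: int) -> int:
    return a + 1


def task_c(b: int) -> int:
    return b * 10


def task_d(param: Any) -> str:
    return f"second flow received {param}"


def first_flow() -> int:
    """Plays the role of flow 'first': a -> b -> c, with c as its final output."""
    a = task_a()
    b = task_b(a)
    return task_c(b)


def second_flow(parameters: Dict[str, Any]) -> str:
    """Plays the role of flow 'second'; parameters["input"] mirrors Parameter('input')."""
    return task_d(parameters["input"])


# The hand-off in question: the first flow's output becomes the "input" parameter.
c_value = first_flow()
print(second_flow(parameters={"input": c_value}))  # second flow received 20
```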
    7 replies (nicholas +1)
  • Aiden Price

    1 year ago
    Hi folks, when I try to add an apply_map() to my flow I get this error:
    ValueError: Cycle found; flows must be acyclic!
    Unfortunately the stack trace doesn't give any hint as to where I've accidentally created a cycle in the graph. Through some comment-driven debugging I found that the problem is related to a case statement in the function I'm passing to apply_map(). I've also run flow.visualize(), and it does output a picture, but because it's a big flow it's hard to spot the cycle in there. Does anyone have any hints on how to better debug cycles in flow graphs? Thanks in advance.
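    One way to locate the cycle without reading the rendered graph is a depth-first search over the flow's edge list that reports the first back edge as an explicit path. The sketch below works on plain (upstream, downstream) pairs; pulling those pairs out of a Prefect 1.x flow via flow.edges and each edge's upstream_task / downstream_task attributes is an assumption to check against the installed version, and the task names in the usage example are made up.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, List, Optional, Tuple


def find_cycle(edges: Iterable[Tuple[Hashable, Hashable]]) -> Optional[List[Hashable]]:
    """Return one cycle as [n0, n1, ..., n0], or None if the graph is acyclic."""
    graph: Dict[Hashable, List[Hashable]] = defaultdict(list)
    for upstream, downstream in edges:
        graph[upstream].append(downstream)

    WHITE, GREY, BLACK = 0, 1, 2          # unvisited / on current path / finished
    color: Dict[Hashable, int] = defaultdict(int)
    path: List[Hashable] = []             # nodes on the current DFS path

    def visit(node: Hashable) -> Optional[List[Hashable]]:
        color[node] = GREY
        path.append(node)
        for nxt in graph[node]:
            if color[nxt] == GREY:        # back edge: nxt is already on the path
                return path[path.index(nxt):] + [nxt]
            if color[nxt] == WHITE:
                found = visit(nxt)
                if found:
                    return found
        path.pop()
        color[node] = BLACK
        return None

    for start in list(graph):             # copy: visit() may add keys to graph
        if color[start] == WHITE:
            found = visit(start)
            if found:
                return found
    return None


# Assumed extraction from a Prefect 1.x flow (check attribute names for your version):
# edges = [(e.upstream_task, e.downstream_task) for e in flow.edges]
edges = [("fetch", "case"), ("case", "merge"), ("merge", "case"), ("merge", "save")]
print(find_cycle(edges))  # ['case', 'merge', 'case']
```

    The search is recursive, so a very deep flow could hit Python's default recursion limit; an explicit stack would avoid that at the cost of a longer function.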
    8 replies (nicholas)
  • Vincent

    1 year ago
    Hi all, I was wondering if someone could help me identify why only part of my flow executes in parallel. As shown in this flow diagram, 5 of 8 tasks have been mapped, while 3 are still pending. I'm sure I started enough Dask worker nodes to handle the compute, but these jobs are stuck pending. Thanks for the advice!
    2 replies
  • Andreas Jung

    1 year ago
    Just started with Prefect and wrote this example script to measure execution speed. The script contains 4 tasks, of which only the "parse" task does any real processing on the data (taking less than 1 ms). However, the overall execution time of the whole flow is always between 1000 ms and 1500 ms. Why is this? Is there really such a huge overhead in the underlying task scheduler/executor?
    import time
    from base64 import b64decode

    from prefect import Flow, task

    import reportparser  # my own parsing module (not shown)


    @task
    def fetch_message():
        ts = time.time()
        report_data = b64decode(
            b"ew4AAAAAAADmAgAABgAAAQAAH+EH3QAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
            b"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAIBPwgQAA4AAAAA"
        )
        print("fetch  ", time.time())       # absolute timestamp (Unix epoch seconds)
        print("fetch  ", time.time() - ts)  # time spent inside this task
        return report_data


    @task
    def classify_message(msg):
        msg_type = 0  # placeholder: no real classification yet
        ts = time.time()
        print("classify", time.time())
        print("classify", time.time() - ts)
        return msg_type


    @task
    def parse_message(msg, msg_type):
        ts = time.time()
        result = reportparser.parse(msg)  # the only task doing real work
        print("parse  ", time.time())
        print("parse  ", time.time() - ts)
        return result


    @task
    def deliver_message(result):
        ts = time.time()
        print("deliver ", time.time())
        print("deliver ", time.time() - ts)


    def main():
        with Flow("reportparser") as flow:
            msg = fetch_message()
            msg_type = classify_message(msg)
            result = parse_message(msg, msg_type)
            deliver_message(result)

        # run the same flow 20 times and time each complete run
        for _ in range(20):
            print()
            ts = time.time()
            flow.run()
            print("total", time.time() - ts)


    if __name__ == "__main__":
        main()
    Output of one run (absolute timestamp, then time spent inside each task, in seconds):
    fetch   1607943883.0552974
    fetch   0.0001933574676513672
    classify 1607943883.3690984
    classify 0.00015401840209960938
    parse   1607943883.6593442
    parse   0.0009167194366455078
    deliver 1607943883.9757109
    deliver 0.0001609325408935547
    total 1.3962466716766357
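    The printed numbers already show where the time goes. A short calculation over the logged values (copied verbatim from the run above) gives about 1.4 ms spent inside the four tasks combined, while consecutive printed timestamps are roughly 0.29 to 0.32 s apart, so only about 0.1% of the 1.40 s run is task code. This only quantifies the gap between tasks; it does not identify which part of the flow runner is responsible.

```python
# Timestamps and per-task durations copied from the run printed above (seconds).
stamps = {
    "fetch": 1607943883.0552974,
    "classify": 1607943883.3690984,
    "parse": 1607943883.6593442,
    "deliver": 1607943883.9757109,
}
durations = {
    "fetch": 0.0001933574676513672,
    "classify": 0.00015401840209960938,
    "parse": 0.0009167194366455078,
    "deliver": 0.0001609325408935547,
}
total = 1.3962466716766357

names = list(stamps)  # dicts preserve insertion order (Python 3.7+)
pairs = list(zip(names, names[1:]))
gaps = [stamps[later] - stamps[earlier] for earlier, later in pairs]
task_time = sum(durations.values())

for (earlier, later), gap in zip(pairs, gaps):
    print(f"{earlier} -> {later}: {gap:.3f} s between printed timestamps")
print(f"time spent inside tasks: {task_time * 1000:.2f} ms")
print(f"fraction of the total run: {task_time / total:.2%}")
```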
    2 replies
  • Christian

    1 year ago
    Hi there. A question about the new RunGreatExpectationsValidation task: I always get a FAIL state returned from executing it, and no artifacts page. Is it expected that only a successful validation produces the markdown artifacts page? I'm fighting with several aspects of my setup, and it would help to be able to inspect what the GE component is actually doing. Also, in general, I'd want to see the failed validation results. Thanks!
    2 replies
  • Adam

    1 year ago
    Hi team, I'd like to upgrade my Prefect Cloud account to the paid version. It says I should ‘Contact Us’. Can someone help me?
    3 replies (Dylan)
  • Marc Lipoff

    1 year ago
    I'm getting what I'm sure is a silly error. I'm trying to run a simple flow on ECS, and I get this. Any ideas about what the cause is?
    3 replies (Kyle Moon-Wright)
  • Marc Lipoff

    1 year ago