• Tsang Yong

    2 years ago
    Hi Community, can someone help me understand task.map a bit better? If a list changes shape during a flow run (e.g. from 1-D to 2-D), will map automatically break the new 2-D list up into tasks?
  • Tsang Yong

    2 years ago
    So far in my attempts I can map over the original 1-D list in my task run, but not over the 2-D list.
  • Tsang Yong

    2 years ago
    from prefect import task, Flow
    
    
    my_list = [1, 3, 4]
    
    
    @task
    def map1_fn(item):
        new_item = [item+1, item+1]
        print("map1_fn: {}".format(item))
        return new_item
    
    
    @task
    def map2_fn(item):
        # expecting item = 2 but got [2, 2] etc
        print("map2_fn: {}".format(item))
        return item
    
    
    with Flow("workflow") as flow:
        my_other_list = map1_fn.map(item=my_list)
        my_another_list = map2_fn.map(item=my_other_list)
    
    flow.run()
    Chris White
    8 replies
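A plain-Python sketch (no Prefect involved) of the semantics in question: mapping iterates only over the *top level* of its input, so once map1_fn turns each element into a list, the next map receives whole sublists rather than elements. Flattening one level first restores element-wise mapping (later Prefect releases added a `flatten` helper for exactly this pattern):

```python
my_list = [1, 3, 4]

def map1_fn(item):
    return [item + 1, item + 1]

def map2_fn(item):
    return item

# First map: one call per element of my_list -> a 2-D list.
my_other_list = [map1_fn(x) for x in my_list]      # [[2, 2], [4, 4], [5, 5]]

# Second map sees only top-level items, so each "item" is a whole sublist.
nested = [map2_fn(x) for x in my_other_list]       # [[2, 2], [4, 4], [5, 5]]

# Flatten one level first, then map element-wise again.
flat = [x for sub in my_other_list for x in sub]   # [2, 2, 4, 4, 5, 5]
elementwise = [map2_fn(x) for x in flat]           # [2, 2, 4, 4, 5, 5]
```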
  • RyanB

    2 years ago
    It appears I can't use the context-supplied logger when using the DaskExecutor, as I get an exception that the context cannot be pickled?
    11 replies
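A plain-Python sketch of the underlying issue (a stand-in `context` dict, not Prefect's): Dask must serialize whatever a task captures, and runtime-only objects with live handles won't pickle. The usual workaround pattern is to look the object up *inside* the task body at run time instead of capturing it at build time:

```python
import pickle
import threading

# Stand-in for a runtime-only object (like a live context): it holds a
# lock, which cannot be pickled.
context = {"task": "demo", "lock": threading.Lock()}

try:
    pickle.dumps(context)
    picklable = True
except TypeError:
    picklable = False

print(picklable)  # False: the lock cannot be serialized

# Workaround pattern: don't capture the object when defining the task;
# resolve it inside the task body on the worker, so only the (picklable)
# function itself is shipped.
def task_body(x):
    # e.g. a logger lookup would go here, resolved at run time on the
    # worker rather than serialized along with the task
    return x * 2

roundtripped = pickle.loads(pickle.dumps(task_body))
print(roundtripped(3))  # 6
```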
  • dhume

    2 years ago
    I’m running into a problem when building my Docker storage object. I have my flows and some Python files of shared functions. However, I cannot add_flow any flow that relies on those shared functions. Is there a simple workaround without turning the shared functionality into its own library and adding it to python_dependencies?
    1 reply
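One hedged sketch of a workaround (a config fragment, untested here; the import path and arguments vary by Prefect version, and all paths are hypothetical): Docker storage can copy extra local files into the image via its `files` argument, and PYTHONPATH can be pointed at them, so shared helper modules become importable without being packaged as a library:

```python
from prefect.storage import Docker  # prefect.environments.storage in older versions

storage = Docker(
    registry_url="my-registry.example.com",          # hypothetical registry
    files={
        # local path -> path inside the image (hypothetical paths)
        "/local/project/shared_utils.py": "/modules/shared_utils.py",
    },
    env_vars={"PYTHONPATH": "$PYTHONPATH:/modules"},  # make /modules importable
)
```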
  • Tsang Yong

    2 years ago
    Is there a recommended way to build workflow code and organize local imports? How should I organize the workflow code and utility imports over a Dask cluster? I’m new to Python, and I find myself having to build eggs and upload them, which is unfamiliar territory for me.
    Chris White
    3 replies
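A hedged sketch of one egg-free option (a config fragment, untested here since it needs a running cluster; the scheduler address is hypothetical): dask's own Client can ship a single local module to every worker with `upload_file`, which accepts `.py`, `.egg`, or `.zip` files:

```python
from distributed import Client

client = Client("tcp://scheduler:8786")   # hypothetical scheduler address
client.upload_file("workflow_utils.py")   # now importable on every worker
```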
  • Maikel Penz

    2 years ago
    Hey.. question about task retries. Shouldn't the code below retry only every 30 seconds? When I run it, the output goes from 1 to 40 in less than a second and finishes. I expected it to take 30 seconds × 40 runs to finish (per my if loop_count == 40: stop condition).
    from prefect import Flow, task, context
    from prefect.engine.signals import LOOP
    import datetime
    
    @task(max_retries=50, retry_delay=datetime.timedelta(seconds=30))
    def test_retry():
        loop_count = context.get("task_loop_count", {})
        print(loop_count)
    
        if loop_count == 40:
            return "finished"
    
        raise LOOP(message="Next run")
    
    with Flow("test-pipeline") as flow:
        test_retry()
    
    flow.run()
    Chris White
    5 replies
  • Tsang Yong

    2 years ago
    Is there an example of how to catch a bad ShellTask in Prefect? Looking at the docs page, it’s not clear to me how to catch these other than via a raised prefect.engine.signals.FAIL.
    Jeremiah
    4 replies
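A plain-subprocess sketch of what "catching a bad shell command" amounts to: run the command, inspect the return code, and raise (or otherwise handle) on a nonzero exit. This models the failure detection only; the subinterpreter commands below are stand-ins for real shell commands:

```python
import subprocess
import sys

def run_shell(cmd):
    """Run a command; raise on nonzero exit, analogous to a FAIL signal."""
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(f"command failed with code {proc.returncode}")
    return proc.stdout

# A succeeding command returns its stdout...
ok = run_shell([sys.executable, "-c", "print('hello')"])

# ...and a failing one can be caught like any exception.
try:
    run_shell([sys.executable, "-c", "raise SystemExit(3)"])
    caught = False
except RuntimeError:
    caught = True
```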
  • Tsang Yong

    2 years ago
    Hello. Another question while checking out the ifelse() flow: what is my best approach if I have a lot of downstream tasks hanging off an ifelse() decision? Do I wrap all the downstream tasks in an ifelse()?
    Chris White
    10 replies
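A toy sketch of the skip-propagation behavior that makes wrapping every task unnecessary (plain Python over a hypothetical DAG, not Prefect code): tasks downstream of a skipped task are skipped by default, so only the two branch heads need to be wired through ifelse and everything after them follows its upstream automatically:

```python
edges = {                      # task -> upstream tasks (hypothetical DAG)
    "branch_a": ["check"],
    "branch_b": ["check"],
    "a_child1": ["branch_a"],
    "a_child2": ["a_child1"],
    "b_child":  ["branch_b"],
}

def propagate_skips(initial_skips):
    """Fixed point: any task fed by a skipped task is also skipped."""
    skipped = set(initial_skips)
    changed = True
    while changed:
        changed = False
        for task, ups in edges.items():
            if task not in skipped and any(u in skipped for u in ups):
                skipped.add(task)
                changed = True
    return skipped

# ifelse decided against branch_b: only branch_b is skipped directly,
# but its whole subtree ends up skipped.
print(sorted(propagate_skips({"branch_b"})))  # ['b_child', 'branch_b']
```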
  • Melanie Kwon

    2 years ago
    Hello, quick question: Prefect’s Dask executor does not seem to support the security specification that allows for things like TLS/SSL auth. Do you have any general recommendations for executor authentication/security?
    Zachary Hughes
    5 replies
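For reference, dask's own TLS support lives in distributed.security.Security; a hedged sketch of what a secured connection looks like at the dask Client level (a config fragment, untested here since it needs a live TLS-enabled scheduler; whether and how the executor passes such kwargs through to the Client depends on the Prefect version, and all file paths are hypothetical):

```python
from distributed import Client
from distributed.security import Security

security = Security(
    tls_ca_file="certs/ca.pem",              # hypothetical cert paths
    tls_client_cert="certs/client-cert.pem",
    tls_client_key="certs/client-key.pem",
    require_encryption=True,
)

client = Client("tls://scheduler:8786", security=security)
```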