• shazard

    2 years ago
    Hello there! Would there be any reason explaining why a flow does not generate all the mapped tasks it should?
    from prefect import Flow, task

    @task
    def dummy_task(param):
        pass


    with Flow("test") as flow:
        dummy_task.map(list(range(100)))

    flow.run()
    flow.register()
    This code runs correctly locally, but it gets stuck on the 64th task when run manually from the Core Server UI. Since it always gets stuck on the same task, I suppose this has to do with some resource or process limit, but I can't pinpoint exactly what it could be. Any ideas?
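    A minimal sketch of one way to narrow this down, assuming a Prefect 0.x / Core Server setup (the LocalDaskExecutor import path and the num_workers argument below are assumptions about the installed version): run the same flow with an explicit, bounded executor so the degree of parallelism is under your control.
    from prefect import Flow, task
    from prefect.engine.executors import LocalDaskExecutor  # import path assumed for Prefect 0.x

    @task
    def dummy_task(param):
        pass

    with Flow("test-executor") as flow:
        dummy_task.map(list(range(100)))

    # Bounded thread pool: if all 100 mapped children finish here, the limit is
    # more likely in how the server-run flow's executor/agent is configured.
    state = flow.run(executor=LocalDaskExecutor(scheduler="threads", num_workers=4))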
    5 replies
  • Jie Lou

    2 years ago
    Hi all. I have a question about retries on mapped tasks. A little background: we use Prefect Cloud with Dask workers to run multiple tasks in parallel. Chances are some of the workers could die while the flow is running, and we hope retries can help recover the task runs. Here is a simple flow with a cal function:
    import datetime
    import time
    from prefect import Flow, Parameter, task
    # s3_result_handler: an S3 result handler defined elsewhere in our project

    @task(log_stdout=True, max_retries=1, retry_delay=datetime.timedelta(seconds=10))
    def cal(x):
        print("starting sleep {} seconds".format(x))
        time.sleep(x)
        return x

    with Flow("test", result_handler=s3_result_handler) as flow:
        # bound to a separate name so the Parameter does not shadow the time module
        sleep_time = Parameter("time", default=[60])
        results = cal.map(sleep_time)
    While the cal task was running, I manually killed the worker and kept watching the logs in the Cloud UI, where I found this:
    starting sleep 60 seconds    # then I killed the worker...
    Task 'cal[0]': Starting task run...
    Task 'cal[0]': task is already running.
    Task 'cal[0]': finished task run for task with final state: 'Running'
    1 mapped tasks submitted for execution.
    Task 'cal': Handling state change from Mapped to Mapped
    Task 'cal': task has been mapped; ending run.
    Task 'cal': finished task run for task with final state: 'Mapped'
    Flow run RUNNING: terminal tasks are incomplete.
    Marked "Failed" by a Zombie Killer process.
    It seems that the state of the task does not change from Running after I kill the worker, and that's why the flow was finally marked as a zombie. However, if cal is a regular task without mapping, the retry mechanism works as expected. I just tweaked the flow above a little bit:
    @task(log_stdout=True, max_retries=1, retry_delay=datetime.timedelta(seconds=10))
    def cal(x):
        print("starting sleep {} seconds".format(x[0]))
        time.sleep(x[0])
        return x[0]

    with Flow("test", result_handler=s3_result_handler) as flow:
        sleep_time = Parameter("time", default=[60])
        results = cal(sleep_time)
    Again, while the task was running, I killed the worker, and this time retries worked. Here are the logs:
    starting sleep 60 seconds #then I killed the worker...
    Task 'time': Starting task run... 
    Task 'time': Handling state change from Pending to Running
    Task 'time': Calling task.run() method...
    Task 'time': Handling state change from Running to Success
    Task 'time': finished task run for task with final state: 'Success'
    Task 'cal': Starting task run...
    Task 'cal': Handling state change from Pending to Running
    Task 'cal': Calling task.run() method...
    starting sleep 60 seconds
    and then the flow finished successfully. I would expect mapped tasks and regular tasks to handle this situation consistently, but I'm not sure why the mapped task did not survive a killed worker. Sorry for the long message; any thoughts are welcome. Thanks!!
    5 replies
  • Jacob (he/him)

    2 years ago
    Hello everyone 👋 I’m a data engineer tasked with choosing our new orchestration/monitoring tools and am interested in using Prefect with Redshift. Is this a design pattern that other Redshifters are using?
    1. API call to gather source data
    2. Schema test
    3. Write data to S3
    4. COPY command to a Redshift temp table (using a psycopg2 connection)
    5. Append/insert/merge to the staging table
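    For reference, a minimal sketch of that pattern expressed as a Prefect flow; the task bodies, S3 key, table names, and connection string are hypothetical, and the COPY statement omits the credentials clause for brevity.
    import psycopg2
    from prefect import Flow, task

    @task
    def extract_from_api():
        # call the source API and return records (hypothetical)
        ...

    @task
    def validate_schema(records):
        # schema test: raise on unexpected columns/types (hypothetical)
        return records

    @task
    def write_to_s3(records):
        # write records to S3 and return the object location (hypothetical bucket/key)
        return "s3://my-bucket/staging/records.csv"

    @task
    def copy_and_merge(s3_key):
        # COPY into a temp table (credentials clause omitted), then merge into staging
        with psycopg2.connect("dbname=warehouse") as conn, conn.cursor() as cur:
            cur.execute("COPY tmp_records FROM %s CSV", (s3_key,))
            cur.execute("INSERT INTO staging.records SELECT * FROM tmp_records")

    with Flow("redshift-load") as flow:
        records = extract_from_api()
        validated = validate_schema(records)
        key = write_to_s3(validated)
        copy_and_merge(key)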
    4 replies
  • John Ramirez

    2 years ago
    Hey, is there a way to run the same flow in parallel with different parameters?
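    A sketch of one common approach, assuming the flow is already registered with the backend (the flow ID placeholder and parameter names below are hypothetical): create several flow runs through the Client, each with its own parameters, and let the agent pick them up concurrently.
    from prefect import Client

    client = Client()

    # "<flow-id>" is a placeholder; each call creates an independent flow run
    # that an agent can pick up, so the runs execute in parallel
    for params in [{"region": "us"}, {"region": "eu"}, {"region": "apac"}]:
        client.create_flow_run(flow_id="<flow-id>", parameters=params)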
    4 replies
  • Mitchell Bregman

    2 years ago
    Hey there! I was able to set up the Prefect local UI just fine on an EC2 instance... However, when I register the flow, it does not show up in the UI and "Upcoming Runs" gets stuck loading... I was wondering if anyone else is experiencing this? It seems like the flow UUID gets picked up just fine on the backend local server when I call flow.register(), but nothing shows up in the UI.
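    In case it helps, a minimal registration sketch, assuming a Prefect version where the local server organizes flows into projects (the flow and project names below are hypothetical):
    from prefect import Client, Flow, task

    @task
    def say_hi():
        print("hi")

    with Flow("ui-smoke-test") as flow:
        say_hi()

    # on versions with projects, a flow only appears in the UI under an existing project
    Client().create_project(project_name="sandbox")  # project name is hypothetical
    flow.register(project_name="sandbox")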
    53 replies
  • Leo Meyerovich (Graphistry)

    2 years ago
    Is there a healthcheck route we can use for an agent / executor Docker service? I saw a Feb 5 PR around some storage healthchecks, but I'm not seeing docs for instrumented monitoring here. Ideally something curl-able, like:
    healthcheck:
          test: ["CMD-SHELL", "curl -sSf <http://prefect/health> | jq .code | grep 200 || exit 1"]
    29 replies
  • Leo Meyerovich (Graphistry)

    2 years ago
    I guess we can do something hackish like:
    -- Docker host-level metrics as usual
    -- for Prefect executor health, use unique per-executor labels, schedule healthcheck jobs on them, and have the Docker healthcheck check for the output of those... quite hackish.
  • Ziyao Wei

    2 years ago
    Is there an env var to change the server the UI points to?
  • Ziyao Wei

    2 years ago
    Tried PREFECT_API_URL but that doesn’t seem to work (seems obvious in retrospect).
    10 replies
  • Leo Meyerovich (Graphistry)

    2 years ago
    We are close to getting our initial orchestration pipeline ported 🙂 but we're a bit confused about how to get long jobs running. Tips appreciated. Setup:
    -- server 'ui': running the UI container
    -- server 'gpu': running a Prefect agent as well; registers with 'ui' so it can pick up GPU jobs
    -- server 'nb': Jupyter notebooks we're using to submit jobs; has a local Prefect agent installed that points to 'ui' so we can submit jobs. These notebooks often die.
    We can do quick one-offs fine. Hurray!
    Tricky case 1: long historic job. We want to do a ~3-day job that processes 200 files, one at a time, sequentially in sorted order. The problem is that the notebook server that runs the job will periodically stop, so we really want to submit a job like
    seq([ task_1(file_1), task_2(file_2), ... task_n(file_n) ])
    As soon as the meta-task is submitted, the notebook (and its local agent) can stop. However, for the next 3 days, we want those tasks to run one at a time, and we want to see status in the UI (incl. fails/retries). If we ever want to, we can rerun the flow to add/swap tasks.
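    A sketch of one way to express that meta-task in Prefect, assuming the flow is registered with the server and picked up by the agent on the 'gpu' box; the task body, file list, and project name are hypothetical. The loop chains each file's task to the previous one so they run strictly one at a time, with per-task retries.
    from datetime import timedelta
    from prefect import Flow, task

    FILES = sorted(["file_001.parquet", "file_002.parquet"])  # hypothetical file list

    @task(max_retries=3, retry_delay=timedelta(minutes=10))
    def process_file(path):
        # long-running per-file work goes here (hypothetical)
        ...

    with Flow("historic-backfill") as flow:
        previous = None
        for path in FILES:
            current = process_file(path)
            if previous is not None:
                current.set_upstream(previous)  # enforce strict sequential order
            previous = current

    # Register once; the notebook that submitted the run can then go away, and the
    # agent on the 'gpu' server runs the flow and reports status (fails/retries) to the UI.
    flow.register(project_name="backfills")  # project name is hypothetical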
    2 replies