• Martha Edwards

    6 months ago
    Hi! Is it possible in Prefect to create a sub-flow that works as follows:
    • Task A runs and may fail. Retry A until it succeeds.
    • Task B runs and may fail (or the task itself succeeds, but it returns a result indicating failure of the sub-flow). In that case, loop back and retry A.
    This seems like it could be possible using looping on a nested flow, as described in this Stack Overflow post (see the sketch below).
    11 replies
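    A minimal sketch of that looping pattern, assuming Prefect 1.x (the nested-flow approach the question mentions): a child flow runs A with retries and then B, and a parent task raises the LOOP signal whenever B's result indicates failure, which re-runs the whole child flow. The task_a/task_b bodies are placeholders.
    from datetime import timedelta
    import random

    from prefect import task, Flow
    from prefect.engine.signals import LOOP


    @task(max_retries=10, retry_delay=timedelta(seconds=30))
    def task_a() -> int:
        # may raise; Prefect retries this task until it succeeds
        return 42


    @task
    def task_b(a_result: int) -> bool:
        # the task itself succeeds, but its return value reports sub-flow success
        return random.random() > 0.5  # placeholder for the real check


    with Flow("child") as child_flow:
        b_ok = task_b(task_a())


    @task
    def run_child_until_ok():
        state = child_flow.run()
        if not state.result[b_ok].result:
            # B reported failure: LOOP re-runs this task, i.e. a fresh A -> B pass
            raise LOOP(message="B reported failure; looping back to A")


    with Flow("parent") as parent_flow:
        run_child_until_ok()
    Each LOOP iteration restarts run_child_until_ok itself, so A's own retries happen inside every pass and the loop only ends once B reports success.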
  • Martha Edwards

    6 months ago
    Another question: is it possible to have an already-running flow run update when the flow itself gets updated? Suppose a task is added, modified, or removed. Or to cancel a flow run and then restart it from Cancelled, with the new run including the recent updates to the flow?
    6 replies
  • Roger Webb

    6 months ago
    I'm setting up an automation for "When any run from <MyFlowName> changes to Failed, then <<My Action>>". But when I save the automation, it changes to "When any run from <MyFlowName> changes to any of these states, then <<My Action>>". Is this an issue with the GUI, or is my automation not saving the correct values in the wizard?
    3 replies
  • Moises Vera

    6 months ago
    I'm experiencing a weird issue when I register my flows.
    • I had only one flow, with this directory structure:
      ◦ extract_data.py: the flow file (config for the flow); here I import the tasks directory with a simple `import tasks`
      ◦ tasks: a directory with my tasks, at the same level
    • When I register this flow, it works correctly on a schedule.
    • I just added a new flow, calculate_something.py. The tasks for this flow are in the tasks directory too.
    • Now when I want to run this new flow I get:
      FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'tasks\'")
    What I don't get is: why is it working for the first flow and not for this new one? Any ideas? I appreciate it. (See the sketch below.)
    6 replies
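    A hedged sketch of one common cause, assuming pickle-based storage and the layout below: the flow's `import tasks` is captured by reference when the flow is pickled, so the process that later unpickles it must also be able to import tasks. Registering both flows from the project root and giving the agent the same import path usually resolves this ModuleNotFoundError. The helper name, paths, and project name here are hypothetical.
    # register_flows.py, run from the project root:
    #   project/
    #   ├── register_flows.py       # this hypothetical helper
    #   ├── extract_data.py         # flow 1, does `import tasks`
    #   ├── calculate_something.py  # flow 2, does `import tasks`
    #   └── tasks/
    #       └── __init__.py
    #
    # The agent that unpickles the flows needs `import tasks` to work as well,
    # e.g. start it with the project root on PYTHONPATH (hypothetical path):
    #   PYTHONPATH=/path/to/project prefect agent local start
    from extract_data import flow as extract_flow
    from calculate_something import flow as calculate_flow

    for flow in (extract_flow, calculate_flow):
        flow.register(project_name="my-project")  # hypothetical project name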
  • Suresh R

    6 months ago
    Hi, I have a flow like this: A -> List -> Dict -> B. The result of A is persisted in S3. Whenever B fails and I try to restart the flow, the Dict task supplies empty data to B, since its result is not persisted. Dict is an internal task that we don't have any control over. How can we overcome this? (See the sketch below.)
    5 replies
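    One hedged workaround, assuming Prefect 1.x: set a result at the flow level. Tasks without a result of their own, including the auto-generated List/Dict collection tasks, fall back to it, so their outputs are also persisted and a restart can rehydrate B's inputs from S3 instead of passing empty data. The task names and bucket are placeholders.
    from prefect import task, Flow
    from prefect.engine.results import S3Result


    @task
    def a() -> list:
        return [1, 2, 3]


    @task
    def b(payload: dict) -> None:
        ...  # placeholder for the real work


    # result= on the Flow is the default for every task without its own result,
    # which should cover the internal List/Dict collection tasks as well
    with Flow("restartable", result=S3Result(bucket="bucket-name")) as flow:
        items = a()
        b({"first": items[0], "rest": items})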
  • Toby Rahloff

    6 months ago
    Hi, we are currently evaluating Prefect Orion (and we are pretty amazed, to be honest). One challenge I cannot wrap my head around at the moment is manual interaction (e.g., "wait for approval"). We are crunching large visual data sets that often have unexpected features. Because of this, we need some kind of manual sanity check by a human expert before we start the compute/time-intensive tasks. In Prefect 1.0 this was possible via the `manual_only` trigger. Is it possible to do the same with Orion (in the future)? (See the sketch below.)
    4 replies
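    A hedged workaround sketch, assuming Prefect 2.x (Orion), since there was no built-in manual_only equivalent at the time: a gate task that blocks until a human reviewer flips an external approval flag. The flag directory, polling interval, and task names are all placeholders.
    import time
    from pathlib import Path

    from prefect import flow, task

    APPROVALS = Path("/tmp/approvals")  # hypothetical flag store a reviewer writes to


    @task
    def wait_for_approval(job_id: str, poll_seconds: int = 60, timeout_seconds: int = 3600) -> None:
        # block until a reviewer creates /tmp/approvals/<job_id>
        waited = 0
        while not (APPROVALS / job_id).exists():
            if waited >= timeout_seconds:
                raise TimeoutError(f"no approval for {job_id!r} within {timeout_seconds}s")
            time.sleep(poll_seconds)
            waited += poll_seconds


    @task
    def expensive_compute(job_id: str) -> None:
        ...  # the compute/time-intensive work gated behind the manual check


    @flow
    def gated_pipeline(job_id: str):
        wait_for_approval(job_id)
        expensive_compute(job_id)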
  • Bruno Nunes

    6 months ago
    Hello, I would like to upgrade the Prefect Server in my k8s cluster to Prefect Orion. Are there any guidelines on how to do this?
    5 replies
  • Tomer Cagan

    6 months ago
    Hello, I am a bit confused about how task results are stored when I use Kubernetes. Can I define a mount to network storage (in the job template) and then use local storage so results are saved there? Would the server be able to read them (assuming it also has the same network storage mounted)? Alternatively, we have a Sonatype Nexus repository: can I create a result type based on that (implementing the interface)? If I do so, how can I ensure this code is available to the system (task/server)? (See the sketch below.)
    5 replies
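    On the network-storage question, a minimal sketch assuming Prefect 1.x and a job template that mounts shared storage at /mnt/results (a hypothetical path): LocalResult simply writes files to that directory, so anything with the same mount, server included, can read them back. For Nexus, the Result interface (read/write/exists) can in principle be subclassed, but the custom class must be importable in every environment that runs the flow.
    from prefect import task, Flow
    from prefect.engine.results import LocalResult

    # /mnt/results is assumed to be a network volume declared in the
    # Kubernetes job template; LocalResult just writes files, so results
    # land on the shared storage
    shared_result = LocalResult(dir="/mnt/results")


    @task(result=shared_result)
    def produce() -> dict:
        return {"answer": 42}


    with Flow("k8s-results") as flow:
        produce()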
  • Anna Geller

    6 months ago
    FYI: if you face some issues with Slack today, you can post your question on Discourse or GitHub. Slack status is all green, but e.g. search doesn't seem to work today.
    1 reply
  • Vadym Dytyniak

    6 months ago
    Hi. I am trying to add checkpointing in my flows to checkpoint a Dask dataframe that I pass between tasks. Tasks:
    import pandas as pd
    import dask.dataframe as dd
    from prefect import task, Flow
    from prefect.engine.results import S3Result


    @task(result=S3Result(bucket='bucket-name'))
    def checkpoint_data() -> dd.DataFrame:
        df = pd.DataFrame({'col_1': ['1', '2', '3'], 'col_2': [1, 2, 3]})
        ddf = dd.from_pandas(df, npartitions=1)

        return ddf


    @task()
    def accept_checkpointed_data(ddf: dd.DataFrame) -> None:
        raise ValueError("Checkpoint testing...")
    Flow:
    with Flow("checkpoint-flow") as flow:
        ddf = checkpoint_data()
        accept_checkpointed_data(ddf)
    How can I be sure that, on restart after a failure (the ValueError I added), the accept_checkpointed_data task loads data from S3 rather than using cached_inputs? Thanks.
    33 replies