  • Jason Noxon

    8 months ago
    Hello, Community! I installed the Prefect App from the Azure Marketplace and connected it to Prefect Cloud - no issues. I cannot, however, connect Prefect to the cluster, because there is no external IP! Well, I think that's the reason - not sure. Are there any docs anywhere about this?
  • Gui Pires

    8 months ago
    Hi all. I'm new to Prefect. I'm trying to use it to process a large dataset of 3D images. It works nicely for a small subset of the dataset, but when I try it on the whole dataset I find that "unmanaged memory" (as per Dask's dashboard) accumulates in the dask-workers and they end up OOM'ing. In broad terms, the flow does the following:
    • read in each image and apply a couple of simple transformations (cropping, aligning, ...)
    • compute some global stats across the whole dataset
    • use the global stats as parameters to further process the images
    So: "map - reduce - map". I'm trying to avoid IO ops (and maybe I just shouldn't), so I do expect Dask's memory use to increase as it stores intermediate results, but I would assume that to be "managed memory".
    • Could it be that this is just Prefect storing intermediate results, and Dask doesn't "know" about them?
    • Is it possible I'm messing something up in how I'm programming the workflow, leading to unnecessary serialization of large objects? (I do get some messages about this.) I tried to use Prefect's constructs everywhere to avoid messing this up.
    ◦ e.g.: a Prefect task reads a file with a list of paths, a read-file task is mapped over this list, and so on, so I don't see where I could be passing large objects to be serialized.
    • I also tried manually garbage-collecting inside the Prefect task where memory seems to be accumulating.
    Any ideas on how to further debug this? Thanks in advance!
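
    A minimal sketch of the map-reduce-map shape described above, assuming Prefect 1.x on a DaskExecutor; all task names and bodies here are hypothetical placeholders, not the poster's actual code:

    from prefect import Flow, task, unmapped
    from prefect.executors import DaskExecutor

    @task
    def list_images():
        # placeholder: the real flow reads a file containing a list of paths
        return ["img_0.npy", "img_1.npy", "img_2.npy"]

    @task
    def load_and_transform(path):
        # placeholder for reading one image and cropping/aligning it
        return len(path)

    @task
    def global_stats(images):
        # the "reduce" step: needs every mapped result at once
        return sum(images) / len(images)

    @task
    def finalize(image, stats):
        # the second "map": reprocess each image using the global stats
        return image - stats

    with Flow("map-reduce-map") as flow:
        paths = list_images()
        images = load_and_transform.map(paths)
        stats = global_stats(images)
        finalize.map(images, unmapped(stats))

    flow.run(executor=DaskExecutor())

    Note that the reduce step forces every mapped result to be gathered in one place, so the intermediate images are held at least until global_stats finishes, whichever component is doing the accounting.
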
  • Isaac Brodsky

    8 months ago
    I'm getting some odd error about a Dask future my job awaits being attached to a different loop:
    RuntimeError: Task <Task pending name='Task-2051' coro=<FutureState.wait() running at /usr/local/lib/python3.8/site-packages/distributed/client.py:482> cb=[WaitIterator._done_callback()]> got Future <Future pending> attached to a different loop
    As far as I can tell, I did not change anything about how work is submitted to Dask, so I am wondering whether this is some intermittent issue related to where the Prefect task is running. My Prefect task is wrapped in
    with worker_client()
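
    A minimal sketch of the worker_client pattern mentioned above, assuming Prefect 1.x with the task executing on a Dask worker; the task body is a hypothetical placeholder and does not reproduce the event-loop error:

    from distributed import worker_client
    from prefect import task

    @task
    def heavy_task(items):
        # Runs on a Dask worker; worker_client() hands sub-work back to the
        # scheduler from inside the task instead of blocking the worker thread.
        with worker_client() as client:
            futures = client.map(lambda x: x * 2, items)
            return client.gather(futures)
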
  • Danny Vilela

    8 months ago
    Hi all — I have a dynamic task Foo that maps over a task input (a list) with 3 values. Sometimes, though, some of those values aren't applicable (for example, if it's a requested date that doesn't exist in our database), so I thought of raising prefect.engine.signals.SKIP to note that (1) this dynamic task input doesn't apply, but also that (2) it wasn't a failure, per se. That said, I'm noticing that a task Bar that is directly downstream of Foo is also skipped when Foo skips any of its mapped tasks. It seems this is intended, but is there a trigger I should set to say "it's fine if any number of these mapped tasks fails"? Bar has other upstream tasks, but I wouldn't want those to be affected. Does skip_on_upstream_skip apply here? Should I configure Bar such that skip_on_upstream_skip=False? From the docs here.
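
    A minimal sketch of the pattern being asked about, assuming Prefect 1.x; the foo/bar bodies are hypothetical placeholders:

    from prefect import Flow, task
    from prefect.engine.signals import SKIP
    from prefect.triggers import all_finished

    @task
    def foo(value):
        # e.g. a requested date that doesn't exist in the database
        if value is None:
            raise SKIP("Not applicable for this input")
        return value

    # skip_on_upstream_skip=False keeps bar from being auto-skipped when some
    # of foo's mapped children skip; trigger=all_finished relaxes the default
    # all_successful requirement so bar runs once every upstream task is done.
    @task(skip_on_upstream_skip=False, trigger=all_finished)
    def bar(values):
        return [v for v in values if v is not None]

    with Flow("skip-example") as flow:
        results = foo.map([1, None, 3])
        bar(results)

    Whether skip_on_upstream_skip=False alone is enough, or a trigger is also needed, depends on which upstream states bar should tolerate: the former handles Skipped states, the latter Failed ones.
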
  • John Jacoby

    8 months ago
    Hi all. I'm struggling to get my task to properly return multiple values. I specify 'nout=2' in the decorator but I still get the 'Task is not iterable' TypeError. This is the fairly simple flow:
    with Flow(constants['name'], result=result) as flow:
  • John Jacoby

    8 months ago
    participant_ids, scan_ids, visit_dates = get_study_data()
    bourget_path, BIDS_path = bourget2bids.map(participant_ids, scan_ids, unmapped(constants))
    copy_DICOMS.map(scan_ids, bourget_path, BIDS_path, unmapped(constants))
  • John Jacoby

    8 months ago
    And this is the task, also fairly simple:
    @task(target='{task_name}/{scan_id}', checkpoint=True, nout=2)
    def bourget2bids(participant_id: str, scan_id: str, study_constants):
        tmp_BIDS_dir, bourget_path = get_from_bourget(participant_id, scan_id, study_constants['name'], study_constants['scripts_dir'])
        add_ASL_metadata(scan_id, study_constants['name'])
        BIDS_path = copy_to_study_data_folder(participant_id, scan_id, tmp_BIDS_dir, study_constants['name'], study_constants['data_dir'])
        return bourget_path, BIDS_path
  • John Jacoby

    8 months ago
    Sorry, I couldn't seem to get that to format properly
  • John Jacoby

    8 months ago
    Thanks for any insight anyone can provide!
  • John Jacoby

    8 months ago
    The error gets thrown at the 'bourget2bids' line
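
    A minimal sketch of nout unpacking, assuming Prefect 1.x; the select helper is hypothetical and shown only as one way to split mapped tuple results downstream, not as a confirmed fix for the error above:

    from prefect import Flow, task, unmapped

    @task(nout=2)
    def make_pair(x):
        return x, x * 10

    @task
    def select(pair, index):
        # hypothetical helper: pull one element out of each returned tuple
        return pair[index]

    with Flow("nout-example") as flow:
        a, b = make_pair(1)                      # unpacking a single call works with nout=2
        pairs = make_pair.map([1, 2, 3])         # a mapped call returns a single task
        firsts = select.map(pairs, unmapped(0))  # split per-element downstream
        seconds = select.map(pairs, unmapped(1))
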