  • Sultan Orazbayev

    8 months ago
    Hello, if anyone is using prefect on a SLURM cluster, I am interested in connecting to learn about the experience.
    Sultan Orazbayev
    Kevin Kho
    +2
    8 replies
  • Noam Gal

    8 months ago
    Hi all, I'm a newbie to Prefect. I have created a flow that uses prefect.Parameter tasks. The parameter types are just native str and int. The flow's logic uses some other tasks that consume the parameter tasks, and those tasks use some helper functions that let me reuse code and keep it readable. If I want a helper function to use one of the parameters (I just need the value, not the task itself), I have to make the helper a Prefect task as well, and when calling it from another task it has to be called with .run, since inside a task it isn't in the context of a flow. For example:
    import prefect
    from prefect import Parameter, Flow, task

    @task
    def shared_task(id: int, value: int) -> int:
        return ...

    def calc_logic_func1() -> int:
        return ...

    def calc_logic_func2() -> int:
        return ...

    @task
    def my_task1(id, description):
        val1 = calc_logic_func1()
        return shared_task.run(id, val1)

    @task
    def my_task2(id, description):
        val2 = calc_logic_func2()
        return shared_task.run(id, val2)

    @task
    def my_reduce_task(result1, result2):
        return ...

    with Flow("my flow") as my_flow:
        id = Parameter("id", required=True)  # int value
        description = Parameter("description", required=True)  # str value
        result1 = my_task1(id, description)
        result2 = my_task2(id, description)
        my_reduce_task(result1, result2)
    In the example above I want to use a helper function shared_task with the integer id value, but since id is a Prefect Parameter task, shared_task itself must be a task, and when calling it from another task (e.g. my_task1) it has to be called with shared_task.run. At least, this is how I understand it so far. Is there another way to do this (not making shared_task a task, or not calling it with .run, since my_task1 is already called from the my_flow context)? If this is the right way to use it, are there any other effects on the flow run (I guess my_task1 will execute shared_task itself on the same agent)? Thanks!
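    A minimal sketch of the alternative the question is asking about, assuming Prefect 1.x: a task body runs with already-resolved parameter values, so the shared helper can stay a plain Python function that tasks call directly, with no @task decorator and no .run (names reuse the example above; the helper body is illustrative):
    from prefect import Parameter, Flow, task

    def shared_helper(id: int, value: int) -> int:
        # plain Python function: inside a running task, id is already an int
        return id + value

    def calc_logic_func1() -> int:
        return 1

    @task
    def my_task1(id, description):
        val1 = calc_logic_func1()
        return shared_helper(id, val1)  # ordinary function call, no .run

    with Flow("my flow") as my_flow:
        id = Parameter("id", required=True)
        description = Parameter("description", required=True)
        result1 = my_task1(id, description)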
    Noam Gal
    Anna Geller
    3 replies
  • Florian Kühnlenz

    8 months ago
    Hi, we just had two flows become stuck without any apparent reason. All tasks had finished, but the flows remained in a Running state, blocking others. Any idea how to debug what was going on? Manually setting the state resolved the problem.
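    For reference, a sketch of resolving such a stuck run programmatically instead of through the UI, assuming Prefect 1.x Cloud/Server with auth already configured and the flow run ID known (the ID below is a placeholder):
    from prefect import Client
    from prefect.engine.state import Success

    client = Client()
    # mark the stuck flow run as finished; "<flow-run-id>" is a placeholder
    client.set_flow_run_state(
        flow_run_id="<flow-run-id>",
        state=Success(message="manually resolved"),
    )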
    Florian Kühnlenz
    Anna Geller
    15 replies
  • Tom Klein

    8 months ago
    Hey! 🙋‍♂️ We're trying to use this (excellent) example: https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows_task_library/s3_kubernetes_run_RunNamespacedJob_and_get_logs.py We're missing some permissions on our end for K8s operations. However, I noticed that when I ran this flow, even though the first step (delete K8s job) failed, it proceeded to perform the next steps (e.g. create job, which it does have permissions for). Am I missing something about how this should work? Shouldn't a failure in a task halt the entire flow by default, without explicitly playing with triggers?
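    A sketch of why this can happen in Prefect 1.x, under the assumption that the steps in the linked flow have no data dependency between them: the default all_successful trigger only stops tasks that are actually downstream, so independent tasks need an explicit ordering (task names below are illustrative, not taken from the example):
    from prefect import Flow, task

    @task
    def delete_job():
        ...

    @task
    def create_job():
        ...

    with Flow("k8s-job") as flow:
        deleted = delete_job()
        # without an explicit upstream dependency, create_job is independent
        # of delete_job and still runs even when delete_job fails
        create_job(upstream_tasks=[deleted])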
    Tom Klein
    Anna Geller
    4 replies
  • Marwan Sarieddine

    8 months ago
    Hi folks, a question about the Lazarus process: why would Lazarus try to reschedule a flow run if it has reached a successful state?
    Marwan Sarieddine
    Anna Geller
    10 replies
  • Bruno Murino

    8 months ago
    Hi everyone — I’m trying to pass tags to the ECS Run task, but it doesn’t look like the tags are being propagated. Is this the right way to pass tags to the ECS tasks that the Prefect Agent will create?
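    For reference, a sketch of one place tags can be attached in Prefect 1.x, assuming the flow uses an ECSRun run config: run_task_kwargs is forwarded to the ECS RunTask call, which accepts ECS-style tags (the tag values below are illustrative):
    from prefect import Flow
    from prefect.run_configs import ECSRun

    with Flow("ecs-flow") as flow:
        pass  # tasks omitted

    # extra keyword arguments for ecs.run_task, including ECS resource tags
    flow.run_config = ECSRun(
        run_task_kwargs={"tags": [{"key": "team", "value": "data-eng"}]},
    )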
    Bruno Murino
    Anna Geller
    6 replies
  • Miguel Angel

    8 months ago
    Hello everyone! Has anyone worked with dask futures within a Prefect flow context? I basically want to perform some future computations in order to parallelize parquet reading and dataframe concatenation. The following snippet shows an MWE using dask futures and the client.
    import dask.dataframe as dd
    from dask.distributed import Client
    from s3fs import S3FileSystem
    
    s3 = S3FileSystem()
    client = Client()
    folder_list = [
        "file1",
        "file2",
        "file3",
        "file4",
        "file5",
        "file6",
        "file7",
        "file8",
    ]
    file_list = list(
        map(lambda folder: f"s3://my-bucket/parquet/{folder}/*.parquet", folder_list)
    )
    dataframe_list = client.map(dd.read_parquet, file_list, gather_statistics=False)
    
    dataframe = client.submit(dd.concat, dataframe_list)
    
    mean_value = client.submit(lambda x: x["some_data_column"].mean(), dataframe)
    
    mean_compute = client.submit(lambda x: x.compute(), mean_value)
    
    print(mean_compute.result())
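    For comparison, a sketch of the same computation written against dask.dataframe's lazy API, which avoids the nested submit calls; it assumes the same bucket layout and that "some_data_column" exists in the data:
    import dask.dataframe as dd
    from dask.distributed import Client

    client = Client()
    # same paths as in the snippet above
    file_list = [f"s3://my-bucket/parquet/file{i}/*.parquet" for i in range(1, 9)]

    # read_parquet accepts a list of globs and returns a single lazy dataframe
    dataframe = dd.read_parquet(file_list)
    print(dataframe["some_data_column"].mean().compute())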
    Miguel Angel
    Anna Geller
    +1
    9 replies
  • Andreas Eisenbarth

    8 months ago
    Hello! I have encountered a very weird behavior and have no more ideas what could cause it. We do batch processing and use create_flow_run with map to create multiple flows, each with a different dict of parameters. On one server, all created flows receive the same flow_run_id, which means they overwrite their logs and we only see one in the Prefect UI. (Locally I cannot reproduce it and every child flow has a different flow run ID. This server is running in Docker, and in that setup create_flow_run was working correctly previously.) Does anyone have ideas? (Example code attached)
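    A sketch of the mapped pattern described above, assuming Prefect 1.x; the flow and project names are placeholders, and an explicit per-call idempotency_key is included in case duplicate keys are what collapses the runs into a single flow_run_id:
    from prefect import Flow, unmapped
    from prefect.tasks.prefect import create_flow_run

    param_dicts = [{"batch": 1}, {"batch": 2}, {"batch": 3}]  # illustrative
    keys = [f"child-{p['batch']}" for p in param_dicts]

    with Flow("parent") as parent_flow:
        create_flow_run.map(
            parameters=param_dicts,
            idempotency_key=keys,
            flow_name=unmapped("child-flow"),     # placeholder
            project_name=unmapped("my-project"),  # placeholder
        )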
    Andreas Eisenbarth
    Kevin Kho
    +1
    6 replies
  • Matt Alhonte

    8 months ago
    This rules so hard. Wanna find a way to include it in my Prefect pipelines. https://github.com/stepchowfun/typical
    Matt Alhonte
    Anna Geller
    2 replies
  • Samay Kapadia

    8 months ago
    I'm running into the weirdest error. Trying to make Prefect Cloud work with my Kubernetes cluster. The error says No module named '/Users/sa/'. Why does it want my home directory to be a module? More details inside.
    Samay Kapadia
    Kevin Kho
    5 replies