Powered by Linen
prefect-community

    Martin T

    01/04/2022, 7:30 AM
    Hi all. I'm trying to set up a Prefect docker agent as a systemd service (CentOS). What is the recommended practice for storing the prefect API Key and passing it on to the systemd service?

    Noah Holm

    01/04/2022, 9:52 AM
    With Orion, will subflows be mappable so we can run a dynamic amount of subflows based on some task result? Didn’t find any previous details on this

    Lucas Hosoya

    01/04/2022, 12:49 PM
    Hi, I found something interesting about cancelling and failing a run. Whenever I press Cancel on my flow run, I expect all tasks to stop and to have state.Cancelling and state.Cancelled as True, but neither happens. They only register as Cancelling or Cancelled when I set the flow's state to Cancelled in the UI. Another weird fact: if I set the state to Cancelled, my tasks keep running until they finish (either failing or succeeding). I'm currently working on state handlers; how can I be sure that I'm cancelling my flow runs and, consequently, the tasks of each flow run?

    Thomas Hoeck

    01/04/2022, 1:52 PM
    I have a problem where a flow run went from Scheduled to Cancelled without anyone pressing Cancel in the UI. Judging by the following query, the flow run was cancelled because a new version of the flow was released (which archived the old flow). Is this expected behavior?

    Rob Douglas

    01/04/2022, 2:00 PM
    Hi all, thanks in advance for any pointers. Does Prefect have an API I could call from GitHub? Specifically, I want a commit to our GitHub repo to start a new run of a Prefect flow. I was able to find Prefect webhooks for doing things after a flow runs, but I'm looking for a way to make a GitHub webhook trigger a Prefect flow run instead. Any pointers?
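A minimal sketch of the receiving side, assuming a small web handler sits between GitHub and Prefect. It verifies GitHub's X-Hub-Signature-256 header (an HMAC-SHA256 of the raw request body, keyed with the webhook secret) and builds a GraphQL request body that asks Prefect for a new flow run. The mutation and input field names (create_flow_run_input, version_group_id, parameters) are assumptions based on the Prefect 1.x API and should be checked against your backend; the secret, payload, and version group ID are hypothetical, and actually sending the request (endpoint URL, API key header) is left out.

```python
import hashlib
import hmac
import json
from typing import Any, Dict


def verify_github_signature(secret: bytes, body: bytes, signature_header: str) -> bool:
    """Check GitHub's X-Hub-Signature-256 header, which has the form "sha256=<hex digest>"."""
    expected = "sha256=" + hmac.new(secret, body, hashlib.sha256).hexdigest()
    # compare_digest runs in constant time, so the check does not leak timing information
    return hmac.compare_digest(expected, signature_header)


def build_create_flow_run_body(version_group_id: str, parameters: Dict[str, Any]) -> str:
    """Build a GraphQL request body for a new flow run (assumed Prefect 1.x mutation schema)."""
    query = (
        "mutation($input: create_flow_run_input!) "
        "{ create_flow_run(input: $input) { id } }"
    )
    variables = {"input": {"version_group_id": version_group_id, "parameters": parameters}}
    return json.dumps({"query": query, "variables": variables})


if __name__ == "__main__":
    secret = b"webhook-secret"  # hypothetical secret configured on the GitHub webhook
    body = b'{"ref": "refs/heads/main", "after": "abc123"}'  # hypothetical push payload
    header = "sha256=" + hmac.new(secret, body, hashlib.sha256).hexdigest()

    if verify_github_signature(secret, body, header):
        push = json.loads(body)
        print(build_create_flow_run_body("my-version-group-id", {"commit": push["after"]}))
```

GitHub signs the exact bytes it sends, so the signature should be checked against the raw body before any JSON parsing.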

    Philip MacMenamin

    01/04/2022, 3:47 PM
    Hi, can I trust the order of mapped tasks? That is, if I have a list l1 = [a, b, c] and I generate l2 = task_name.map(l1), can I trust that element 0 of l2 derives from element 0 of l1?
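The question is whether results line up with inputs by index. A plain-Python illustration of that contract (not Prefect's own implementation) is concurrent.futures' Executor.map, which yields results in input order even when workers finish out of order. If you would rather not rely on ordering at all, one option, shown with the hypothetical tagged_upper helper, is to return the input alongside the output so each result carries its own key.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Tuple


def slow_upper(item: str) -> str:
    # A random delay makes the workers finish in an unpredictable order
    time.sleep(random.uniform(0, 0.05))
    return item.upper()


def tagged_upper(item: str) -> Tuple[str, str]:
    # Returning (input, output) keeps each result tied to its input
    return item, slow_upper(item)


l1 = ["a", "b", "c", "d", "e"]

with ThreadPoolExecutor(max_workers=5) as pool:
    # Executor.map returns results in the order of l1, not in completion order
    l2 = list(pool.map(slow_upper, l1))
    pairs = dict(pool.map(tagged_upper, l1))

print(l2)     # ['A', 'B', 'C', 'D', 'E']
print(pairs)  # {'a': 'A', 'b': 'B', 'c': 'C', 'd': 'D', 'e': 'E'}
```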

    Theo Platt

    01/04/2022, 4:22 PM
    We're running all our flows via ECSRun to kick off a Fargate task to run the flow. It all works fine but what I'm seeing are a lot of Fargate zombie tasks still running after the flow has finished. I need to investigate this a little more as it may be failed flows and/or flows we cancel via the Prefect console... not sure at this stage. Has anyone else had a similar issue? (a quick search of this channel didn't reveal much). Many thanks!

    Daniel Kornhauser

    01/04/2022, 5:30 PM
    Hi guys, below is a question about logging in “the Prefect product”. I can’t slice and dice logs in the Prefect UI the way I am used to. So I wonder, product-wise, whether Prefect’s logging UI is a simple interface meant only for easily searching logs grouped by task or flow, and whether it should be used side by side with more powerful logging solutions, such as Elasticsearch + Filebeat, or even just saving the logs to a file and going over them with lnav?

    Mathijs Miermans

    01/04/2022, 6:31 PM
    Is there a way to trigger a Prefect task after a dbt job finishes that's scheduled by dbt cloud? In the future we might manage dbt jobs using Prefect. For now we're looking for a solution / workaround where dbt jobs are scheduled by dbt. The obvious workaround is scheduling Prefect tasks for a time when we know dbt will be finished. Are there any alternatives?

    Frederick Thomas

    01/04/2022, 7:59 PM
    Hi All, We're considering moving our ETL processes from Prefect server to the hybrid model for a number of reasons. Currently, Prefect server is running in a Docker container on Azure, and we would like to move our current Flows to a Kubernetes cluster on Azure as well. Is there a definitive way this can be accomplished safely? Also, are there best practices documentation on handling failures or querying states from GraphQL? Thanks very much.

    Philip MacMenamin

    01/05/2022, 12:16 AM
    I'm trying to write a state_handler that has access to the results of a task. Is there a way to do this via context, maybe? I saw a reply from @Kevin Kho stating:
    import prefect
    from prefect import task

    @task
    def abc():
        here = "some value"  # placeholder for whatever the task computes
        prefect.context.my_var = here
    Is this still the way to do this?
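As an alternative to stashing values in prefect.context, a Prefect 1.x state handler receives (task, old_state, new_state), and for a successful run new_state.result should hold the task's return value. The sketch below assumes that behavior. So that it runs without Prefect installed, it uses a small hypothetical FakeState stand-in and relies only on is_successful() and result, which Prefect 1.x states provide.

```python
from typing import Any, List

captured_results: List[Any] = []


def capture_result(task: Any, old_state: Any, new_state: Any) -> Any:
    """State handler with the (task, old_state, new_state) signature used in Prefect 1.x."""
    if new_state.is_successful():
        # Assumed: on success, new_state.result is the value the task returned
        captured_results.append(new_state.result)
    # A handler returns the state the task should move to (here, unchanged)
    return new_state


class FakeState:
    """Hypothetical stand-in for a Prefect state, for trying the handler locally."""

    def __init__(self, result: Any, ok: bool) -> None:
        self.result = result
        self._ok = ok

    def is_successful(self) -> bool:
        return self._ok


if __name__ == "__main__":
    capture_result(None, FakeState(None, False), FakeState({"rows": 42}, True))
    capture_result(None, FakeState(None, False), FakeState(ValueError("boom"), False))
    print(captured_results)  # [{'rows': 42}]
```

In a flow, the handler would be attached with @task(state_handlers=[capture_result]).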

    Florian Kühnlenz

    01/05/2022, 7:17 AM
    Hi there, is there a way to “unarchive” a flow? For example, if I want to quickly roll back to an old version of a flow?

    Mukamisha jocelyne

    01/05/2022, 10:50 AM
    Hi everyone, I'm a new Prefect user and I am trying to use Prefect to automate ML pipelines. I went through the tutorials in the Prefect documentation, but there isn't much content that goes in depth or touches on MLOps concepts. Can anyone point me to a good introductory resource where I can learn how to use Prefect to automate and deploy machine learning models?

    Jason Noxon

    01/05/2022, 1:54 PM
    Hi, all! I work with @Frederick Thomas and we use prefect for our ETL stuff (and we love it!). My question is: is there any documentation on best practices for the development of flows? Any help would be greatly appreciated!

    Alvaro Durán Tovar

    01/05/2022, 3:56 PM
    Is there a maximum running time for a flow? And a maximum time in the Queued state?

    E Li

    01/05/2022, 4:18 PM
    Hi all, I got this error returned from client.register() with prefect==0.15.9: expected one of the values edge_pkey for type "edge_constraint", but found "edge_flow_id_task_ids_key". What does it mean?

    Hammad Ahmed

    01/05/2022, 4:26 PM
    Hi everyone, we have an MLOps use case where different jobs need to be scheduled with different hardware resources, so we have opted for the flow-of-flows pattern. Since the parent flow represents a single experiment, we want to store/register artifacts using the parent's ID. Is there any way to get the parent flow ID when using create_flow_run?

    An Hoang

    01/05/2022, 7:25 PM
    I'm encountering a problem where the run-time state DAG visualization doesn't match the flow's creation-time DAG. You can see in the first screenshot that perform_permutation and generate_summary_df are both upstream dependencies of process_report_df. However, the flow fails at process_report_df without ever running the two upstream tasks. The error is due to missing data from the upstream task generate_summary_df. The post-hoc state visualization doesn't show the two upstream tasks at all. Why does this happen? I will post the log and task results in the comments.

    Thomas Fredriksen

    01/05/2022, 7:55 PM
    Hi everyone. I am currently debugging an issue where the healthcheck is failing. We are using a self-hosted Prefect Server, and we use the Docker storage backend to deploy flows to the server. The healthcheck fails with the following error:
    ModuleNotFoundError: No module named 'pipelines'
    The error seems quite clear; however, there is no reference to any module called pipelines, although the directory that holds my flows has this name. Overall, my project structure looks like this:
    my_sdk
    |- __init__.py
    |- my_module.py
    |- cli
       |- __init__.py
       |- __main__.py
    pipelines
    |- __init__.py
    |- first_flow
       |- __init__.py
       |- flow.py
    |- second_flow
       |- __init__.py
       |- flow.py
    We have created a CLI tool that handles deployment to our infrastructure. It works by dynamically importing the flow using importlib, then setting up the Docker storage before calling flow.register.
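    import importlib
    import os
    import sys

    from prefect import Flow

    # Import flow (pipeline_path is the flow's directory, e.g. "pipelines/first_flow")
    sys.path.append(os.path.abspath(pipeline_path))
    flow_module = importlib.import_module(os.path.join(pipeline_path, "flow").replace("/", "."))
    flow: Flow = getattr(flow_module, "flow")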
    This worked really well until we started seeing the error above. Previously, we had a separate deploy.py script for each flow that would build and register it. We would occasionally see a similar error saying that it could not find a module called flow. Simply copying the flow.py file into the built image, by adding it to the Docker storage init, solved this:
    from os import path

    from prefect.storage import Docker

    flow.storage = Docker(
        # ....
        files={
            path.join(FLOW_DIR, "flow.py"): "/flow.py"
        }
    )
    The strange part is that this error was not deterministic, and would only happen for one or two of our many flows. Since we only had to add the lines above to the deployment file, we considered the problem "solved for now". While working on the CLI, we once again encountered the same problem. We eventually solved it the same way as before, but this time had to copy flow.py to /pipelines/flow_name/flow.py to make it work; once again, only a few flows were affected, not all of them.
    When debugging this issue, we managed to reduce the flow to a state where the problem simply disappeared. We isolated a function that caused the ModuleNotFoundError, but it was not clear which part of the function caused the error. Even after reducing the function to a stub consisting of a single line, pass, the error persisted. Removing the call to the offending function magically solved the problem, even though it was just a stub. Our conclusion is that the flow.py file is somehow pickled when the Docker image is built, but only in certain situations. We do not know why.
    Post-ramble summary: we observed that the file containing our flow definition would be pickled along with the flow itself when building the Docker image, but only in certain situations. Does anyone know what might be causing this behavior? We are happy to copy flow.py into the final Docker image, but would like to understand what is going on here.
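One possible mechanism, offered as an assumption rather than a diagnosis: Python's pickle stores a module-level function by reference (its module name plus qualified name), and cloudpickle, which Prefect 1.x uses to serialize flows, does the same for functions defined in importable modules. Importing the flow as pipelines.first_flow.flow would then make the serialized flow depend on a pipelines package that must also be importable inside the image. The sketch reproduces that with standard pickle and a throwaway package in a temporary directory; the package layout and the stub function are hypothetical.

```python
import importlib
import os
import pickle
import sys
import tempfile


def make_package(root: str) -> None:
    """Create a hypothetical pipelines/first_flow/flow.py package under root."""
    pkg_dir = os.path.join(root, "pipelines", "first_flow")
    os.makedirs(pkg_dir)
    for d in (os.path.join(root, "pipelines"), pkg_dir):
        with open(os.path.join(d, "__init__.py"), "w"):
            pass
    with open(os.path.join(pkg_dir, "flow.py"), "w") as f:
        f.write("def stub():\n    pass\n")


root = tempfile.mkdtemp()
make_package(root)
importlib.invalidate_caches()
sys.path.insert(0, root)

# Same path-to-module-name conversion as the CLI in the question
pipeline_path = "pipelines/first_flow"
module_name = os.path.join(pipeline_path, "flow").replace("/", ".")
flow_module = importlib.import_module(module_name)

# pickle records the function as a reference: module name + qualified name
payload = pickle.dumps(flow_module.stub)

# Simulate the run-time environment, where the pipelines package is not importable
sys.path.remove(root)
for name in list(sys.modules):
    if name == "pipelines" or name.startswith("pipelines."):
        del sys.modules[name]

try:
    pickle.loads(payload)
    error = None
except ModuleNotFoundError as exc:
    error = str(exc)

print(module_name)  # pipelines.first_flow.flow
print(error)        # No module named 'pipelines'
```

If this is what is happening, the dotted name used at import time decides which package the image needs, which would fit the earlier "No module named flow" error when flows were imported from a deploy.py next to flow.py.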

    Tom Shaffner

    01/05/2022, 8:26 PM
    Is it possible to have a parameter the user could set in the UI to have a flow tell a task to ignore its own cache? e.g. most of the time when a task reruns inside the cached period I want it to use the cache, but every now and then I want it to rerun without it. Currently I just manually delete the cache files before kicking off a run to get this to happen; is there a better way? Ideally maybe a parameter I could pass in from the UI to tell it to ignore the cache this run?
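A minimal, Prefect-agnostic sketch of the pattern being asked for: a cache keyed on the inputs, plus a refresh flag that skips the lookup for one call and overwrites the stored value. In Prefect 1.x the flag could come from a Parameter set in the UI and be passed to the task; how it would plug into Prefect's own cache settings is not shown here, and the cached decorator and load_report function are hypothetical.

```python
import functools
from typing import Any, Callable, Dict, List, Tuple


def cached(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Memoize fn on its positional arguments; refresh=True forces a rerun."""
    store: Dict[Tuple[Any, ...], Any] = {}

    @functools.wraps(fn)
    def wrapper(*args: Any, refresh: bool = False) -> Any:
        if not refresh and args in store:
            return store[args]  # cache hit: skip the work
        result = fn(*args)
        store[args] = result  # a forced rerun also refreshes the cached value
        return result

    return wrapper


calls: List[str] = []


@cached
def load_report(day: str) -> str:
    calls.append(day)  # record each real execution
    return f"report for {day}"


if __name__ == "__main__":
    load_report("2022-01-05")                # runs
    load_report("2022-01-05")                # served from the cache
    load_report("2022-01-05", refresh=True)  # ignores the cache and runs again
    print(len(calls))  # 2
```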

    Louis Eisenberg

    01/05/2022, 9:42 PM
    I couldn't find this in the docs, but I'm hoping it's a simple question: is there a way to pass the name of the current run as an argument to a task? In a flow like the one below, I want to pass the name of the current run (e.g. electric-albatross) to the app_sync_flow_claim_lock.R script. (The reason is that I want the claim operation to fail only if the lock is already held and the holder is not this run. So the claim script needs to know the name of the current run.)
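    import datetime

    from prefect import Flow
    from prefect.tasks.shell import ShellTask

    with Flow("App sync") as flow:
        # Task options go to the constructor; calling the task adds it to the flow
        app_claim_lock = ShellTask(
            name="app sync flow claim lock",
            max_retries=5,
            retry_delay=datetime.timedelta(minutes=10),
        )(command="sudo make run-interactive scripts/deploy/app_sync_flow_claim_lock.R")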
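At run time, Prefect 1.x should expose the current run's name in prefect.context under flow_run_name, so one option is a small upstream task that reads it and returns the full command, which is then passed to the ShellTask as command when it is called. The sketch below covers only the Prefect-independent part: turning the run name into a shell-safe argument with shlex.quote. The function name is hypothetical, and passing the name as a RUN_NAME make variable assumes a Makefile that forwards it to the R script.

```python
import shlex

# Hypothetical: assumes the Makefile forwards a RUN_NAME variable to the R script
BASE_COMMAND = "sudo make run-interactive scripts/deploy/app_sync_flow_claim_lock.R"


def build_claim_command(run_name: str) -> str:
    """Append the run name to the claim-lock command as a make variable assignment."""
    # shlex.quote keeps spaces or shell metacharacters in the name from breaking the command
    return f"{BASE_COMMAND} RUN_NAME={shlex.quote(run_name)}"


if __name__ == "__main__":
    print(build_claim_command("electric-albatross"))
    # sudo make run-interactive scripts/deploy/app_sync_flow_claim_lock.R RUN_NAME=electric-albatross
```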

    Aric Huang

    01/05/2022, 10:28 PM
    Hi, I'm seeing some strange behavior when using concurrent.futures.ProcessPoolExecutor in a Prefect task. With the following code as test.py, running python3 test.py completes successfully, but running prefect run --path test.py stays stuck at 0% completion.
    from prefect import task, Flow
    from tqdm import tqdm
    from concurrent.futures import ProcessPoolExecutor, as_completed
    
    def process(inp):
        return inp
    
    @task
    def test():
        print("start")
        with ProcessPoolExecutor(
            max_workers=4,
        ) as pool:
            futures = [pool.submit(process, inp) for inp in range(1000)]
            out = [f.result() for f in tqdm(as_completed(futures), total=1000)]
        print('done')
    
    with Flow("Test") as flow:
        test()
    
    if __name__ == "__main__":
        test.run()
    Any ideas what could be happening here?

    Piyush Bassi

    01/06/2022, 3:24 AM
    #prefect-community Hello, can anyone help with step-by-step documentation for setting up Prefect with Docker? So far, I have installed docker-compose on the machine and am able to run Prefect Server at http://localhost:8080. What are the next steps? How do I set up an agent and trigger flows so they show up on the dashboard? Any help would be appreciated!

    Thomas Hoeck

    01/06/2022, 8:23 AM
    Hi. I'm experiencing a bug/error when running a subflow using StartFlowRun. The subflow runs successfully, but in the parent flow I get the following error:

    M. Siddiqui

    01/06/2022, 11:35 AM
    Hello everyone! Hope you guys had a great start to the year 😄 I was trying to run a flow with over 250 tasks on the Prefect Cloud Free Plan. It was painfully slow, since only one task was running at any given time. I know we can set task concurrency limits on the Standard Plan, but what is the limit on the Free Plan? I couldn't find any docs about that.

    Shivam Bhatia

    01/06/2022, 12:55 PM
    Hi, I am trying to run a flow on Prefect Cloud using a Vertex agent and a Dask executor. I registered my flow using Python. My flow environment is on Docker Hub and I am using GitHub storage. I'm getting an error about a name on Prefect Cloud. Any help would be appreciated.

    Mike Lev

    01/06/2022, 1:56 PM
    Hey all, I have recently revisited running dependent flows. I have issues accessing the result of a task in a flow that another flow depends on. What am I doing incorrectly?
    flow_a
    import time

    from prefect import Flow, task


    @task(name='first_task_a', log_stdout=True)
    def first_task():
        # hypothetical stand-in: the original first_task was not included in the question
        return 'first_task_a_done'


    @task(name='second_task_a', log_stdout=True)
    def second_task(mssg: str):
        time.sleep(10)
        return f'{mssg} and second_task_a_done'


    @task(name='third_task_a', log_stdout=True, slug="last_task")
    def third_task(mssg: str):
        time.sleep(10)
        return f'{mssg} and second_task_a_done'


    with Flow('flow_a') as flow_a:
        first_task_message = first_task()
        second_task_message = second_task(first_task_message)
        third_task_message = third_task(second_task_message)
    Setup: KubernetesRun, DaskExecutor, S3Result, S3 storage, with the Prefect Cloud backend. All flows, including the parent flow, are registered as expected.
    Untitled.py

    rilshok

    01/06/2022, 2:41 PM
    Hello everyone, I have a question about scheduling flow launches. Let's say I expect one run to take 2 hours, and in the UI I set a schedule with a launch every two hours. What happens if, for some reason, the average execution time of my flow grows to three hours? Say we have this plan: 1. Launch at 12:00 2. Launch at 14:00 3. Launch at 16:00. Launch (1) takes three hours. Will the launch scheduled for 14:00 (2) still run? If all scheduled launches do run, can I set up my flow so that launch (2) is skipped?
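The scenario in the question can be worked through with a small sketch of a "skip a launch while the previous run is still going" policy. This is a hypothetical helper, not Prefect's scheduler, and it says nothing about what Prefect does by default; it only shows which launches that policy would keep for the example times and a three-hour run.

```python
from datetime import datetime, timedelta
from typing import List


def launches_to_keep(scheduled: List[datetime], duration: timedelta) -> List[datetime]:
    """Keep a scheduled launch only if the previously started run has finished by then."""
    kept: List[datetime] = []
    busy_until = datetime.min
    for start in sorted(scheduled):
        if start < busy_until:
            continue  # the previous run is still in progress: skip this launch
        kept.append(start)
        busy_until = start + duration
    return kept


if __name__ == "__main__":
    day = datetime(2022, 1, 6)
    plan = [day.replace(hour=h) for h in (12, 14, 16)]
    kept = launches_to_keep(plan, timedelta(hours=3))
    print([t.strftime("%H:%M") for t in kept])  # ['12:00', '16:00']
```

With a three-hour run, launch (1) is busy until 15:00, so launch (2) at 14:00 is skipped and launch (3) at 16:00 starts normally.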

    Amber Papillon

    01/06/2022, 2:46 PM
    Hey guys, do you know of any good publicly available projects I could look at that use FastAPI with Prefect (and a Dask scheduler behind it)? Bonus if they use MLflow as well.
Mahesh

01/06/2022, 3:12 PM
Hello Team, how do I get the flow_run_id in a ShellTask?
I am using the code below:
import prefect
from prefect.tasks.shell import ShellTask

# Evaluated once, when this file is loaded, not while the flow is running
flowids = prefect.context.get("flow_run_id")
print(flowids)
test = ShellTask(
    shell="bash",
    return_all=True,
    log_stderr=True,
    log_stdout=True,
    stream_output=True,
    command="echo Hello {}".format(flowids),
)
The flow succeeds, but I get None in stdout instead of the flow_run_id:
[2022-01-06 15:10:29+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'Dimension1'
[2022-01-06 15:10:29+0000] INFO - prefect.TaskRunner | Task 'ShellTask': Starting task run...
[2022-01-06 15:10:29+0000] INFO - prefect.ShellTask | Hello None
[2022-01-06 15:10:29+0000] INFO - prefect.TaskRunner | Task 'ShellTask': Finished task run for task with final state: 'Success'
[2022-01-06 15:10:29+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

Kevin Kho

01/06/2022, 3:19 PM
Hey @Mahesh, see this thread which gives an explanation of that
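A plain-Python model of why the log shows "Hello None". The context dict and the simulate_flow_run runner below are hypothetical stand-ins, not Prefect. A script-level context lookup runs once, when the file is loaded and before any flow run has populated the context, so the command string is frozen with None. A lookup inside a function that only executes during the run sees the populated value. In Prefect 1.x that function would be a task that reads prefect.context.get("flow_run_id") and returns the command string, passed to the ShellTask's command argument when the ShellTask is called inside the flow.

```python
from typing import Dict, Optional

# Hypothetical stand-in for prefect.context: empty until a flow run starts
context: Dict[str, str] = {}

# Evaluated once, when the module is loaded (the pattern in the question)
flowids_at_definition: Optional[str] = context.get("flow_run_id")


def build_command() -> str:
    # Evaluated each time it runs, after the runner has filled in the context
    return "echo Hello {}".format(context.get("flow_run_id"))


def simulate_flow_run(flow_run_id: str) -> str:
    # Hypothetical runner: populate the context, run the task body, then clean up
    context["flow_run_id"] = flow_run_id
    try:
        return build_command()
    finally:
        context.clear()


if __name__ == "__main__":
    print("echo Hello {}".format(flowids_at_definition))  # echo Hello None
    print(simulate_flow_run("abc-123"))                    # echo Hello abc-123
```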