  • Tenzin Choedak

    1 year ago
    Hi Prefect community and Prefect team 👋 Is there any desire for a built-in Prefect task for Kafka flows? At my org, the data engineering team hardly ever runs live producers and consumers; those are typically maintained and developed by the application teams, which need sub-second responsiveness to events. However, we do own batch consumers and producers, either to pick up a batch of new events from the application stack or to push a batch of updates to it. It seems possible to at least design a batch-consume + batch-produce task for a Kafka integration within Prefect. How do others and the Prefect team feel about that?
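For context, the "batch consume" half is mostly a poll-and-drain loop. Here is a minimal sketch of that loop that assumes nothing about the client: `poll` stands in for something like confluent-kafka's `consumer.poll`, and `drain_batch` is an illustrative name, not an existing Prefect or Kafka API:

```python
import time

def drain_batch(poll, max_messages, timeout_s):
    """Collect up to max_messages records, giving up after timeout_s seconds.

    `poll` is any callable returning the next record, or None when nothing
    is currently available -- in a real flow it would wrap a Kafka consumer
    (e.g. confluent-kafka's consumer.poll) inside a Prefect task.
    """
    batch = []
    deadline = time.monotonic() + timeout_s
    while len(batch) < max_messages and time.monotonic() < deadline:
        record = poll()
        if record is None:
            continue  # topic momentarily empty; keep waiting until the deadline
        batch.append(record)
    return batch
```

The returned list is what a downstream task would then transform or push back to the application stack.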
    Tenzin Choedak
    Kyle Moon-Wright
    6 replies
  • Newskooler

    1 year ago
    Newskooler
    1 reply
  • Hui Zheng

    1 year ago
    hello, I changed the name of one of my tasks from
    get_timestamp()
    to
    init_flow_run()
    . It runs fine with Docker storage locally, but after I pushed the new flow to Prefect Cloud, I encountered this error:
    Unexpected error while running flow: KeyError('Task slug init_flow_run-1 not found in the current Flow; this is usually caused by changing the Flow without reregistering it with the Prefect API.')
    Hui Zheng
    Kyle Moon-Wright
    6 replies
  • Bridget Pelletier-Ross

    1 year ago
    Hi all! 👋 Can anyone here tell me if there's a security review of Prefect I can look at? I'm considering using Prefect Core in production at my company. Thank you!
    Bridget Pelletier-Ross
    1 reply
  • gunarevuri

    1 year ago
    Hi guys, I'm new to Prefect. I ran a very basic flow and then wanted to see it in the UI, but I am getting an error like this: "ValueError: Project my_first_flow not found. Run
    client.create_project("my_first_flow")
    to create it." And here is my basic code:
    from prefect import task, Flow

    @task
    def hello_world():
        return "hello prefect"

    @task
    def prefect_say(s):
        print(s)

    with Flow("my_first_flow") as f:
        r = hello_world()
        s = prefect_say(r)

    f.run()
    f.visualize()
    f.register("my_first_flow")
    gunarevuri
    1 reply
  • Alfie

    1 year ago
    Hi team, I do not see a delete-flow method in Webhook storage; does that mean I need to manage deletion myself? Thanks
    Alfie
    Jenny
    5 replies
  • Darren Forsdike

    1 year ago
    Hi, I'm new to Prefect and looking for a scheduler to run various Docker images from a private registry at scheduled times, plus some other tasks in the future. Is that something that is pretty easy to configure?
    Darren Forsdike
    Jim Crist-Harif
    2 replies
  • Jacques

    1 year ago
    We used to use
    LocalDaskExecutor(scheduler="threads")
    for running one-off flows, then started having issues with tasks being repeated and were advised to switch to
    DaskExecutor
    , which we've been using with
    DaskExecutor(cluster_kwargs = {"processes": False})
    . We've now run into some new bugs with latest prefect (a side note, but here: https://github.com/PrefectHQ/prefect/issues/3443) - trying to understand what the difference is between
    LocalDaskExecutor
    and
    DaskExecutor
    using threads and a temporary cluster? Is there a performance/reliability advantage of using one over the other?
    Jacques
    Jim Crist-Harif
    2 replies
  • Benjamin Filippi

    1 year ago
    Hi guys, there's a seemingly simple thing I can't manage to do: I want to cascade mappings. I have two lists, L1 = [A, B] and L2 = [1, 2, 3], and I want to generate 6 tasks. The expected execution plan is: execute (A,1) (A,2) (A,3) (B,1) (B,2) (B,3); reduce A from the first 3; reduce B from the last 3; then reduce (A,B). What simple trick am I missing?
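Outside Prefect, the intended fan-out/fan-in shape can be sketched in plain Python; `execute` and `reduce_group` below are illustrative stand-ins for the mapped task and the reduction, not Prefect APIs:

```python
# Fan out over the cross product of L1 and L2, then reduce in two stages.
def execute(a, b):
    return f"{a}{b}"          # stand-in for the mapped task body

def reduce_group(results):
    return "+".join(results)  # stand-in for a reduce task

L1 = ["A", "B"]
L2 = ["1", "2", "3"]

# Stage 1: six "tasks", grouped by which element of L1 they belong to.
per_letter = {a: [execute(a, b) for b in L2] for a in L1}

# Stage 2: one reduction per element of L1.
partials = [reduce_group(per_letter[a]) for a in L1]

# Stage 3: final reduction over the per-letter results.
final = reduce_group(partials)
```

The open question in Prefect terms is how to express the grouped stage-2 reduction, since a single `task.map` call fans out over one flat list.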
    Benjamin Filippi
    Jim Crist-Harif
    2 replies
  • Mitchell Bregman

    1 year ago
    Hi there, we are using Prefect Cloud to handle our workflow management. In the process of standing it up, our deployment recipe uses a Kubernetes Agent and Docker storage. When registering a flow to Cloud without specifying an environment, the flow is submitted for execution and runs entirely fine. However, for long-running map tasks, we'd like to consider using
    DaskKubernetesEnvironment
    . Perhaps, we can use
    DaskKubernetesEnvironment
    for all of our flows. Upon registering a new flow to Cloud, as shown below, and submitting a Quick Run, we get this error:
    Kubernetes Error: Back-off pulling image
    When not specifying
    DaskKubernetesEnvironment
    , all registering, deploying, and flow execution work just fine. Here is a sample flow that I am trying to use Dask for:
    with Flow("test-flow") as flow:
      numbers = numbers_task()
      first_map = map_task.map(numbers)
      second_map = map_task.map(first_map)
      reduction = reduce_task(second_map)
    
    flow.storage = Docker(
      registry_url="parkmobile-docker.jfrog.io",
      image_name="test-flow",
      image_tag="0.0.1"
    )
    
    flow.environment = DaskKubernetesEnvironment(min_workers=2, max_workers=4)
    
    flow.register("test")
    Any ideas as to why the DaskKubernetesEnvironment is throwing off the flow execution?
    Mitchell Bregman
    Chris White
    11 replies