Powered by Linen
prefect-community
  • g

    Gabriel Milan

    11/18/2021, 1:47 PM
    Hello again! When using server, as I use `register`, all my new or modified flows get registered fine. On the other hand, some flows I've deleted are still registered on my server. Is there a way of syncing these?
    a
    • 2
    • 7
  • d

    Dekel R

    11/18/2021, 2:18 PM
    Hey again, I want to run my flow on a schedule, so I’m using this guide (https://docs.prefect.io/core/tutorial/05-running-on-a-schedule.html). It will run every day and get the relevant data from an API (each time for the last day). From time to time I would like to run it manually and give it parameters at the UI level (the motivation is getting data for an interval of dates and not just a single one; my DS team may ask for data for a full year). Is there any way to do both with exactly the same code? Something like “if env_vars is None: run_with_sched else run_with_env_vars”. Thanks
    k
    • 2
    • 3
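For the schedule-versus-manual question above, one possible shape is a single code path whose date inputs default to "nothing supplied". The sketch below is plain Python and only illustrates that branching; `resolve_date_range` and its argument names are hypothetical, and wiring `start` and `end` to flow parameters with a `None` default is an assumption rather than something stated in the thread.

```python
from datetime import date, timedelta
from typing import Optional, Tuple


def resolve_date_range(
    start: Optional[str] = None,
    end: Optional[str] = None,
    today: Optional[date] = None,
) -> Tuple[date, date]:
    """Return the (start, end) dates to fetch.

    With no arguments (the scheduled case) this is just yesterday.
    With ISO date strings (the manual case) it is the requested interval.
    """
    today = today or date.today()
    yesterday = today - timedelta(days=1)
    # Fall back to "the last day" whenever a bound is not supplied.
    start_date = date.fromisoformat(start) if start else yesterday
    end_date = date.fromisoformat(end) if end else yesterday
    if start_date > end_date:
        raise ValueError("start must not be after end")
    return start_date, end_date
```

A scheduled run would call `resolve_date_range()` with no arguments, while a manual run would pass something like `resolve_date_range("2021-01-01", "2021-12-31")`.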
  • s

    Sash Stasyk

    11/18/2021, 3:34 PM
    Hey, not sure if this is the right place to ask, but can anyone provide an idiomatic example of using an Oracle DB connection across multiple tasks please? I am helping out a colleague with an ETL flow which requires multiple queries done across multiple connections, which would preferably persist throughout the flow. The initial problem we are encountering is that you cannot pass the connection object between tasks as it needs to be serialisable.
    a
    s
    • 3
    • 19
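Regarding the non-serialisable connection above, one common workaround is to pass only the connection parameters between steps and open a short-lived connection inside each step. The sketch below uses the standard-library `sqlite3` module purely as a stand-in for an Oracle driver; `run_query` and `conn_params` are hypothetical names, and this does not keep one connection alive for the whole flow.

```python
import sqlite3
from contextlib import closing
from typing import Any, Dict, List, Tuple


def run_query(conn_params: Dict[str, Any], sql: str, args: Tuple = ()) -> List[Tuple]:
    """Open a connection, run one statement, and close the connection again.

    Only `conn_params` (a plain dict) has to travel between steps, so
    nothing unserialisable is ever passed around.
    """
    with closing(sqlite3.connect(**conn_params)) as conn:
        with conn:  # commits on success, rolls back on error
            return conn.execute(sql, args).fetchall()
```

Each step would then call `run_query(params, "SELECT ...")` with the same `params` dict, for example `{"database": "/path/to/demo.db"}` in this stand-in.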
  • a

    Aleksandr Liadov

    11/18/2021, 5:37 PM
    Hello guys, I'm trying to understand how I can pass the result of one flow to another. For example, the first flow should generate the meta-information about the requested video, and the second flow should use this information to do some business logic. How can I do it in a prefectly way?
    k
    • 2
    • 1
  • l

    lucas oliveira

    11/18/2021, 6:44 PM
    Hello Guys, I'm configuring Prefect with ECS agent. I'm using Prefect Cloud. I registered a flow and when I go to test it, it always has a pending status. I'm using AWS ECR as Storage. When I access the Dashboard to follow the flow and see the logs, I get:
    Submitted for execution: Task arn:aws:ecs:us-east-2:<my-account>:task/my-fargate-cluster/1eb7b9cbd17c448ab4fcdc76e4692297
    Has anyone experienced a similar problem and can you help me? Thanks!
    k
    a
    • 3
    • 11
  • a

    Aqib Fayyaz

    11/18/2021, 7:13 PM
    Has anyone tried Module as storage for Prefect flows?
    k
    • 2
    • 4
  • v

    Vipul

    11/18/2021, 7:15 PM
    Just wondering if anyone has seen this in Prefect Server: we accidentally triggered the flow for the wrong date, and when we tried to cancel the flow from the UI it didn't work. The task continued to run under Dask and we were not able to stop it until we restarted Dask. Would appreciate it if anyone knows how to fix this without restarting Dask.
    k
    • 2
    • 2
  • v

    Vipul

    11/18/2021, 8:22 PM
    Quick question: we usually achieve caching by using checkpointing, i.e. results are stored in a file. At times one of our tasks might fail, possibly because one of the upstream tasks had an issue, and we wanted to know if there is a way to remove the cache from the UI rather than going to the backend and removing the file manually.
    k
    • 2
    • 4
  • k

    kiran

    11/18/2021, 8:39 PM
    Do folks have preferences/advice for using `nohup` instead of Supervisor? I’m thinking something simple like this:
    nohup prefect agent local start > /tmp/prefect_local_agent.log 2>&1 &
  • j

    John T

    11/18/2021, 9:00 PM
    Can Prefect Cloud support PickleSerializer on `PrefectResult`? I’m currently encountering this error:
    TypeError: PrefectResult only supports JSONSerializer or DateTimeSerializer
    k
    m
    • 3
    • 16
  • k

    Kevin Kho

    11/18/2021, 10:13 PM
    Join us for the Prefect Happy Hour!
    🚀 4
    :marvin: 3
  • j

    John Muehlhausen

    11/18/2021, 11:08 PM
    How do I tell Prefect to run exactly one copy of a Flow, starting immediately, until the Flow ends (variable amount of time), then immediately start another single copy, and so forth? ... by "copy" I just mean I wouldn't want the Flow runs to overlap
    k
    m
    a
    • 4
    • 24
  • h

    Hugo Shi

    11/18/2021, 11:43 PM
    any tips on what to do when an agent gets "stuck?" I have one agent that is responsible for kicking off flows every minute, and it often ends up where the agent is querying for flows, and there are submittable runs, but they never start.
    a
    • 2
    • 6
  • d

    Dominic Pham

    11/19/2021, 12:15 AM
    Hi all, given that during flow_run, task_A will return a list from a sql query downstream to task_B. task_B will then start at index[0] and send that value downstream to task_C. Now, x minutes later, flow_run will run again, and task_A will return the same list, but task_B will now return index[1] instead of index[0] and send the value of index[1] to task_C and so on and so forth. What would be the best approach for this, where I want a given flow, between scheduled runs, to remember its location in an iterable?
    m
    k
    • 3
    • 6
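For the "remember its location in an iterable" question above, the core idea is that the position has to live somewhere outside the run itself. A minimal sketch, assuming a local JSON file is an acceptable place to keep that state; `next_item` and `cursor_file` are hypothetical names, and wrapping around at the end of the list is an assumption about the desired behaviour.

```python
import json
from pathlib import Path
from typing import Any, Sequence


def next_item(items: Sequence[Any], cursor_file: Path) -> Any:
    """Return the next item and advance a cursor stored on disk."""
    if not items:
        raise ValueError("items must not be empty")
    index = 0
    if cursor_file.exists():
        # Pick up where the previous run left off.
        index = json.loads(cursor_file.read_text())["index"]
    item = items[index % len(items)]  # assumption: wrap around at the end
    cursor_file.write_text(json.dumps({"index": index + 1}))
    return item
```

The first call with a fresh `cursor_file` returns `items[0]`, the next call returns `items[1]`, and so on, regardless of whether the calls happen in the same process.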
  • j

    Jeremiah Lethoba

    11/19/2021, 8:03 AM
    Hello there, to avoid wasting time and if anyone has tried it, is it possible to customize the prefect cloud UI or the beta feature (artifacts) to add things like progress bars or plots from tasks run on the UI...anyone ?
    a
    • 2
    • 2
  • a

    AJ

    11/19/2021, 8:50 AM
    Hi, What is prefect orion and how is it different from prefect core and prefect server. How will prefect cloud get affected by this? Can we deploy jobs to prefect cloud from orion like we are able to do from core? Thanks.
    a
    • 2
    • 1
  • a

    Adam Everington

    11/19/2021, 10:19 AM
    Flow registration within the script using `flow.register()`: If I use `flow.register(project_name='my-project', idempotency_key=flow.serialized_hash())` on Prefect Server, will the version number still get bumped each time?
    a
    • 2
    • 1
  • f

    Florian Kühnlenz

    11/19/2021, 11:42 AM
    When running Prefect Docker agents, it seems that a running Docker container does not automatically inherit the context from the agent. Is that intended behavior? How best to set the context inside the Docker container?
    a
    • 2
    • 14
  • a

    Aqib Fayyaz

    11/19/2021, 12:18 PM
    Are the Prefect agent that is deployed on a GKE cluster when deploying Server using the Helm chart and the Prefect agent that we deploy on Kubernetes using this command the same?
    prefect agent kubernetes install -k API_KEY | kubectl apply --namespace=my-namespace -f -
    I have one deployed with Prefect Server on GKE, but I need to use Prefect Cloud. Is one agent enough for both Server and Cloud, or do I have to deploy a separate one for Cloud?
    a
    • 2
    • 2
  • e

    ek

    11/19/2021, 3:26 PM
    I’m wondering how to remove completed Prefect jobs on k8s?
    k
    • 2
    • 2
  • l

    Leon Kozlowski

    11/19/2021, 6:03 PM
    Hi all - I am wondering what the best way is to determine what agent a flow is running on while it's in progress
    a
    k
    • 3
    • 11
  • m

    Max Kolasinski

    11/19/2021, 9:31 PM
    I’m having some trouble wrapping my head around designing a conditional flow based around Great Expectations Validation and was wondering if anyone had any advice. To summarize: we have an ETL Task, after which we want to run the Validation Task. If the Validation Task sets its state as Failed we want to run a Notification Task that alerts us, but otherwise we want the subsequent tasks to continue. Where I’m getting stuck is that it seems like the routing tools that I’ve found don’t quite fit right:
    • Using Triggers doesn’t feel correct, because I don’t want to run my Notification Task on `all_failed` or `any_failed`: if the ETL Task fails, it shouldn’t run. What I believe I would need is something like an `on_x_task_failed` option; it seems like the available options are way too broad to be useful.
    • I then looked into some of the ideas on the Conditional Logic page, but this seems clumsy for a few reasons. I need a Task specifically to check the State of the Validation Task, and then on our Schematic View we have additional Tasks showing up for each Case Task as well as the Merge Task. All combined, it makes our Schematic look like the image below, which seems crazy for what is effectively `if x do y`.
    I feel like I have to be approaching this in completely the wrong way. If anyone had any ideas or suggestions I would be extremely grateful.
    k
    a
    • 3
    • 6
  • t

    Tom Shaffner

    11/19/2021, 11:16 PM
    Theory question: Is it possible to have a (non-task) abstract class in which the methods are tasks? I have a flow structure that gets repeated frequently for different data sets, I'm trying to create an abstract class in which the repetitive tasks are methods with the necessary text inputs, and then the 1-2 non-repetitive methods are abstract methods in the parent class and are thus implemented in child classes. The problem though is that when I do this, the @task decorators are messing with calling the methods internally. I have to pass in "self" manually, and if I need to specify any non-data dependencies (e.g. add an upstream_tasks input for the decorator) the methods seem to break whether "self" goes before or after the decorator input. Is there a way to do this? Or will this approach just not work? Really hoping there is a way since this will enable me to MASSIVELY reduce my code replication.
    k
    • 2
    • 2
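On the abstract-class question above, one pattern that sidesteps the manual `self` problem is to leave the methods undecorated and wrap the bound methods at the point where the pipeline is assembled. The sketch below is plain Python: `as_task` is a hypothetical stand-in for a task wrapper, and whether Prefect's own `task` accepts bound methods in exactly this way is an assumption to verify, not something confirmed in the thread.

```python
from abc import ABC, abstractmethod
from typing import Callable, List


def as_task(fn: Callable) -> Callable:
    """Hypothetical stand-in for a task wrapper; it just forwards the call."""
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)
    wrapper.__name__ = fn.__name__
    return wrapper


class DatasetPipeline(ABC):
    """Shared steps live here; subclasses supply only what differs."""

    @abstractmethod
    def extract(self) -> List[int]:
        """Dataset-specific step, implemented by each child class."""

    def transform(self, rows: List[int]) -> List[int]:
        # Repetitive step shared by every dataset.
        return [r * 2 for r in rows]

    def run(self) -> List[int]:
        # Wrap the *bound* methods, so `self` is already attached and
        # never has to be passed in by hand.
        extract = as_task(self.extract)
        transform = as_task(self.transform)
        return transform(extract())


class SalesPipeline(DatasetPipeline):
    def extract(self) -> List[int]:
        return [1, 2, 3]
```

Here `SalesPipeline().run()` goes through both wrapped steps, while instantiating `DatasetPipeline` directly fails because `extract` is abstract.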
  • a

    André Bonatto

    11/20/2021, 12:50 PM
    Hi, I have a list of flows that are very similar and I wanted to create a factory function in order to make things easier to maintain. The code looks very similar to this:
    from prefect import Flow, Parameter, task
    from etl import prepare, load
    from crawlers import crawler1, crawler2
    from typing import Dict, Callable
    # Wrap the plain ETL functions as Prefect tasks.
    prepare = task(prepare)
    load = task(load)
    def make_standard_pipeline(flow_name: str, func: Callable, func_params: Dict):
        # Build one flow: crawl -> prepare -> load, with one Parameter per entry in func_params.
        with Flow(flow_name) as flow:
            params = {k: Parameter(k, default=v) for k, v in func_params.items()}
            df = func(**params)
            df = prepare(df)
            df = load(df)
        return flow
    pipe1 = make_standard_pipeline('flow1', task(crawler1), {})
    pipe2 = make_standard_pipeline('flow2', task(crawler2), {'type': 'xxx'})
    Locally this code runs fine and I can also register these on the Prefect server. However, when I try to run the flows, only the first flow defined in the file runs successfully (I tested reordering the flows). For the other flows, I get a KeyError saying it couldn't find task slug crawler2. Does anyone have hints on what could be causing this problem? Thank you.
    a
    • 2
    • 2
  • c

    Chen Di

    11/20/2021, 3:30 PM
    Hi, I'm new to Prefect, but I've been using Airflow for a while. We hit an issue in Airflow, so I wonder if Prefect could handle it better. We tried to run 40,000 tasks at the same time (very simple tasks, just print commands, with 1000 DAGs, 10 DAG runs per DAG, 4 tasks per DAG run) and found that Airflow took a very long time to finish them (about 1 hour). Our server resources are good enough; it looks like the scheduler is the bottleneck. So I wonder whether Prefect would be better than Airflow at handling many tasks at the same time. What's the maximum number of tasks running at the same time that you have seen? How long does Prefect take to finish them? Thanks
    a
    • 2
    • 1
  • m

    Manuel Gomes

    11/21/2021, 5:40 PM
    Hi everyone, first question, please be gentle 🙂 I am sending a parameter called video to my "Process video" workflow. The first task in this workflow is to upload the video to an s3 bucket. The upload task is transcribed in the thread: My main problem seems to be that s3 requires a string filename, but my video is of the type <Parameter: video>, as illustrated by the error message
    ValueError: Filename must be a string
    So there is clearly some sort of.. unwrapping/unpacking that I'm missing? my flow is likewise in the thread, as is the invocation. So... would someone please tell me in which exact angle I should smack my forehead and the correct octave of the "duh!"... plus maybe what I should be doing instead?
    j
    k
    • 3
    • 10
  • w

    Wilhelm Su

    11/21/2021, 8:54 PM
    Hello everyone! I'm new here. I'm handling a new DE role, and after trying the others (Airflow, Luigi, Dagster) I chose Prefect for the odd characteristic of being the most fun to use. It's absolutely amazing. Would anyone know how to capture the exact full Python traceback when a task fails? I've set up state_handlers but the State.message only captures an abbreviated form of the error (e.g. ZeroDivisionError). Thanks!
    👋 4
    k
    a
    • 3
    • 7
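For the traceback question above, the standard-library `traceback` module can rebuild the full text from an exception object. A minimal sketch, assuming the handler can get hold of the original exception and that it still carries its `__traceback__`; `format_full_traceback` is a hypothetical helper name, and how the exception is obtained inside a state handler is not shown here.

```python
import traceback


def format_full_traceback(exc: BaseException) -> str:
    """Render an exception, including its stack frames, as one string."""
    return "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )
```

If the exception object has lost its traceback (for example after being recreated in another process), the result contains only the final error line.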
  • a

    Anh Nguyen

    11/22/2021, 10:21 AM
    Hi all, I want to create an API that can be called to execute a task in Prefect. How can I do that? Thanks all
    a
    • 2
    • 13
  • j

    Jean-Baptiste Six

    11/22/2021, 2:05 PM
    Hey 🙂 I'm trying to send a notification on Slack in case of task failure, but unfortunately I don't receive anything 😕
    • First, I followed the instructions at https://docs.prefect.io/core/advanced_tutorials/slack-notifications.html#customizing-your-alerts with `slack_notifier`, without success.
    • Then I tried with `SlackTask`; this is my piece of code (didn't work):
    def post_to_slack_on_failure(task, old_state, new_state):
        if new_state.is_failed():
            if isinstance(new_state.result, Exception):
                value = "```{}```".format(repr(new_state.result))
            else:
                value = str(new_state.message)
            msg = (
                f"The task `{prefect.context.task_name}` failed "
                f"in a flow run {prefect.context.flow_run_id} "
                f"with an exception {value}"
            )
            SlackTask(
                message=msg,
                webhook_secret="https://hooks.slack.com/services/*******/*******/*******"
            ).run()
        return new_state
    • Finally, I tried with a custom solution (didn't work, again):
    def post_to_slack_on_failure(task, old_state, new_state):
        if new_state.is_finished():
            msg = "Task {0} finished in state {1}".format(task, new_state)
            # replace with your Slack webhook URL secret name
            secret_slack = cast(str, Secret("https://hooks.slack.com/services/*******/*******/*******").get())
            requests.post(secret_slack, json={"text": msg})
        return new_state
    This is my test flow :
    @task
    def task_error():
        raise Exception("Test")
    
    with Flow("Test Slack", state_handlers=[post_to_slack_on_failure]) as flow:
        task_error()
    
    flow.run()
    (I also tried EmailTask with smtp_type="INSECURE", and once again it didn't work) Need some help please 🙏
    a
    k
    • 3
    • 9
  • m

    Margaret Walter

    11/22/2021, 4:18 PM
    Hey guys! Is there a way to hand in an endpoint_url to Prefect when using s3 storage? We use a containerized localstack instance for local testing/dev in place of s3.
    a
    • 2
    • 1
a

Anna Geller

11/22/2021, 4:21 PM
@Margaret Walter perhaps Webhook storage is something for you if you want to define storage via API endpoint? https://docs.prefect.io/orchestration/execution/storage_options.html#webhook but if your goal is to mock S3 for unit tests, perhaps you can look at moto: https://github.com/spulec/moto
👍 1