prefect-community
  • k

    Kelvin DeCosta

    10/07/2022, 12:11 PM
    I've gone through the dataflow-ops repo and I am planning to create an ECS service for a Prefect Agent that submits ECS Tasks that run the Prefect Deployments. In the repo, as I understand it, the agent runs as an ECS service based on an ECS Task Definition, which requires a Docker image. Does this Docker image need the Python packages required by the flows? Or can it be lightweight and include only the packages necessary for prefect?
    ✅ 1
    💯 2
    a
    • 2
    • 9
  • m

    Maren Elise Rønneseth

    10/07/2022, 1:25 PM
    Hi! I have just started testing Prefect and am looking into using Prefect Cloud. We want to configure the Prefect database (which can contain sensitive logs) to be hosted in our own cloud and not in Prefect Cloud. Is this possible, and if so, how can this be done?
    r
    a
    • 3
    • 3
  • s

    Seth Coussens

    10/07/2022, 2:38 PM
    Any way to create a Microsoft Teams Webhook block from the UI? I see Slack, but no Teams. It's in the docs, but not in the UI.
    k
    • 2
    • 5
  • a

    Anthony Desmier

    10/07/2022, 3:40 PM
    Hi everyone! We're in the process of migrating to 2.0 and experimenting with the ECSTask block. We define our task definitions within terraform and deploying our agent ECS task works fine and it can successfully retrieve a task definition for a flow - the type, cluster and task_definition_arn are defined on the ECSTask block for the flow we're testing. However, when the agent tries to register the task definition we receive AWS errors of the form:
    Unknown parameter in input: "registeredBy", must be one of: family, taskRoleArn, executionRoleArn, networkMode, containerDefinitions, volumes, placementConstraints, requiresCompatibilities, cpu, memory, tags, pidMode, ipcMode, proxyConfiguration, inferenceAccelerators, ephemeralStorage, runtimePlatform
    This appears to be an api incompatibility on the aws-sdk side where the json returned from the describe call contains fields that are incompatible with the register command. There is a workaround described in the issue, but I'm wondering if anyone has come across this before when using prefect-aws or whether we need to do some additional configuration on our end to account for this? Thanks in advance!
    ✅ 1
    a
    • 2
    • 5
  • s

    Sam Thomas

    10/07/2022, 3:45 PM
    I've been looking at trying to integrate Prefect (2.x) with object-oriented programming and design patterns. As discussed in an earlier thread https://app.slack.com/client/TL09B008Y/CL09KU1K7/thread/CL09KU1K7-1658744762.332369 it's not straightforward to integrate Tasks and Flows with classes. I've got something that works, but I was wondering if there's a more elegant way. In this example, for simplicity, I'm making a process that gets a number and adds another number to it:
    import prefect

    class Maker():
        def __init__(self, val=5):
            self._val=val
            
        def make_number(self):
            return self._val
        
        @property
        def make_number_task(self):
            return prefect.Task(self.make_number)
        
    class Adder():
        def __init__(self, val=3):
            self._val=val
    
        def add_number(self, x):
            return x+self._val
        
        @property
        def add_number_task(self):
            return prefect.Task(self.add_number)
        
    class Calculator():
        def __init__(self):
            self.maker = Maker()
            self.adder = Adder()
    
        def _flow(self):
            a = self.maker.make_number_task()
            b = self.adder.add_number_task(a)
            return b
        
        @property
        def flow(self):
            return prefect.Flow(self._flow)
    This seems to work.
    c = Calculator()
    c.flow()
    <Prefect messages>
    8
    c.maker._val=10
    c.flow()
    <Prefect messages>
    13
    I'm wondering if there's a better way of doing it. Wrapping class methods in prefect.flow or prefect.task doesn't work because it treats "self" like a required argument but the above seems to work.
    p
    • 2
    • 3
  • s

    Sean Turner

    10/07/2022, 5:35 PM
    What happens when a flow (foo) is running and a new deployment is created that changes the flow code that foo was executing?
    QUESTION 1: Would foo continue to execute and ignore the new deployment?
    Also, what happens if foo intends to trigger a sub flow (bar), but a new deployment is created that changes the flow code that bar would execute? So the timeline is:
    a. flow foo is triggered
    b. sub flow bar is updated (sub flow id changes from 123 to 456)
    c. foo gets to the point in execution where bar is called.
    QUESTION 2: Would foo trigger the newly updated bar code 456? Or would foo ignore the change to sub flow bar and trigger 123, as foo was triggered before bar was updated?
    ✅ 1
    n
    • 2
    • 3
  • k

    Karan

    10/07/2022, 6:18 PM
    Scenario: I have a Docker image built in GHCR; next, I want to schedule this Docker image to run every 2 hours. Is Prefect the right option for scheduling this? If yes, how? I'm unable to find documentation on this.
    ✅ 1
    c
    m
    • 3
    • 3
  • j

    Jarvis Stubblefield

    10/07/2022, 7:10 PM
    I would like some advice about production deployment of agents. I’m using Prefect 2 with Prefect Cloud 2… So I’m working to get my agent(s) to work within my Django environment… Once that is finished, I am planning on deploying a new production machine behind a load balancer with a minimal percentage of production load, as its primary job is to run the agent(s) and their tasks. Is there anything wrong technically or theoretically with that thought? I have to have my full production code available to Prefect, as the tasks are going to be triggered by actions people can take within our system. (We are using this as a replacement for Celery, hopefully.)
    ✅ 1
    n
    a
    +2
    • 5
    • 35
  • l

    Luca Schneider

    10/07/2022, 7:22 PM
    Hi, I have prefect-orion 2.5.0, prefect-agent 2.5.0 and MinIO running on OpenShift. I have written a simple .py to create a deployment. The files are uploaded to MinIO and the script returns a deployment ID, but it doesn't seem to be registered on the server: the UI deployment panel is still blank. Any idea how to debug? I’ve been trying for hours but couldn't solve it. Thanks
    ✅ 2
    m
    p
    • 3
    • 9
  • p

    Paco Ibañez

    10/07/2022, 9:16 PM
    Hello! Can tags be added to flows? I see a 'tags' column in the database, but I don't see any way to set it in the flow decorator. Thanks!
    ✅ 1
    m
    • 2
    • 7
  • k

    Kyle McChesney

    10/07/2022, 10:34 PM
    I’ve asked a similar question before, but I now have a better summary of what I am trying to accomplish. Original thread is here: https://prefect-community.slack.com/archives/CL09KU1K7/p1659389359820829
    The new “angle” to the question is basically: “I have a flow state handler function and I need it to behave differently based on something that happens during the flow run.” The specific example is:
    • I have a flow state handler that updates an external API with the status/running of a flow. This functionality is optional and controlled via an external_job_id parameter. So if a flow is running and it is to be associated with some external job record, the id is passed. If the id is passed, updates are made as part of the flow state handler (marking the job done when the flow completes, recording error messages, etc.).
    The above is working great. The flow transition handler is generic, and simply checks for the existence of the parameter in the context. The “new” use case is a flow that is triggered by some outside automation. Part of the flow's responsibility is to create the external job record, get its ID, and update it when the flow completes. That means I don't have the parameter at the start. Is there any way to save some kind of information onto the flow, within a task, so that I can check for its existence within the flow state transition function? I’ve tried doing
    flow.add_task(Parameter('external_job_id', default=$res))
    where $res is the result of a task that creates the job and returns the id (halfway through the flow). I also tried setting it in the context directly within the task. No luck.
    ✅ 1
    a
    • 2
    • 4
  • h

    Hedgar

    10/08/2022, 2:01 PM
    @Anna Geller I have a flow with three tasks. The first two ran successfully, looking at it from the UI; however, the third and last task, which essentially writes all the data to a parquet file in S3, is still in a running state. However, 23 out of 104 were delivered to S3. What could be wrong, especially as the CloudWatch log shows “delivering data to s3”?
    ✅ 1
    a
    • 2
    • 2
  • a

    Aaron Goebel

    10/09/2022, 3:34 AM
    If I have utility functions that emit logs, how do I get them mirrored in the Orion cloud UI? These are functions used inside of tasks, not tasks themselves, so there is no access to get_run_logger. I've seen some references to PREFECT_LOGGING_EXTRA_LOGGERS, but no legit examples for code that isn't library code. I have code like this at the top of all my utility function modules
    logging.getLogger("utils")
    and have set
    PREFECT_LOGGING_EXTRA_LOGGERS='utils'
    to no avail. Logs don't get streamed up to the Prefect UI.
    ✅ 1
    a
    • 2
    • 1
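One thing worth checking in the snippet above: `logging.getLogger("utils")` only returns a logger. The utility functions have to keep that object and log through it (or through a child such as `utils.io`), and the logger's level has to let the records through. The sketch below uses only the standard library. `ListHandler` is a hypothetical stand-in for whatever handler Prefect attaches to the loggers named in `PREFECT_LOGGING_EXTRA_LOGGERS`; how Prefect wires that up internally is an assumption here, not something confirmed in this thread.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical stand-in for a handler that ships records to a UI."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(f"{record.name}: {record.getMessage()}")


# Attach the stand-in handler to the parent logger, which is what an
# "extra logger" setting would be expected to do for the name "utils".
parent = logging.getLogger("utils")
parent.setLevel(logging.INFO)
ui_handler = ListHandler()
parent.addHandler(ui_handler)

# In a utility module: keep the logger object and actually log through it.
logger = logging.getLogger("utils.io")  # child of "utils", propagates upward


def load_rows(n):
    logger.info("loading %d rows", n)
    logger.debug("dropped: below the INFO threshold set on the parent")
    return list(range(n))


rows = load_rows(3)
```

Records from the child logger reach the parent's handler through normal propagation, so a single configured name can cover a whole package of utility modules as long as their logger names share that prefix.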
  • b

    Birkir Björnsson

    10/09/2022, 3:28 PM
    Hi all, I am trying to use a String block and fetch the string value:
    string_block_from = String.load("from-string")
    It returns String(value='2022-07-01T00:00:00'). How can I get only the value, i.e. '2022-07-01T00:00:00'?
    ✅ 1
    a
    • 2
    • 2
  • s

    sulfredlee

    10/10/2022, 4:52 AM
    Hi all! I am using Prefect 1.0 and looking for a way to update the flow dynamically. From this doc: https://docs-v1.prefect.io/api/latest/core/flow.html#flow-2, I see the function set_dependencies. I am trying to build this example:
    from typing import Tuple

    from prefect import Flow, task

    @task(name="task_A")
    def task_A() -> Tuple[list, int]:
        test_output_list = list()
        test_output_int = 10
        return test_output_list, test_output_int

    @task(name="task_B")
    def task_B(input_a: list, input_b: int, input_c: float):
        print(f"{input_a}, {input_b}, {input_c},")

    with Flow(name="test_flow") as flow:
        ret_a_list, ret_a_int = task_A()
        task_B(ret_a_list, ret_a_int, 10.05)
    I would like to change the implementation of the with Flow() as flow block to
    test_flow = Flow(name="test_flow")
    test_flow.set_dependencies([task_A, task_B], upstream_task=[task_A], downstream_tasks=[task_B])
    I have 2 questions:
    - Is set_dependencies the correct function I should use?
    - How do I use set_dependencies correctly to pass the output from task_A to task_B?
    Thanks
    ✅ 1
    k
    • 2
    • 3
  • i

    Imre Kerr

    10/10/2022, 8:33 AM
    Hi! We are considering Prefect Cloud for our company's data platform and had a few questions regarding authentication:
    • The Pricing page says the Enterprise plan supports "SSO". Which mechanism specifically? SAML and/or OIDC? And is it possible to use any provider for these, or is there only a defined set of providers available?
    • Is it possible to provision users by some automated mechanism, e.g. SCIM for users from AD?
    • What about service accounts? Is there an API for creating these?
    ✅ 1
    a
    • 2
    • 4
  • How to mount a volume to a KubernetesJob infrastructure block on a Prefect Deployment?
    r

    Rajvir Jhawar

    10/10/2022, 1:21 PM
    Is it possible to add a volumeMount via JSON patches in Prefect 2? I keep running into a path error. When I look at the KubernetesJob source code, the shortcut for env looks like:
    "path": "/spec/template/spec/containers/0/env/-"
    Which means the path for the volume mount should be:
    "path": "/spec/template/spec/containers/0/volumeMounts/-"
    When I use that path it doesn't work. Any ideas?
    👀 1
    a
    c
    • 3
    • 7
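A possible explanation, going by the JSON Patch spec (RFC 6902): an `add` to a path ending in `/-` appends to an array that already exists, and it does not create the array. The `env` path presumably works because the base manifest already contains an `env` list. If the container has no `volumeMounts` key yet, the append has nothing to append to. The sketch below illustrates this with a tiny `add`-only patcher written for this example (it is not Prefect's or any library's implementation). The manifest shape and the names `data`, `/mnt/data` and `my-pvc` are assumptions.

```python
import copy


def apply_add(doc, path, value):
    """Apply one RFC 6902 "add" operation to a copy of doc.

    Simplified: JSON Pointer escapes (~0, ~1) are not handled.
    """
    doc = copy.deepcopy(doc)
    *parents, last = path.lstrip("/").split("/")
    target = doc
    for key in parents:
        # List indices arrive as strings in a JSON Pointer.
        target = target[int(key)] if isinstance(target, list) else target[key]
    if isinstance(target, list):
        if last == "-":
            target.append(value)  # "-" means "append to the existing array"
        else:
            target.insert(int(last), value)
    else:
        target[last] = value  # create or replace an object member
    return doc


# Assumed minimal base manifest: "env" exists, "volumeMounts" does not.
base_job = {
    "spec": {"template": {"spec": {"containers": [{"name": "prefect-job", "env": []}]}}}
}
mount = {"name": "data", "mountPath": "/mnt/data"}
volume = {"name": "data", "persistentVolumeClaim": {"claimName": "my-pvc"}}

# Appending to a missing array fails because the parent key is absent.
try:
    apply_add(base_job, "/spec/template/spec/containers/0/volumeMounts/-", mount)
    append_failed = False
except KeyError:
    append_failed = True

# Adding the whole array (and the matching pod-level volume) succeeds.
patched = apply_add(base_job, "/spec/template/spec/containers/0/volumeMounts", [mount])
patched = apply_add(patched, "/spec/template/spec/volumes", [volume])

# The same two operations written as a JSON Patch list.
customizations = [
    {"op": "add", "path": "/spec/template/spec/volumes", "value": [volume]},
    {"op": "add", "path": "/spec/template/spec/containers/0/volumeMounts", "value": [mount]},
]
```

Note that a volume mount on the container also needs a matching entry under the pod spec's `volumes`, which is why the patch list carries two operations.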
  • m

    Mark

    10/10/2022, 1:28 PM
    I am new to Prefect, but I was going to try to use it to test-run my dbt commands. I have dbt set up in a folder locally with a profiles.yml and a project dir; however, I can't seem to get the flow to find it.
    from prefect import flow
    
    from prefect_dbt.cli.credentials import DbtCliProfile
    from prefect_dbt.cli.commands import trigger_dbt_cli_command
    
    @flow
    def trigger_dbt_cli_commands_flow():
        dbt_cli_profile = DbtCliProfile.load("dbtbase")
    
        trigger_kwargs = dict(
            profiles_dir="../dbt/data-dbt",
            project_dir="../dbt/data-dbt/dbt-project/",
            overwrite_profiles=False,
            dbt_cli_profile=dbt_cli_profile,
        )
    
        # Capture the return value so the flow has something to return.
        result = trigger_dbt_cli_command(
            "dbt deps",
            **trigger_kwargs
        )
        return result
    
    trigger_dbt_cli_commands_flow()
    ✅ 1
    j
    j
    • 3
    • 11
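Relative paths such as `../dbt/data-dbt` are resolved against the working directory of the process, not against the file that defines the flow, so the result depends on where the script is launched from. One option is to anchor the paths on the flow file's own location. A minimal sketch, where `dbt_dirs` is a hypothetical helper and the directory layout is taken from the message above:

```python
from pathlib import Path


def dbt_dirs(flow_file):
    """Return absolute dbt directories located relative to the flow file's folder."""
    here = Path(flow_file).resolve().parent
    profiles_dir = (here / ".." / "dbt" / "data-dbt").resolve()
    return {
        "profiles_dir": str(profiles_dir),
        "project_dir": str(profiles_dir / "dbt-project"),
    }


# Inside the flow module this would be called as dbt_dirs(__file__).
dirs = dbt_dirs("flows/run_dbt.py")
```

The two values can then be passed as `profiles_dir` and `project_dir` in place of the relative strings.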
  • j

    Jai P

    10/10/2022, 2:34 PM
    👋 hullo! Are there plans to add additional functionality in the prefect-snowflake library to wrap Snowpark APIs?
    ✅ 1
    j
    • 2
    • 2
  • k

    Kun Situ

    10/10/2022, 2:41 PM
    I have been running an agent with Prefect 2.0 Cloud, but my agent always raises an exception after roughly 2 days. Here is the error message:
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
    prefect.exceptions.PrefectHTTPStatusError: Client error '403 Forbidden' for url
    Does anyone know how I can fix it, any help would be much appreciated!
    ✅ 1
    a
    • 2
    • 2
  • j

    Jon Young

    10/10/2022, 2:51 PM
    Where can I find some examples of how to pass required flow parameters for Prefect v1? When I go to register a flow with required parameters, I get AttributeError: 'Parameter' object ... . Why is the flow trying to run when I register it? The more unclear thing is that when I declare a parameter, it returns a Parameter object when I really just want the value that's passed to the flow.
    ✅ 1
    k
    b
    • 3
    • 11
  • d

    David Cupp

    10/10/2022, 4:20 PM
    So in Prefect 2, task objects have a handy submit() method that lets you create task objects and then submit them directly, using wait_for to specify dependencies. Is there any similar mechanism for sub flows? I am interested in launching a "subflow" but without directly calling the annotated flow method, because I don't know which method I'm calling until runtime. For example, I would like to do this:
    result = make_rpc_call()
    newflow = Flow(
       name=result.name,
       parameters=result.parameters,
    ).submit(wait_for=[...])
    is this possible?
    ✅ 1
    k
    • 2
    • 2
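If the candidate flows are known ahead of time and only the choice is made at runtime, one plain-Python option is a name-to-function registry. The sketch below uses ordinary functions. In Prefect 2 the registered callables would be flow-decorated functions, and calling one from inside another flow is what creates a subflow run. `FLOWS`, `register`, `dispatch` and the two sample functions are hypothetical names, and the `make_rpc_call()` result from the message is replaced by hard-coded values.

```python
FLOWS = {}


def register(fn):
    """Record a callable under its function name so it can be looked up later."""
    FLOWS[fn.__name__] = fn
    return fn


@register
def ingest(source, limit=10):
    return f"ingested {limit} rows from {source}"


@register
def report(title):
    return f"report: {title}"


def dispatch(name, parameters):
    """Call the registered function chosen at runtime with keyword parameters."""
    try:
        fn = FLOWS[name]
    except KeyError:
        raise ValueError(f"unknown flow {name!r}; known: {sorted(FLOWS)}") from None
    return fn(**parameters)


outcome = dispatch("ingest", {"source": "s3://bucket/raw", "limit": 5})
```

This does not cover running a flow whose code is unknown to the calling process. It only moves the choice among known flows to runtime.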
  • d

    David Elliott

    10/10/2022, 4:36 PM
    Hey all, wondering if anyone has any idea on how to generate/write a flow dynamically in 2.0? e.g add tasks + dependencies from another graph object… In 1.0 it used to be possible to just instantiate a flow object and then add tasks / dependencies to the flow object (e.g whilst iterating over a dict or a networkX graph), however now flows are decorators on functions I’m struggling to see how I’d do that? I’ll attach some example code in the 🧵 that shows what I’m trying to achieve. Thanks for any suggestions in advance!
    ✅ 1
    m
    • 2
    • 22
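Since a flow function is ordinary Python, one approach is to walk the graph inside it. The sketch below uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+) to order the nodes of a dict-based graph and call each one with its upstream results. Plain functions stand in for tasks here. With Prefect 2 tasks, the call site is where a `submit(..., wait_for=[...])` on the upstream futures would go. The graph and the function names are hypothetical.

```python
from graphlib import TopologicalSorter

# node -> set of upstream nodes it depends on (hypothetical graph)
GRAPH = {
    "extract": set(),
    "clean": {"extract"},
    "stats": {"clean"},
    "load": {"clean", "stats"},
}


def run_node(name, upstream_results):
    """Stand-in for a task: reports which upstream nodes fed into it."""
    return f"{name}({', '.join(sorted(upstream_results))})"


def run_graph(graph):
    results = {}
    order = []
    # static_order() yields every node after all of its dependencies.
    for node in TopologicalSorter(graph).static_order():
        upstream = {dep: results[dep] for dep in graph[node]}
        # With Prefect 2 tasks, a submit with wait_for would replace this call.
        results[node] = run_node(node, upstream)
        order.append(node)
    return order, results


order, results = run_graph(GRAPH)
```

The same loop works for a networkx graph if the dependency mapping is built from each node's predecessors first.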
  • d

    David Cupp

    10/10/2022, 5:48 PM
    Hello, I have another question. Is it possible to ensure that no tasks start running unless all tasks have been successfully registered? For example, if I have some code registering tasks:
    1   A = Task(...).submit()
    ...
    5   B = Task(...).submit(wait_for=[A])
    ...
    9   _ = Task(...).submit(wait_for=[B])
    If an exception is thrown after line 1 and before line 9, is there an easy way to ensure that Task "A" does not start?
    b
    • 2
    • 3
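One way to get this all-or-nothing behaviour is to separate describing the work from submitting it: collect the steps in a list first, and start submitting only once the whole list has been built without error. A minimal sketch, using `concurrent.futures.ThreadPoolExecutor` as a stand-in for task submission. None of the names below are Prefect APIs.

```python
from concurrent.futures import ThreadPoolExecutor

started = []  # records which steps actually ran


def step(name):
    started.append(name)
    return name


def build_plan(fail=False):
    """Phase 1: describe the work. Nothing is submitted here."""
    plan = [("A", step)]
    if fail:
        raise RuntimeError("error while describing the work")
    plan.append(("B", step))
    plan.append(("C", step))
    return plan


def run(fail=False):
    plan = build_plan(fail)  # raises before anything has started
    with ThreadPoolExecutor(max_workers=1) as pool:
        # Phase 2: submit in order; a single worker keeps A, B, C sequential.
        futures = [pool.submit(fn, name) for name, fn in plan]
        return [f.result() for f in futures]
```

If `build_plan` raises, `run` never reaches the executor, so step "A" is never started.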
  • a

    Alexandru Anghel

    10/10/2022, 6:06 PM
    Hello, I have installed Prefect 1.4.0 on a Minikube instance using Helm. I am providing my own values.yaml file and setting the prefectConfig section to switch the backend to server. However, the graphql deployment's config.toml still shows the cloud backend once deployed. What am I missing here? Thanks!
    ✅ 1
    m
    • 2
    • 7
  • g

    Georgiana Ogrean

    10/10/2022, 6:38 PM
    I have a workflow that uses Prefect 2 and is run from inside a Docker container. I am able to run the workflow, bring up the Orion UI, and see the workflow in the UI with the states changing as expected. However, I am stuck at trying to create a deployment, with an error saying (full traceback in 🧵):
    ConnectionRefusedError: [Errno 111] Connect call failed ('0.0.0.0', 4200)
    The Orion UI comes up just fine when I do not try to build a deployment with:
    prefect deployment build /app/my_package/flows/my_python_script.py:my_flow_name --name generate_deployment --tag dev --infra docker-container
    Any ideas what I could be missing? More details in the thread.
    m
    j
    • 3
    • 9
  • k

    Kun Situ

    10/10/2022, 8:04 PM
    Sorry guys, another question: do you have a Prefect 2 base image for the arm64 architecture? I had a look at the previous discussion in the forum, but had no success. Any guidance would be very helpful.
    ✅ 1
    m
    m
    • 3
    • 5
  • l

    Luca Schneider

    10/10/2022, 8:16 PM
    Hi all, I’m trying to add headers (basic auth) to the http request to the orion client. I have a python script to deploy flows and blocks. How am I supposed to update the client with httpx settings in my script ? thanks
    k
    • 2
    • 10
  • j

    Jacqueline Garrahan

    10/10/2022, 8:19 PM
    Hi- could anyone point me to docs describing how to configure a docker agent to pull from a private registry with 1.x? I've run into issues pulling from our gitlab registry and using pre-pulled images with the gitlab registry prefix: myregistry/image_name:tag
    ✅ 1
    l
    m
    • 3
    • 9
  • t

    Taylor Babin

    10/10/2022, 9:44 PM
    Hi- I have a simple question... what do you all recommend for organizing your work in Prefect? Like, do you have one workspace with all the environments in there, in a naming scheme with their own agents/work queues, or do you have multiple workspaces, one for each environment (dev, stage, prod, etc.)? Thank you all for your input!
    ✅ 1
    m
    m
    • 3
    • 3
m

Matt Conger

10/10/2022, 10:28 PM
@Taylor Babin There are a ton of ways to organize workspaces in Prefect. I have very commonly seen users opt for having dev, staging, and prod workspaces, as you mentioned.
👍 1
m

Mason Menges

10/10/2022, 10:36 PM
Just to add to this: we've also seen users separate their work using work queues, i.e. maintaining separate work queue/agent pairs for dev/staging/prod, and then using individual workspaces more as a separation of teams within an organization, e.g. analytics, devops, etc. Workspaces give you more pronounced separation in terms of environments, but it really depends on what'll work best for you and your team. Short version: there are no hard and fast rules around this, but these are some of the configurations we've seen users adopt. I'm sure there are many others that we haven't come across yet as well 😄
🙌 3
t

Taylor Babin

10/11/2022, 2:15 PM
Thank you! I figured as much. Just wanted to get an idea to figure out what's best