Powered by Linen
prefect-community

    Adam Roderick

    03/30/2022, 4:38 PM
    Hello, I am running into what seems like a simple problem, but I can't see how to solve it. I have an implicit dependency from taskC to taskD, but I can't get Prefect to recognize it.
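In Prefect 1.x, a dependency that passes no data (a state-only dependency) is usually declared by calling the downstream task with `upstream_tasks=[...]` inside the `Flow` block (for example `d = task_d(upstream_tasks=[c])`), or with `set_upstream`. As a stdlib-only model of what such an edge means (not Prefect's scheduler), the sketch below uses `graphlib.TopologicalSorter` (Python 3.9+). The task names are hypothetical: `taskD` consumes nothing from `taskC`, yet listing `taskC` as its predecessor still forces it to run afterwards.

```python
from graphlib import TopologicalSorter

# Map each (hypothetical) task to the set of tasks it must wait for.
dependencies = {
    "taskA": set(),
    "taskB": {"taskA"},  # data dependency: taskB uses taskA's output
    "taskC": {"taskB"},
    "taskD": {"taskC"},  # implicit, state-only dependency: no data flows from taskC
}


def run_order(deps):
    """Return one execution order that respects every declared edge."""
    return list(TopologicalSorter(deps).static_order())


order = run_order(dependencies)
print(order)
```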

    FuETL

    03/30/2022, 5:01 PM
    Hey guys, is it possible to hide the "RUN CONFIG" tab in Prefect Server? Right now everyone can see the secrets.

    Maria

    03/30/2022, 5:40 PM
    I’m running into an issue trying to use GitLab storage to reference flows stored in my GitLab repo. This is how I have it in my script:
    from prefect.storage import GitLab
    storage = GitLab(repo="org/repo",
                     path="/hello/hello_k8s.py",
                     ref="prefect")
    When I try to run the flow, I get a 404 "file not found" error, but the file is there. Is there a way to print out the path the Prefect runner is using?
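One thing worth checking is how the file path is sent. GitLab's REST API addresses a file as `/projects/:id/repository/files/:file_path/raw?ref=<branch>`, with both the project path and the file path URL-encoded. The stdlib sketch below only builds that URL so the requested path becomes visible. It assumes, without confirming it for Prefect's GitLab storage, that `path` is passed through unchanged, in which case the leading slash in `"/hello/hello_k8s.py"` becomes part of the encoded file name; the `gitlab.com` host is also an assumption.

```python
from urllib.parse import quote


def gitlab_raw_file_url(repo, path, ref, host="https://gitlab.com"):
    """Build a GitLab v4 'raw file' URL (sketch; the host is an assumption)."""
    project_id = quote(repo, safe="")  # "org/repo" -> "org%2Frepo"
    file_path = quote(path, safe="")   # every "/" in the file path is encoded as well
    return (
        f"{host}/api/v4/projects/{project_id}"
        f"/repository/files/{file_path}/raw?ref={quote(ref, safe='')}"
    )


with_slash = gitlab_raw_file_url("org/repo", "/hello/hello_k8s.py", "prefect")
without_slash = gitlab_raw_file_url("org/repo", "hello/hello_k8s.py", "prefect")
print(with_slash)     # file segment starts with %2F, i.e. a file name beginning with "/"
print(without_slash)  # path relative to the repository root
```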

    Shuchita Tripathi

    03/30/2022, 5:45 PM
    I am trying to run some simple Terraform code using Prefect. The Terraform code is a .tf file that creates a storage account. When I run it as plain Python code (without Prefect, flows, etc.), it works fine, but with Prefect I get an error. I am not sure whether this is related to Prefect or something else.

    Chris Reuter

    03/30/2022, 6:45 PM
    👋 Hey all! You probably saw the module recently published to the Terraform registry that builds the Prefect Docker agent and related infrastructure. Come join @Jamie Zieziula, @George Coyne and me on PrefectLive in 15 minutes as we walk through that module, how it was built, and how to use it! twitch.tv/prefectlive

    Fina Silva-Santisteban

    03/30/2022, 6:52 PM
    Hi everyone! Does anyone have examples of how they’ve used the SnowflakeQuery task? I'm especially interested in examples using the imperative API! Here's what I have so far:
    flow.set_dependencies(
        task=SnowflakeQuery,
        keyword_tasks=dict(query='''SELECT * FROM dummy_table;'''),
    )
    flow.set_dependencies(
        task=save_query_result_as_df,
        keyword_tasks=dict(result_set=SnowflakeQuery),
    )
    The task save_query_result_as_df currently only does a print() of the result_set. I’m confused about a few things:
    • I didn’t provide the SnowflakeQuery with any authentication. Why doesn’t it throw an error about that?
    • The print statement prints out <class 'prefect.tasks.snowflake.snowflake.SnowflakeQuery'>, which makes me think the SnowflakeQuery task wasn’t run? If it wasn’t run, that would at least explain why it didn’t throw an error 😅 but how can I make it run?
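The printed `<class ...>` suggests that the class itself, not a configured task instance, was handed to `set_dependencies`. My understanding (hedged) is that Prefect 1.x wraps any non-`Task` object in a constant task, so the class is simply passed downstream as a value: no query runs, which would also explain the missing authentication error. The stdlib sketch below models only that wrapping rule; `FakeQuery`, `Constant` and `as_task` are hypothetical stand-ins, not Prefect's classes. In real code the fix would be to instantiate `SnowflakeQuery(...)` with your connection arguments and pass that same instance to both `set_dependencies` calls.

```python
class FakeQuery:
    """Hypothetical stand-in for a query task class."""

    def run(self, query):
        return [("row 1",), ("row 2",)]  # pretend result set


class Constant:
    """Wraps a plain value; 'running' it just returns the value."""

    def __init__(self, value):
        self.value = value

    def run(self):
        return self.value


def as_task(obj):
    # Instances are treated as tasks; anything else (including a class) becomes a constant.
    return obj if isinstance(obj, FakeQuery) else Constant(obj)


def execute(obj, query="SELECT * FROM dummy_table;"):
    task = as_task(obj)
    return task.run(query) if isinstance(task, FakeQuery) else task.run()


print(execute(FakeQuery))    # the class object comes back unchanged, nothing is queried
print(execute(FakeQuery()))  # the instance actually "runs" the query
```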

    Scott Aefsky

    03/30/2022, 6:54 PM
    I'm running into some memory limits with a flow using an ECSRun config on a LocalCluster DaskExecutor. The first task in the flow is to generate a list of file keys, and then I map over that list to process the files. I would have expected that when an individual Mapped Child task completes, any memory used in that task would have been released (aside from the return value). But it seems like that is not the case, as the process eventually runs out of memory after several Mapped Children finish. Is there something I can do in the configuration of the flow or task to mitigate this? As an aside, when I run this same flow using a FargateCluster, it will complete without issue. I'm not sure if that's because the FargateCluster I'm using just has more memory, or if there's something inherently different between the two different DaskExecutor classes. Thanks as always for any help you can provide.
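One commonly suggested mitigation, offered here as an assumption rather than a diagnosis: keep each mapped child's return value small and avoid holding whole files in memory, since Prefect 1.x keeps mapped results available for downstream tasks and the Dask workers of a `LocalCluster` all share the ECS task's memory. The stdlib sketch below streams a file in fixed-size chunks and returns only a short summary; `process_file` and the 1 MiB chunk size are hypothetical. In a flow, a function like this would be decorated with `@task` and mapped over the list of file keys.

```python
import hashlib
import os
import tempfile
from pathlib import Path

CHUNK_SIZE = 1024 * 1024  # 1 MiB; hypothetical tuning value


def process_file(path):
    """Stream a file and return only a small summary, never its full contents."""
    digest = hashlib.sha256()
    size = 0
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(CHUNK_SIZE), b""):
            digest.update(chunk)
            size += len(chunk)
    # Only this small dict is returned (and kept around for downstream tasks).
    return {"file": Path(path).name, "bytes": size, "sha256": digest.hexdigest()}


# Usage on a throwaway file.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"example file contents")
summary = process_file(tmp.name)
os.remove(tmp.name)
print(summary)
```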

    Anna Geller

    03/30/2022, 7:12 PM
    If you haven't already, join us on PrefectLive right now! 🔥 covering infrastructure topics such as deploying Prefect agents with Terraform with our super-knowledgeable Senior Infrastructure engineers! The demo starts now! 🙂

    Ievgenii Martynenko

    03/30/2022, 9:19 PM
    Hi, I'm extending the MySQL Execute and Fetch tasks to override the __init__ method, while run stays the same. I'm initializing the tasks outside of the flow and then just calling them inside it, expecting the second task to depend on the first one. I'm definitely doing something wrong: get_time (line 5) just initializes a task instead of getting the actual output of the select query.
    fetch_task = TestMySQLFetchOneValue(connection_name='...', query="select now(6) as time_column", name="Fetch Time")
    execute_task = TestMySQLExecute(connection_name='...', name="Write Time")
    
    with Flow("Test Flow") as flow:
        get_time = fetch_task()  #fetch_task().run() works
        execute = execute_task(query="update test_run set run_time = '" + str(get_time.get('time_column')) + "'")
    
    flow.run()
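Inside `with Flow(...)`, calling a task in Prefect 1.x does not run it; it returns a task object that stands in for the future result. The string concatenation on the `execute_task` line therefore runs once, while the flow is being built, on that stand-in rather than on the fetched row. The stdlib sketch below models the difference with a hypothetical `Placeholder` class and a made-up timestamp. In Prefect, the usual fix is to move the query building into its own `@task` (for example `build_query(get_time)`), so that Prefect passes the real result in at run time.

```python
class Placeholder:
    """Hypothetical stand-in for the object a task call returns at build time."""

    def __init__(self, name):
        self.name = name
        self.value = None  # filled in only when the flow actually runs

    def __repr__(self):
        return f"<Placeholder: {self.name}>"


def build_query(time_value):
    return f"update test_run set run_time = '{time_value}'"


get_time = Placeholder("Fetch Time")

# Eager: formatted while the flow is being built, so it embeds the placeholder's repr.
eager_query = build_query(get_time)

# Deferred: formatted only after the fetch has produced a (hypothetical) value.
get_time.value = "2022-03-31 19:30:00.000000"
deferred_query = build_query(get_time.value)

print(eager_query)
print(deferred_query)
```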

    Gaurav

    03/30/2022, 10:48 PM
    Hello, I have successfully deployed a Kubernetes Prefect Agent on an Azure Kubernetes cluster. I am trying to run a simple flow that uses a LocalDaskExecutor on the AKS Virtual Nodes. For this, I am using a custom job template for the pod, because it needs some customized node selectors and tolerations that Azure publishes. The following is a snippet of my job_template:
    job_template={
        "apiVersion": "batch/v1",
        "kind": "Job",
        "spec": {
            "template": {
                "metadata": {
                    "labels": {
                        "execution-model": "serverless"
                    }
                },
                "spec": {
                    "containers": [
                        {
                            "name": "flow"
                        }
                    ],
                    "nodeSelector": {
                        "execution-model": "serverless"
                    },
                    "tolerations": [
                        {
                            "key": "virtual-kubelet.io/provider",
                            "operator": "Exists"
                        }
                    ]
                }
            }
        }
    }
    However, the flow fails. When I ran kubectl get events, I noticed the following output:
    Warning   ProviderCreateFailed   pod/prefect-job-XXXXX-XXXXX   ACI does not support providing args without specifying the command. Please supply both command and args to the pod spec.
    For more context: I also ran the same flow successfully on an alternate deployment on AWS EKS Fargate, using an AWS Kubernetes Agent. Any guidance is really appreciated :)
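The ACI message says that a container which sets `args` must also set `command`. My assumption, not verified against the agent source here, is that the Prefect agent fills in `args` for the `flow` container while the custom template leaves `command` unset. A small stdlib pre-flight check run against the fully rendered pod spec can flag that combination before submitting. The helper name and the example spec (including its `args` value) are hypothetical, and the right `command` to add depends on the image's entrypoint.

```python
def containers_missing_command(pod_spec):
    """Return names of containers that set 'args' without 'command' (rejected by ACI per the error)."""
    return [
        container.get("name", "<unnamed>")
        for container in pod_spec.get("containers", [])
        if container.get("args") and not container.get("command")
    ]


# Hypothetical rendered pod spec: args present, command missing.
rendered_spec = {
    "containers": [
        {"name": "flow", "args": ["prefect", "execute", "flow-run"]},
    ],
}
print(containers_missing_command(rendered_spec))  # ['flow']
```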

    Caleb Ejakait

    03/31/2022, 7:47 AM
    Hey guys! Quick question about the DockerRun config. I am running a Docker agent on an EC2 instance and have specified a custom image to which I added my flow dependencies, but I keep getting a 404 image-not-found error (no such image: repo/image_name:tag). The same happens even with the Prefect Core images for a simple test flow. Is there something I could have missed in the agent config?

    Michael Smith

    03/31/2022, 9:52 AM
    Morning! I am testing a crash-recovery scenario with Prefect 2. My workflow has a few steps, all of which do lengthy sleeps. I have an agent running on a Compute Engine instance (this won't be our final deployment architecture, but it is convenient for testing). I suspended the Compute Engine instance mid flow run. The log in the Prefect UI says "Crash detected!", but the TaskRun still shows as Running. After restarting the agent, it looks like there is no automatic crash recovery, so in this scenario would we need to set up a flow timeout? Is there any way to resubmit a TaskRun, and do all agents behave this way?

    Henning Holgersen

    03/31/2022, 11:25 AM
    I'm looking at a scenario where a flow could be triggered a number of times simultaneously (via the api) - I think the extreme case is around 300x within a few seconds. I don't mind the tasks queuing up, but is there a point at which something will stop working from the Prefect side of things? I need to make sure the flows will indeed run - sooner or later.

    Thomas Mignon

    03/31/2022, 11:38 AM
    Hi guys, I'm currently facing an issue with Parameter: how can I get the value of a Parameter?

    Noam Gal

    03/31/2022, 12:44 PM
    Hey, congrats on releasing Prefect 2.0 (beta). We were in the middle of adopting Prefect 1.0, and none of our pipelines were production-ready yet, so we decided to move on and adopt Prefect 2.0 (Orion) from the start. In our pipeline we were using <task>.run to call a task from another task (not sure if that was a workaround), but this feature now seems to be gone. Is there any workaround for doing that in Prefect 2.0 (calling a task from another task)? Are there any guidelines for migrating from Prefect 1.0 to Prefect 2.0? Thanks!
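In Prefect 2, a task object keeps the original Python function on its `.fn` attribute, so one task can reuse another task's logic as a plain function call, `other_task.fn(...)`, which runs inline and creates no separate task run. The stdlib sketch below models that decorator pattern; the `task` decorator and its `calls` counter are hypothetical stand-ins, not Prefect's implementation.

```python
import functools


def task(fn):
    """Hypothetical stand-in for a task decorator that keeps the raw function on .fn."""

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # A real orchestrator would create and track a task run here.
        wrapper.calls += 1
        return fn(*args, **kwargs)

    wrapper.calls = 0
    wrapper.fn = fn  # plain function, no orchestration
    return wrapper


@task
def add_one(x):
    return x + 1


@task
def add_two(x):
    # Reuse another task's logic by calling its underlying function directly.
    return add_one.fn(add_one.fn(x))


print(add_two(1))     # 3
print(add_one.calls)  # 0: add_one was never invoked as a task
```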

    Shuchita Tripathi

    03/31/2022, 1:58 PM
    Hi. My scenario is to create a Prefect flow and then run it using an API call. I am able to run any already-created flow using POST calls (screenshot1 attached). When the POST call is invoked, the function prefect_flow is called. But I can't figure out how to create flows via POST calls (screenshot2 attached shows the flow-creation code without any reference to the API). I tried to encapsulate the whole task and flow inside one function (temp_prefect_run, line #9) and call that function from the POST handler, but I get an internal server error. In the detailed logs, if I call f.register(project_name), the error says:
    requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000002A9EA91E100>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))
    My Prefect Server is not at localhost. If I call client.register(flow, project_name), the error says:
    raise ValueError("This flow has no storage to build")
    ValueError: This flow has no storage to build
    What is the best way to run POST calls which can create a new flow?
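The first traceback shows the client still targeting the default `localhost:4200`. Prefect 1.x lets config keys be overridden by environment variables named `PREFECT__<SECTION>__<KEY>`, so pointing the client at a remote server is typically done with variables such as `PREFECT__SERVER__HOST` and `PREFECT__SERVER__PORT` (or via `prefect backend server` plus `~/.prefect/config.toml`); treat the exact keys as assumptions to check against your version. The second error (`This flow has no storage to build`) generally means no storage was attached before `client.register`, so setting `flow.storage` first (for example to `prefect.storage.Local()`) is worth trying. The stdlib sketch below only illustrates the double-underscore naming convention by turning such variables into a nested dict; it is not Prefect's config loader, and the host value is hypothetical.

```python
def nested_config_from_env(environ, prefix="PREFECT__"):
    """Turn PREFECT__SECTION__KEY=value variables into {'section': {'key': value}}."""
    config = {}
    for name, value in environ.items():
        if not name.startswith(prefix):
            continue  # ignore unrelated variables such as PATH
        keys = [part.lower() for part in name[len(prefix):].split("__")]
        node = config
        for key in keys[:-1]:
            node = node.setdefault(key, {})
        node[keys[-1]] = value
    return config


example_env = {
    "PREFECT__SERVER__HOST": "http://prefect-server.internal",  # hypothetical host
    "PREFECT__SERVER__PORT": "4200",
    "PATH": "/usr/bin",
}
print(nested_config_from_env(example_env))
```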

    Prasanth Kothuri

    03/31/2022, 2:19 PM
    Hello, is there an example of how to add an S3 file as an attachment to the Prefect email task? I looked at the code, and the current attachments param only supports local URIs. TIA
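If the built-in attachments parameter only accepts local paths, two workarounds are to download the S3 object to a temporary file first, or to build the message yourself from the object's bytes. The stdlib sketch below shows the second option with `email.message.EmailMessage`. It assumes you already have the bytes (for example from boto3's `s3.get_object(Bucket=..., Key=...)["Body"].read()`, not shown); the addresses, subject and file name are hypothetical, and sending (e.g. with `smtplib`) is left out.

```python
from email.message import EmailMessage


def build_email_with_attachment(sender, recipient, subject, body, data, filename,
                                maintype="application", subtype="octet-stream"):
    """Build an email whose attachment comes from in-memory bytes rather than a local path."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = recipient
    msg["Subject"] = subject
    msg.set_content(body)
    # add_attachment turns the message into multipart/mixed and base64-encodes the bytes.
    msg.add_attachment(data, maintype=maintype, subtype=subtype, filename=filename)
    return msg


report_bytes = b"col_a,col_b\n1,2\n"  # stand-in for the S3 object's contents
msg = build_email_with_attachment(
    "flows@example.com", "team@example.com", "Daily report", "Report attached.",
    report_bytes, "report.csv",
)
print(msg.get_content_type())  # multipart/mixed
```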

    Atul Anand

    03/31/2022, 2:48 PM
    I have implemented distributed Dask with Prefect. The task works perfectly with LocalDaskExecutor, but when I try to run it on a distributed Dask cluster, it gives an error. I have a docker-compose setup for Dask with one scheduler and multiple workers. Everything is up and works perfectly in the local case.

    Atul Anand

    03/31/2022, 2:48 PM
    If anyone has an idea, kindly help!

    Atul Anand

    03/31/2022, 2:50 PM
    The error I am getting: Unexpected error occured in FlowRunner: ModuleNotFoundError("No module named 'prefect'")

    Joshua Weber

    03/31/2022, 4:39 PM
    Hi everyone!

    Joshua Weber

    03/31/2022, 4:40 PM
    I’m using the Prefect master-branch Docker image, and when Prefect spins up, it throws a JSON decode error. I know that’s vague, but has anyone encountered this?

    Joshua Weber

    03/31/2022, 4:40 PM
    I'm currently tracing it through GraphQL to Hasura.

    Bruno Nunes

    03/31/2022, 4:48 PM
    Hi, when installing Prefect Orion in Kubernetes, can I specify which namespace the service is applied to? Also, when I run
    prefect orion kubernetes-manifest | kubectl apply -f -
    I got an error pulling the container because it couldn't find docker.io/prefecthq/prefect:dev-python3.8. I edited the deployment manifest to use docker.io/prefecthq/prefect:latest-python3.8 instead, but I'm guessing this is not the correct container either, since I now get the following error in the pod logs:
    Usage: prefect [OPTIONS] COMMAND [ARGS]...
    Try 'prefect -h' for help.
    Error: No such command 'orion'.

    Jose Daniel Posada Montoya

    03/31/2022, 5:43 PM
    Hi! Has anyone faced this error when starting Orion? The .db file does not exist; it is new.

    Patrick Tan

    03/31/2022, 7:02 PM
    Hi, I can't locate config.toml on my MacBook; I only see auth.toml and backend.toml. Please advise.

    Milton

    03/31/2022, 7:08 PM
    Hi, is there a way to roll back a flow to a previous version?

    Horatiu Bota

    03/31/2022, 7:20 PM
    Hi prefect-community! Is there a good way to use flow Parameters when configuring tasks before runtime? For example, I'd like to pass company_id/company_name as parameters to my flow, then use those values to configure where task outputs are cached.
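In Prefect 1.x, a result's `location` string is a template that is formatted at run time with values from `prefect.context`, which during a flow run includes a `parameters` dict. My understanding is that this allows a template such as `"{parameters[company_id]}/{task_name}.prefect"`, but verify the available context keys for your version. Since that formatting is ordinary `str.format`, the stdlib sketch below shows how such a template resolves against a hypothetical context dict.

```python
# Hypothetical run-time context; in Prefect 1.x the real values would come from prefect.context.
context = {
    "flow_name": "company-etl",
    "task_name": "extract",
    "parameters": {"company_id": "42", "company_name": "acme"},
}

location_template = "{parameters[company_name]}/{parameters[company_id]}/{task_name}.prefect"

# str.format resolves the nested dictionary lookups; unused context keys are ignored.
location = location_template.format(**context)
print(location)  # acme/42/extract.prefect
```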

    Joshua Weber

    03/31/2022, 7:25 PM
    A co-worker filed this issue: https://github.com/PrefectHQ/ui/issues/1261. Is this an existing feature we’re missing?

    Ievgenii Martynenko

    03/31/2022, 7:30 PM
    Hi, I'm experimenting with the best practices described at https://docs.prefect.io/core/concepts/best-practices.html#writing-prefect-code, and I don't understand why the following code doesn't build the expected execution flow. It should be a single top-to-bottom DAG, but instead I get a mess. The tasks are initialized imperatively outside the Flow; inside the Flow they are called, most of them have no output, and dependencies are set via .set_upstream.
    with Flow("Test ETL Flow") as flow:
        start_task_result = start_task()
        truncate_task_result = truncate_task()
        fetch_task_result = fetch_task()
        update_template_result = update_template_task(variables=fetch_task_result)
        merge_task_result = merge_task()
        complete_task_result = complete_task()
    
        truncate_task.set_upstream(start_task_result)
        fetch_task.set_upstream(truncate_task_result)
        merge_task.set_upstream(update_template_result)
        complete_task.set_upstream(merge_task_result)
    
    flow.visualize()

Kevin Kho

03/31/2022, 7:34 PM
I think the mess here is because of this:
truncate_task.set_upstream(start_task_result)
Instead you want:
truncate_task_result.set_upstream(start_task_result)
so that the upstream is set on the instance.
truncate_task.set_upstream(start_task_result) refers to the task, while truncate_task_result is the instance of the task.
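Why the original version produces extra nodes: in Prefect 1.x, calling a task inside a `Flow` block adds a copy of it to the flow and returns that copy, while calling `set_upstream` on the original task object adds the original to the flow as an additional node. The stdlib sketch below models just that bookkeeping with hypothetical `MiniTask`/`MiniFlow` classes, to show why the edge has to be attached to the returned instance.

```python
import copy


class MiniFlow:
    def __init__(self):
        self.tasks = []  # every node in the graph
        self.edges = []  # (upstream, downstream) pairs

    def add(self, task):
        if task not in self.tasks:
            self.tasks.append(task)


class MiniTask:
    def __init__(self, name, flow):
        self.name = name
        self.flow = flow

    def __call__(self):
        # Calling a task registers and returns a copy, mirroring Prefect 1.x.
        instance = copy.copy(self)
        self.flow.add(instance)
        return instance

    def set_upstream(self, upstream):
        # Whatever object this is called on becomes a node in the flow.
        self.flow.add(self)
        self.flow.add(upstream)
        self.flow.edges.append((upstream, self))


def build(use_instances):
    flow = MiniFlow()
    start, truncate = MiniTask("start", flow), MiniTask("truncate", flow)
    start_result = start()
    truncate_result = truncate()
    target = truncate_result if use_instances else truncate
    target.set_upstream(start_result)
    return flow


wrong, right = build(False), build(True)
print(len(wrong.tasks), len(right.tasks))  # 3 2
```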

Ievgenii Martynenko

03/31/2022, 7:37 PM
[a "not bad" meme face should go here] Thanks.

Kevin Kho

03/31/2022, 7:38 PM
Nice!