prefect-community
  • j

    James Phoenix

    08/14/2022, 9:03 PM
    prefect deployment build base_flow.py:basic_flow --name dev --tag dev --storage-block gcs/dev
  • j

    James Phoenix

    08/14/2022, 9:04 PM
Shouldn’t it be gs:// instead of gcs://?
    a
    1 reply · 2 participants
  • j

    James Phoenix

    08/14/2022, 9:21 PM
@property
def basepath(self) -> str:
    return f"gcs://{self.bucket_path}"

@property
def filesystem(self) -> RemoteFileSystem:
    settings = {}
    if self.service_account_info:
        try:
            settings["token"] = json.loads(
                self.service_account_info.get_secret_value()
            )
        except json.JSONDecodeError:
            raise ValueError(
                "Unable to load provided service_account_info. Please make sure that the provided value is a valid JSON string."
            )
    remote_file_system = RemoteFileSystem(
        basepath=f"gcs://{self.bucket_path}", settings=settings
    )
    return remote_file_system
  • j

    James Phoenix

    08/14/2022, 9:21 PM
    https://github.com/PrefectHQ/prefect/blob/main/src/prefect/filesystems.py
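For what it's worth, gcsfs registers both prefixes as aliases for the same filesystem, which appears to be why a gcs:// basepath works even though Google's own tooling uses gs://. A quick check, assuming gcsfs is installed:

import fsspec

# gcsfs registers both "gs" and "gcs" as protocols, so either
# prefix resolves to the same GCSFileSystem implementation
print(type(fsspec.filesystem("gs")))   # <class 'gcsfs.core.GCSFileSystem'>
print(type(fsspec.filesystem("gcs")))  # <class 'gcsfs.core.GCSFileSystem'>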
  • l

    Luke Segars

    08/15/2022, 2:45 AM
Hi all - I'm having trouble passing env variables into my Kubernetes jobs in Prefect v1.2 (so that they're available while my code is running). I'm running jobs in multiple Kubernetes clusters and need to provide some cluster-specific variables to each flow run. Based on some other posts above, I've tried starting my agent in three different ways - passing --env at agent startup seems like a great solution, but the env variable is null for all three of these approaches:
1. passing as an arg in prefect agent kubernetes start --env SERVICES_ENDPOINT=XYZ
2. passing directly as an env var to the agent (don't think this is expected to work)
3. passing as PREFECT___CLOUD____AGENT__ENV_VARS_
Any thoughts on what I could be doing wrong? I tried each of these individually.
    b
    4 replies · 2 participants
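In Prefect 1.x, cluster-specific values can also travel with the flow instead of the agent - a minimal sketch using the run config (flow name is a placeholder; the value is taken from the message above):

from prefect import Flow
from prefect.run_configs import KubernetesRun

with Flow("my-flow") as flow:
    ...

# env vars set on the run config are injected into the Kubernetes
# job created for each run of this flow
flow.run_config = KubernetesRun(env={"SERVICES_ENDPOINT": "XYZ"})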
  • i

    Iuliia Volkova

    08/15/2022, 3:07 AM
@Anna Geller Prefect 2.0 hi, can you help me with deployments? I'm not sure I understand the idea. I have a Prefect cluster in K8s and the IP address of the Prefect server, and I want to deploy flows from my local machine to the Prefect cluster. 1) As in the docs, I set PREFECT_URI to my Prefect server - ok. 2) I run prefect deployment build - it builds a bunch of files for the deployment, with paths on my local machine. 3) I run prefect deployment apply and I get the deployment on the Prefect server, but all paths reference my local env, and the cluster of course doesn't have access to it. Question: is there any way to deploy flows from a local env to a remote Prefect server? If yes, can you help me figure out what I'm doing wrong? 4) If no, what is the expected flow? That the command line runs on the remote cluster, and I need to write a script to copy the flows to the remote server and then apply all these prefect commands there?
    👋 1
    a
    10 replies · 2 participants
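The usual pattern for this situation is remote storage: if prefect deployment build uploads the flow to a storage block that the cluster can also reach, the deployment's paths stop pointing at the local machine. A sketch mirroring the CLI used elsewhere in this channel (flow file and block name are hypothetical; the block must be created first):

prefect deployment build flows/etl.py:etl --name k8s-etl \
    --storage-block s3/shared-bucket
prefect deployment apply etl-deployment.yaml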
  • b

    Beizhen

    08/15/2022, 7:48 AM
Hi community, how do I fix this ClientError message? All upstream tasks succeeded. Setup: prefect 1.2.2, Prefect Cloud.
    Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID f0614b56-3c3b-41f6-9481-878c9f4bee78: provided a running state but associated flow run 8647f325-a45d-4124-9995-b68d43faaf15 is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
    Traceback (most recent call last):
      File "/root/miniconda3/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 91, in call_runner_target_handlers
        state = self.client.set_task_run_state(
      File "/root/miniconda3/lib/python3.8/site-packages/prefect/client/client.py", line 1604, in set_task_run_state
        result = self.graphql(
      File "/root/miniconda3/lib/python3.8/site-packages/prefect/client/client.py", line 464, in graphql
        raise ClientError(result["errors"])
    prefect.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID f0614b56-3c3b-41f6-9481-878c9f4bee78: provided a running state but associated flow run 8647f325-a45d-4124-9995-b68d43faaf15 is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    ✅ 1
    b
    9 replies · 2 participants
  • s

    Sven Aoki

    08/15/2022, 8:22 AM
Hello, has anyone run a separate docker image in a flow? My prefect deployment goes as:
prefect deployment build fx_run.py:fx_rates_flow -n fx_deploy --infra docker-container -sb remote-file-system/minio
and then I also created a custom docker image with s3fs installed, which I pull during deployment. However I keep getting the error No such file or directory: 'docker'. Can anyone advise?
    a
    7 replies · 2 participants
  • c

    Chris L.

    08/15/2022, 8:32 AM
Hello there, a question about Prefect 2.0 and flows using the Dask runner. I posted it over at data-tricks-and-tips but this place seems to be more active - hoping somebody can help with this! https://prefect-community.slack.com/archives/C03CY667GGM/p1660137898917299 It relates to getting the Prefect task's Dask worker client from inside the task. Using the worker_client context manager works as expected in Prefect 1.0 (see https://discourse.prefect.io/t/how-to-use-dask-without-mapping-in-prefect-1-0-using-das[…]-client-to-call-client-submit-inside-a-prefect-task/470/13) but this behavior doesn't seem to carry over to 2.0.
    a
    1 reply · 2 participants
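For reference, this is the shape of the 1.0 pattern ported naively to 2.0 - a sketch assuming prefect-dask is installed, i.e. the kind of code the question reports as no longer behaving the same:

from distributed import worker_client
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def fan_out(n: int) -> int:
    # inside a Dask worker, worker_client() exposes the cluster's
    # client so the task can submit its own fine-grained work
    with worker_client() as client:
        futures = client.map(lambda x: x * 2, range(n))
        return sum(client.gather(futures))

@flow(task_runner=DaskTaskRunner())
def my_flow() -> int:
    return fan_out.submit(10).result()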
  • h

    Hawkar Mahmod

    08/15/2022, 8:53 AM
    Hey folks, Is anyone else having issues with Prefect Cloud UI. I have black panels on the display that weren’t there before, and contents in certain places not showing up. Tried different browsers and it doesn’t appear to help.
    ✅ 1
    a
    3 replies · 2 participants
  • d

    Dekel R

    08/15/2022, 1:51 PM
Hey everyone, I have a Prefect (1) flow that uses a GPU (Nvidia T4). I get this really odd behavior - I register the flow using Python 3.9, and I run it on a Python 3.8 container (tensorflow/tensorflow:latest-gpu). When running on a machine without a GPU, everything works. When running on a machine with a GPU, I get this error -
    Task 'upload_data_to_bq_task': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/usr/local/lib/python3.8/dist-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/Users/dekelr/PycharmProjects/similarity-filter-layer/prefect_tasks/upload_data_to_bq.py", line 24, in upload_data_to_bq_task
    SystemError: unknown opcode
The code fails when running this specific row (batch_size is either an int or None):
if batch_size is None:
Everything works fine when I change this row to "if not batch_size:". After some troubleshooting I found this thread - https://github.com/PrefectHQ/prefect/issues/3635 Running the same flow with the original row of
if batch_size is None:
still doesn't work when registering with Python 3.8 (same Python version as in the container). Can you please explain this really odd behavior? Thanks
    j
    6 replies · 2 participants
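If it helps, the linked issue points at pickle-based flow storage: cloudpickle embeds compiled bytecode, which is not portable across interpreter minor versions, and running 3.9 bytecode under 3.8 is a classic way to get SystemError: unknown opcode. A small illustration of where the version-sensitive payload comes from:

import sys
import cloudpickle

# cloudpickle serializes functions defined in __main__ by value,
# embedding their compiled bytecode; loading that payload under a
# different minor Python version can fail with "unknown opcode"
def check(batch_size=None):
    return batch_size is None

payload = cloudpickle.dumps(check)
print(sys.version_info[:2], len(payload))  # versions must match at load time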
  • d

    dammy arinde

    08/15/2022, 2:05 PM
Hello, we tried to install the redshift connector into Docker, and even though the install was successful, every time we run the script using the redshift connection, prefect dev returns a module-not-found error.
    j
    e
    2 replies · 3 participants
  • o

    Oscar Björhn

    08/15/2022, 2:22 PM
I'm having some trouble running deployments that override a flow's default parameters - has anyone bumped into the same issue? • I have a flow that takes a parameter, and the parameter has a default value (let's call it False). • Before running deployment apply, I modify the deployment.yaml file so that the parameter has the value True instead. • When viewing the deployment in the GUI, it correctly states that the parameter's default value is True (based on what was in deployment.yaml). • However, when I run the deployment and log the parameter, the flow run reports the parameter value as being False. I have tried a few different data types to see if the behavior changes, such as bool and str, but it makes no difference. I'm using Prefect Cloud and I'm on 2.0.4 with a fresh venv.
    ✅ 1
    k
    14 replies · 2 participants
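For context, the override under discussion lives in the parameters section of deployment.yaml - an excerpt of the shape involved (my_flag is a placeholder name); the report is that this value shows up in the UI but does not reach the actual flow run:

# deployment.yaml (excerpt)
parameters:
  my_flag: true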
  • s

    Sam Garvis

    08/15/2022, 3:15 PM
    I am getting the above error when trying to run a flow in Prefect 2.0. I'm in namespace dev-prefect in k8s. The flow logs in the Prefect UI are completely blank. Do I need to make a custom service account in k8s?
    j
    b
    +2
    9 replies · 5 participants
  • h

    Hamza Naanani

    08/15/2022, 3:24 PM
Hello, can we configure the Orion database for Prefect Cloud, or is it already managed by Prefect Cloud? I also want to ask about the sensitivity of the data stored there, as there will be task results stored in the database. How does that work?
    j
    1 reply · 2 participants
  • j

    James Brady

    08/15/2022, 4:58 PM
I'm using Prefect 2 to build a data pipeline and have a couple of general design questions: • I assume we should be thinking about holding data in S3 and passing it between flows by reference, rather than as parameters? • If so, should I use S3 blocks to store intermediate results, or interact with S3 directly? What are the trade-offs? • I'm planning on using Task.map to parallelise work (using dask); there's no equivalent for Flows, so I guess we should be thinking of parallelisation only happening within flows - is that right?
    a
    e
    7 replies · 3 participants
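A minimal sketch of the pass-by-reference idea with an S3 block (block name hypothetical and pre-created; requires s3fs). Whether to use the block or talk to S3 directly mostly trades portability and credential handling against fine-grained control:

from prefect import flow, task
from prefect.filesystems import S3

s3 = S3.load("intermediate-results")  # hypothetical, pre-created block

@task
def produce() -> str:
    key = "runs/2022-08-15/data.csv"
    s3.write_path(key, b"a,b\n1,2\n")
    return key  # hand downstream tasks a reference, not the payload

@task
def consume(key: str) -> bytes:
    return s3.read_path(key)

@flow
def pipeline():
    consume(produce())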
  • l

    Lana Dann

    08/15/2022, 5:57 PM
hello! Does anyone have a graphql query to cancel all running instances of a specific flow? I'm on prefect 1.0 if that makes a difference.
    ✅ 1
    b
    1 reply · 2 participants
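A sketch of the 1.x GraphQL approach (flow name is a placeholder; the exact schema can vary between Server and Cloud versions): query the running runs first, then cancel each id.

query {
  flow_run(
    where: {
      flow: { name: { _eq: "my_flow_name" } }
      state: { _eq: "Running" }
    }
  ) {
    id
  }
}

mutation {
  cancel_flow_run(input: { flow_run_id: "<id-from-query>" }) {
    state
  }
}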
  • p

    Pedro Machado

    08/15/2022, 8:04 PM
    Hi. Any plans to update this terraform module for 2.0? https://github.com/aws-ia/terraform-prefect-agent-ec2
    g
    4 replies · 2 participants
  • j

    Jai P

    08/15/2022, 8:55 PM
👋 Are there any plans for prefect 2.0 to hook into common APM tooling (e.g. Datadog, OpenTelemetry)? It would be great if I could get all my tasks instrumented and reporting into Datadog APM so I could, for example, see a distributed trace between a prefect flow/task and the various services it interacts with. I know Datadog has a ddtrace library that is maybe a starting point?
    a
    2 replies · 2 participants
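Nothing built in at the time, as far as I know, but ddtrace's manual instrumentation can already wrap task bodies - a rough sketch, assuming ddtrace is installed and a Datadog agent is reachable (span and resource names are up to you):

from ddtrace import tracer
from prefect import flow, task

@task
def transform(x: int) -> int:
    # manual span around the task body
    with tracer.trace("prefect.task", resource="transform"):
        return x * 2

@flow
def pipeline():
    transform(1)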
  • d

    datamongus

    08/15/2022, 11:25 PM
Greetings, I am trying to install the old prefect-gcp equivalent for Prefect 1.0 to execute BigQueryTasks; prefect-gcp seems to only refer to Prefect 2.0.
    a
    m
    +1
    12 replies · 4 participants
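For Prefect 1.x, the BigQuery tasks ship inside the core package (the gcp extra), not the prefect-gcp collection - roughly:

# Prefect 1.x: pip install "prefect[gcp]"
from prefect import Flow
from prefect.tasks.gcp.bigquery import BigQueryTask

bq = BigQueryTask(query="SELECT 1")

with Flow("bq-example") as flow:
    result = bq()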
  • j

    James Brady

    08/16/2022, 5:24 AM
Do tasks need to be in the same file as the flow which uses them 🤔? When I had a larger task in a separate module, I got PicklingErrors (stack trace in thread) - if I move the task definition into my main module it works as expected.
    o
    a
    5 replies · 3 participants
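They don't have to live in the same file, but with a Dask-backed runner the task's module must be importable on every worker so the function can be pickled by reference rather than by value - a sketch of a layout that usually avoids the PicklingError (file names hypothetical):

# tasks.py - a module on PYTHONPATH for the flow process and all workers
from prefect import task

@task
def heavy(x: int) -> int:
    return x * x

# main.py
from prefect import flow
from tasks import heavy  # imported, not defined in __main__,
                         # so it is pickled by reference

@flow
def main():
    heavy.submit(2)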
  • e

    emo loic

    08/16/2022, 9:20 AM
Hello. If I want to run flows in a local subprocess, do I have to specify the environment in which Prefect agents will execute the tasks? Let's suppose my flow requires some dependencies that I have already installed in a Python environment. When the agents pick up the flow, where do I specify the environment they need in order to run the flow?
    o
    a
    10 replies · 3 participants
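With local subprocess infrastructure, the flow run inherits the agent's own environment, so the usual approach is to start the agent from the virtualenv that already holds the dependencies - a sketch (env and queue names hypothetical):

source my-env/bin/activate        # env with the flow's dependencies
pip install -r requirements.txt   # whatever the flows need
prefect agent start <work-queue>  # queue name or id, depending on version;
                                  # flow runs execute in this same env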
  • a

    Andreas Nigg

    08/16/2022, 10:18 AM
Hey, I just upgraded from prefect 2.0.0 to 2.0.4 and re-created my deployments. For one of my old deployments, I had manually added a schedule definition, and I renamed the file to mydeployment_old.yaml. Then I ran prefect deployment build myflow.py:myflow --name mydeployment -t mytag -sb gcs/gcs-prefect-stprage -o mydeployment.yaml and, like magic, the schedule definition block was automatically added to mydeployment.yaml. This is, by the way, awesome - but... I don't know why it happened 😅 So, my question is, I guess: how does prefect deployment build know which deployment-yaml settings to apply? Did it simply look in the mydeployment_old.yaml file in the same folder and figure "oh well, there is a schedule block in there, so I guess he'll need it in the new deployment as well"? (To get my intention right: this feature is awesome, I'd just like to understand how exactly it works, so that I can use it as intended.)
    a
    o
    8 replies · 3 participants
  • o

    Oscar Björhn

    08/16/2022, 10:38 AM
We actually got our Prefect 2 flows working in our production environment yesterday! 🎉 However, today's big daily scheduled orchestrator job has a problem I haven't seen during my 2.5 weeks of development and testing: two of the created flow runs do not get picked up by the agent - they're stuck in the work queue. I just tried to start another deployment and now there are 3 in the queue. All the previous flows were picked up correctly, and they all have the same tags. Has anyone run into a similar problem? I'm not sure what other information I should include... The agent is still running, and there are no concurrency limits set for this work queue.
    👏 1
    🎉 1
    🙌 1
    a
    31 replies · 2 participants
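When runs pile up like this, the work-queue CLI can at least show whether the queue itself would hand them out - a couple of sanity checks (the id is a placeholder):

prefect work-queue ls                  # queue ids and concurrency limits
prefect work-queue preview <queue-id>  # upcoming runs the queue would serve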
  • m

    Marcin Grzybowski

    08/16/2022, 11:26 AM
Hi all, I have a problem with prefect-snowflake and/or ConcurrentTaskRunner and/or asyncio: I'm trying to run ~10 Snowflake queries simultaneously (example code in thread), and only 4, 5, or 6 are picked up - the remaining queries only run when some previous query finishes. It looks like the threads do not "go to sleep" as intended while they wait for Snowflake results, and because of that the other tasks are not picked up.
    b
    7 replies · 2 participants
  • p

    Parwez Noori

    08/16/2022, 1:45 PM
Hi everyone! I have the following issue: I cannot seem to set the secret API key correctly inside the agent. I have verified the secret with this:
kubectl get secret api-key --namespace prefect2 -o jsonpath='{.data.prefect_api_key}' | base64 --decode
This is done in Azure, following Christopher Boyd's video:

https://www.youtube.com/watch?v=m9FYoOIfWMc&ab_channel=Prefect

    ✅ 1
    c
    b
    20 replies · 3 participants
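For reference, wiring that secret into the agent container typically looks like this excerpt of the agent's Deployment manifest, reusing the secret name and key from the kubectl command above:

# agent Deployment manifest (excerpt)
env:
  - name: PREFECT_API_KEY
    valueFrom:
      secretKeyRef:
        name: api-key
        key: prefect_api_key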
  • a

    Andreas Nigg

    08/16/2022, 1:50 PM
Hey. I've an issue with using the prefect-airbyte collection (v0.1.0) with prefect 2.0.4 (not specific to 2.0.4 - it also occurs with any prefect 2 version). If this is the wrong channel to ask about collections, let me know. I've a very simple flow which triggers one or many airbyte connections; the flow is almost 1:1 from the doc samples. If I run the flow triggering one airbyte connection id, wait until the flow is done, and then wait for 10 (or so) more seconds before I start another flow run, everything is fine. However, if I run the flow twice in parallel (no matter whether it's with the same airbyte connection id or a different one), I get a TaskCanceled exception (see thread) - but the airbyte connection still gets triggered and finishes successfully. So the flow triggering airbyte works, but the flow gets canceled seconds after the airbyte connection is triggered. As both airbyte connections get triggered (and both finish successfully), I'm starting by asking the question here - but I can imagine that this might also be a problem on the airbyte side... Any hints on how to circumvent this? Edit: the same result whether I run the flow locally or use a dedicated agent to run it.
    a
    a
    +2
    11 replies · 5 participants
  • t

    Tim Enders

    08/16/2022, 1:59 PM
    Is it recommended to use a Postgres backend when using Dask? I am getting the following errors with the default SQLite implementation.
    sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) unable to open database file
    ✅ 1
    m
    a
    4 replies · 3 participants
m

Marcin Grzybowski

08/16/2022, 2:19 PM
I could run tasks with DaskTaskRunner with sqlite (prefect2)
a

Anna Geller

08/16/2022, 2:28 PM
"Is it recommended to use a Postgres backend when using Dask?"
IMO yes, since you will most likely come across issues with concurrent writes, which won't work with SQLite. Postgres or just using Cloud - both work: https://app.prefect.cloud/
👍 1
t

Tim Enders

08/16/2022, 2:37 PM
I need to set up a local testing environment; otherwise we will be using Cloud for production and staging. Thank you
The DB is happy since I switched to Postgres. Thank you!
🙌 1
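For a local Postgres setup, pointing Orion at the database is a single setting - a sketch with placeholder credentials:

prefect config set PREFECT_ORION_DATABASE_CONNECTION_URL="postgresql+asyncpg://user:password@localhost:5432/prefect"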