Powered by Linen
prefect-community
  •

    Vladimir Bolshakov

    03/18/2022, 8:23 AM
    Hi! I'm trying to implement a custom retry policy in Prefect Orion (2.0b2). But as far as I can see, policies in Orion are currently hardcoded (CoreFlowPolicy/CoreTaskPolicy, MinimalFlowPolicy/MinimalTaskPolicy), and the server API handlers for setting states don't accept any request parameters describing custom policies (the task policy and flow policy aren't part of the requests; they're just dependency-injection parameters initialised with `provide_task_policy`/`provide_flow_policy`). So my question is about the orchestration settings that will be released in the future. Will custom orchestration policies be parameters of the API requests that set task/flow state? Or will the orchestration concepts and APIs change significantly in the near future? How will orchestration policies be serialized/deserialized between the server and the agents' engines?
    11 replies · 3 participants
  •

    Clovis

    03/18/2022, 10:24 AM
    Hi everyone 👋! I've been using Prefect for a few months now, but I keep running into the same issue with my Airbyte tasks: for some reason, my Airbyte connection fails at some point and reports a `Failed` status, yet Prefect considers the task successful (see the screenshot in the attachment). This is a blocker for me, because it means I can't rely on Prefect and have to double-check with Airbyte every time. Maybe the issue comes from my code, but I don't see why.
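    from datetime import timedelta
    
    from prefect import Flow
    from prefect.run_configs import UniversalRun
    from prefect.tasks.airbyte.airbyte import AirbyteConnectionTask
    
    # Retry a failed sync up to 3 times, 10 s apart, and time out after 30 min
    sync_airbyte_connection = AirbyteConnectionTask(
        max_retries=3, retry_delay=timedelta(seconds=10), timeout=timedelta(minutes=30),
    )
    
    with Flow("my flow", run_config=UniversalRun()) as flow:
        airbyte_sync = sync_airbyte_connection(
            <connection_infos>,  # connection details elided in the original post
        )
        [...]
    
    # The Airbyte sync's final state determines the flow run's final state
    flow.set_reference_tasks([
        airbyte_sync
    ])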
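Prefect 1.x marks a task run as Failed when the task raises an exception (or a FAIL signal); if the Airbyte task returns normally while the sync itself reported a failure, the flow will still look successful. One possible workaround is a downstream check that inspects the returned result and raises. This is a hypothetical sketch: the `job_status` key and the `"succeeded"` value are assumptions about what the Airbyte task returns, so compare them against your actual task output before relying on it.

```python
# Hypothetical helper: the result shape ("job_status" key) is an assumption,
# not a documented contract of AirbyteConnectionTask.
from typing import Any, Dict


class AirbyteSyncFailed(Exception):
    """Raised when an Airbyte sync result does not report success."""


def ensure_sync_succeeded(result: Dict[str, Any]) -> Dict[str, Any]:
    status = str(result.get("job_status", "")).lower()
    if status != "succeeded":
        # Raising is what would put a wrapping Prefect task into a Failed state.
        raise AirbyteSyncFailed(f"Airbyte job finished with status {status!r}")
    return result
```

In a flow, this function could be wrapped with `@task`, called on `airbyte_sync`, and used as the reference task instead of the sync itself.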
    10 replies · 2 participants
  •

    Malthe Karbo

    03/18/2022, 1:28 PM
    Hi, if anyone is experiencing issues with EKS and/or Kubernetes on Dask using Prefect: I found that the latest release (14h ago) of kubernetes-asyncio (22.6) broke all my flows that worked yesterday (it fails to pick up the service account for RBAC and sends all requests as 'system/anonymous'). Pinning 'kubernetes-asyncio<22.6' in my requirements fixed it for me. Issue: https://github.com/PrefectHQ/prefect/issues/5573
    👍 1
    3 replies · 3 participants
  •

    Vadym Dytyniak

    03/18/2022, 2:46 PM
    Hey. In a test, I would like to assert that create_flow_run was called with the expected parameters. Is it possible to get the task result somehow?
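One general way to assert call arguments without touching a backend is `unittest.mock`: hand the code under test a mock in place of the real call, then inspect what it was called with. The sketch below uses a hypothetical stand-in `create_flow_run` and a hypothetical `trigger_child_flow` helper that accepts the launcher as an argument; how this maps onto Prefect's own `create_flow_run` task in your flow is an assumption you would need to adapt.

```python
from unittest import mock


def create_flow_run(flow_name, project_name, parameters=None):
    """Hypothetical stand-in for the real call that would hit the backend."""
    raise RuntimeError("should not be called in unit tests")


def trigger_child_flow(run_date, create=create_flow_run):
    # The launcher is injectable, so a test can pass a mock instead.
    return create(
        flow_name="child-flow",
        project_name="demo",
        parameters={"run_date": run_date},
    )


def test_trigger_child_flow_passes_expected_parameters():
    fake_create = mock.Mock(return_value="fake-flow-run-id")
    result = trigger_child_flow("2022-03-18", create=fake_create)

    # Fails with a readable diff if any argument differs.
    fake_create.assert_called_once_with(
        flow_name="child-flow",
        project_name="demo",
        parameters={"run_date": "2022-03-18"},
    )
    assert result == "fake-flow-run-id"
```

pytest would collect the `test_...` function as-is; `mock.patch` is the alternative when injecting the dependency isn't practical.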
    2 replies · 2 participants
  •

    Adam Roderick

    03/18/2022, 3:41 PM
    We've started seeing an error when accessing secrets from Prefect Cloud. Any ideas how I can troubleshoot this?
    prefect.exceptions.ClientError: [{'path': ['secret_value'], 'message': 'An unknown error occurred.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    10 replies · 3 participants
  •

    Chris Reuter

    03/18/2022, 4:32 PM
    We've laughed, we've cried, but most of all we've 🚀 launched! Come join @Jeremiah, @Chris White and myself as we recap Launch Week 2022 at 3p Eastern on YouTube. You might find yourself hearing a few more exciting announcements 😏 https://prefect-community.slack.com/archives/C036FRC4KMW/p1647621014192929
    3 replies · 2 participants
  •

    Aram Panasenco

    03/18/2022, 5:16 PM
    I'd like to set up a testing framework for testing Prefect flows. The idea is that when someone modifies a Prefect flow in a pull request, an automated DevOps process can launch that flow with certain parameters and somehow test that the flow did what it was supposed to do. I couldn't find anything in the documentation or in Slack. Is there an official testing framework for Prefect flows? If not, what do you all use?
    4 replies · 3 participants
  •

    Wei Mei

    03/18/2022, 6:05 PM
    Hi, I'm making my first pass at using GitHub Actions for CI and have hit something I don't fully understand. On pull_request of a feature/** branch, I run actions that:
    1. check out the branch
    2. create a project in Prefect Cloud
    3. register flow.py with --label dev
    When I run the flow manually, I get an error:
    Unexpected error while running flow: KeyError('Task slug connect_source-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?')
    26 replies · 3 participants
  •

    Chris Reuter

    03/18/2022, 6:55 PM
    See you all in 5 mins on a Fireside Chat: https://www.youtube.com/watch?v=uIv1m3-2tjA

    🔥 3
  •

    Jean-Michel Provencher

    03/18/2022, 7:05 PM
    Hi, do you know if it's possible to pass `upstream_tasks` with methods that actually require parameters? The documentation isn't very clear on how to chain multiple upstream_tasks to create dependencies between them, and I was wondering if any of you had more complex examples. For example, I don't think I can do this:
    with Flow(f"{environment_prefix}-test", storage=S3(bucket=storage_location_bucket_name)) as flow:
    
    
        dbt_run(organization_id_param, data_processing_start_date_param, data_processing_end_date_param, should_process_last_period, period, period_value,
                upstream_tasks=[pull_snowflake_secret(a,b), pull_repo(b,c)])
    2 replies · 3 participants
  •

    Tony Liberato

    03/18/2022, 7:44 PM
    Does anyone have a complete working example of how to use PrefectResult? It's simple: scrape a website for a link; if the link is different from the Prefect result, download it and save the new link as the result. Otherwise, exit with "success: no work to be done". I don't want this to be a local result, because we plan to move these flows off a Windows agent and onto a Docker agent. Thanks in advance!
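The compare-and-download decision is independent of where the previous link is persisted, so it can be written and tested on its own. A minimal sketch, assuming a key-value store (a plain dict here) stands in for whatever backend-persisted storage ends up holding the last link; `check_for_new_link` and the `download` callback are hypothetical names, and wiring this into PrefectResult is not shown.

```python
from typing import Callable, MutableMapping


def check_for_new_link(
    current_link: str,
    store: MutableMapping[str, str],
    download: Callable[[str], None],
    key: str = "latest_link",
) -> str:
    """Download only when the scraped link differs from the stored one."""
    previous = store.get(key)
    if previous == current_link:
        return "success: no work to be done"
    download(current_link)
    # Persist the new link only after the download has succeeded.
    store[key] = current_link
    return f"downloaded {current_link}"
```

Saving the link after the download means a failed download is retried on the next run instead of being silently skipped.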
    23 replies · 4 participants
  •

    Michał Augoff

    03/18/2022, 8:12 PM
    Is there an equivalent of `IMAGE_PULL_SECRETS` for the Orion k8s agent/flow runner? I couldn't find anything in the docs or the code.
    5 replies · 2 participants
  •

    Darshan

    03/18/2022, 9:06 PM
    Hello, for Prefect 2.0: if I want to add the flow run ID to all log statements, do I have to override the format of every logger defined in the default logging config, or is there a way to apply this change by overriding only one parent logger?
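In the standard library's `logging` module, formats belong to handlers rather than loggers, and records from child loggers propagate up to their parents' handlers by default. So, assuming the relevant loggers propagate to a common parent, one filter plus one formatter on that parent's handler can cover all of them. The sketch below is plain `logging`; the `prefect_demo` logger names and the `flow_run_id` attribute are illustrative, and whether Prefect 2.0's default config routes everything through a single handler is an assumption to check.

```python
import io
import logging


class FlowRunIdFilter(logging.Filter):
    """Attach a flow_run_id attribute to every record the handler sees."""

    def __init__(self, flow_run_id: str) -> None:
        super().__init__()
        self.flow_run_id = flow_run_id

    def filter(self, record: logging.LogRecord) -> bool:
        record.flow_run_id = self.flow_run_id
        return True


def configure_parent_logger(stream, flow_run_id: str) -> logging.Logger:
    parent = logging.getLogger("prefect_demo")  # hypothetical parent logger name
    parent.setLevel(logging.INFO)
    handler = logging.StreamHandler(stream)
    # Filter on the handler, not the logger: logger-level filters are skipped
    # for records propagated up from child loggers, handler filters are not.
    handler.addFilter(FlowRunIdFilter(flow_run_id))
    handler.setFormatter(
        logging.Formatter("%(flow_run_id)s | %(name)s | %(levelname)s | %(message)s")
    )
    parent.addHandler(handler)
    return parent


buffer = io.StringIO()
configure_parent_logger(buffer, "run-123")
logging.getLogger("prefect_demo.flow_runs").info("hello")
```

Child loggers such as `prefect_demo.flow_runs` need no configuration of their own; they inherit the level and reach the parent's handler through propagation.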
    5 replies · 2 participants
  •

    Tabari Brannon

    03/18/2022, 9:35 PM
    Hi all! I'm a new developer and new to Prefect. We're currently trying to implement Prefect for our ETLs at my job. We use VMware and Windows Server, and we found out the hard way that running Prefect with nested virtualization doesn't work well. So now we're moving to Prefect Cloud, using local agents. Will this work? Is it necessary to run multiple agents for multiple flows, or can one agent run multiple flows?
    17 replies · 2 participants
  •

    Milton

    03/18/2022, 10:01 PM
    Hi, in Prefect 0.14.11, `prefect execute flow-run` downloads the necessary local dependencies from the repo when using GitLab storage, but this behavior seems to be different in 1.0.0 at least… could you remind me when this was changed? Also, does this mean we have to build a Docker image for each repo we have? :(
    83 replies · 3 participants
  •

    Michał Augoff

    03/18/2022, 10:28 PM
    Does configuring a flow with `KubernetesRun` restrict the flow to Kubernetes agents only, or can it still be picked up by any agent as long as the labels match? I was under this impression after reading the docs, but when I created one Docker agent and one k8s agent with the same set of labels, my Kubernetes flow got picked up by the Docker agent.
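As I understand Prefect 1.x label matching, an agent can pick up a flow run when the flow's labels are a subset of the agent's labels, and the run config type (`DockerRun`, `KubernetesRun`, ...) is not part of that check, which would explain a Docker agent grabbing a `KubernetesRun` flow. A toy model of that rule, not Prefect's code; `can_pick_up` is a hypothetical name.

```python
from typing import Iterable


def can_pick_up(agent_labels: Iterable[str], flow_labels: Iterable[str]) -> bool:
    """Toy model: an agent matches when it has every label the flow asks for.

    The run config type plays no part in this check.
    """
    return set(flow_labels) <= set(agent_labels)
```

Under that rule, giving the two agents distinct labels (for example a hypothetical `k8s` label on both the Kubernetes agent and the `KubernetesRun` flows) keeps the Docker agent from matching those flows.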
    6 replies · 2 participants
  •

    Royzac

    03/18/2022, 11:40 PM
    Has anyone used Great Expectations? How does it compare to other testing frameworks?
    7 replies · 4 participants
  •

    Serge Tarkovski

    03/19/2022, 12:34 PM
    Hi all! I followed the "Running flows in Docker" tutorial for 2.0 (`prefect==2.0b2`, Python 3.9, Linux):
    • created an S3 bucket and a storage (isn't that too much for a local run?)
    • started a local API server and configured PREFECT_API_URL
    • created a queue
    • started an agent for that queue
    • created a deployment with DockerFlowRunner and no parameters
    Then I ran the deployment, and the agent output wasn't very informative (see below). In the UI, the flow run "dag-sample/caped-antelope" is shown as failed, with no logs available. Can anyone explain?
    $ prefect agent start 6aa0c2b2-e895-4b8c-aed3-8bef5b2c88ab
    Starting agent connected to <http://127.0.0.1:4200/api>...
    
      ___ ___ ___ ___ ___ ___ _____     _   ___ ___ _  _ _____
     | _ \ _ \ __| __| __/ __|_   _|   /_\ / __| __| \| |_   _|
     |  _/   / _|| _|| _| (__  | |    / _ \ (_ | _|| .` | | |
     |_| |_|_\___|_| |___\___| |_|   /_/ \_\___|___|_|\_| |_|
    
    
    Agent started! Looking for work from queue '6aa0c2b2-e895-4b8c-aed3-8bef5b2c88ab'...
    13:29:31.446 | INFO    | prefect.agent - Submitting flow run '8eb8aa0b-82da-4081-9a42-aae1f22b0525'
    /home/tarkovskyi/miniconda3/envs/prefect_exp39/lib/python3.9/site-packages/prefect/flow_runners.py:697: UserWarning: `host.docker.internal` could not be automatically resolved to your local ip address. This feature is not supported on Docker Engine v19.3.15, upgrade to v20.10.0+ if you encounter issues.
      warnings.warn(
    13:29:32.215 | INFO    | prefect.flow_runner.docker - Flow run 'caped-antelope' has container settings = {'image': 'prefecthq/prefect:2.0b2-python3.9', 'network': None, 'network_mode': 'host', 'command': ['python', '-m', 'prefect.engine', '8eb8aa0b-82da-4081-9a42-aae1f22b0525'], 'environment': {'PREFECT_API_URL': '<http://127.0.0.1:4200/api>'}, 'auto_remove': False, 'labels': {'io.prefect.flow-run-id': '8eb8aa0b-82da-4081-9a42-aae1f22b0525'}, 'extra_hosts': {}, 'name': 'caped-antelope', 'volumes': []}
    13:29:33.547 | INFO    | prefect.agent - Completed submission of flow run '8eb8aa0b-82da-4081-9a42-aae1f22b0525'
    13:29:33.584 | INFO    | prefect.flow_runner.docker - Flow run container 'caped-antelope' has status 'running'
    13:29:44.923 | INFO    | prefect.flow_runner.docker - Flow run container 'caped-antelope' has status 'exited'
    4 replies · 2 participants
  •

    Taylor Harless

    03/19/2022, 5:29 PM
    I'm struggling to implement the GCP_CREDENTIALS default secret feature in a local Prefect flow.
    • I've set an environment variable PREFECT__CONTEXT__SECRETS_GCP_CREDENTIALS pointing to the full local path of the GCP credentials .json downloaded from the GCP project to which I'd like to upload some files.
    • I've run the following:
    from prefect import Flow
    from prefect.tasks.gcp.storage import GCSUpload
    
    with Flow("google-cloud-test") as flow:
    
        GCSUpload(bucket="test-upload", create_bucket=True)(
            data="test-file.csv", credentials="GCP_CREDENTIALS"
        )
    
    flow.run()
    and received an error:
    AttributeError: 'str' object has no attribute 'keys'
    I've incorporated the feedback from a similar error discussion recently, but I can't figure out what the issue is. Any help is much appreciated.
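The error suggests a plain string reached code that expected a mapping. As far as I recall the Prefect 1.x task docs, `GCSUpload`'s `credentials` argument is a dict holding the service-account JSON contents, not the name of a secret, so passing `"GCP_CREDENTIALS"` hands it a `str`. A minimal sketch of turning the key file into that dict, assuming a standard service-account key file; `load_service_account_info` is a hypothetical helper, and the commented usage is not executed here.

```python
import json
from pathlib import Path
from typing import Any, Dict


def load_service_account_info(path: str) -> Dict[str, Any]:
    """Read a GCP service-account key file into a dict."""
    info = json.loads(Path(path).read_text())
    if not isinstance(info, dict):
        raise ValueError(f"{path} does not contain a JSON object")
    # Service-account key files carry "type": "service_account";
    # anything else hints that the wrong file was picked up.
    if info.get("type") != "service_account":
        raise ValueError(f"{path} does not look like a service-account key file")
    return info


# Hypothetical usage inside the flow:
# GCSUpload(bucket="test-upload", create_bucket=True)(
#     data="test-file.csv",
#     credentials=load_service_account_info("/path/to/key.json"),
# )
```

The other route is to leave `credentials` out and let the task look up the GCP_CREDENTIALS secret from context; as far as I know, that secret's value then also needs to be the JSON contents rather than a file path.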
    10 replies · 3 participants
  •

    Omar Sultan

    03/20/2022, 9:27 AM
    Hi everyone, we have a Prefect Server running on Kubernetes, set up using the Helm chart. Everything runs smoothly, but occasionally we get this error:
    File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 341, in _raise_timeout
        self, url, "Read timed out. (read timeout=%s)" % timeout_value
    urllib3.exceptions.ReadTimeoutError: HTTPConnectionPool(host='prefect-apollo.prefect', port=4200): Read timed out. (read timeout=15)
    This happens especially when we use the StartFlowRun task. It doesn't happen very often, but I was wondering if there's a way to force a retry, or if anyone knows why this would be happening? Thanks
    :discourse: 1
    14 replies · 2 participants
  •

    Tomer Cagan

    03/20/2022, 12:48 PM
    Is there a way to get logs from code running within Dask? I know that for "general" use of Dask, the logs have to be pulled from the execution environment, such as k8s (as noted here). When I run code that uses `worker_client` and then submits new tasks to the same cluster (code inside), I see that the printout from the first call of the function is logged in Prefect and shipped to the server, but subsequent calls are not (see log inside). Is there some trick I can use?
    17 replies · 3 participants
  •

    Trip Humphrey

    03/21/2022, 3:15 AM
    Is there an example template of what a properly architected git repo looks like for a Prefect project? I'm having trouble conceptualizing the development workflow. I'm imagining a "production" branch that at any given moment contains a portfolio of all the relevant flows for a given project. Changes (i.e. merges) to the production branch would trigger incremental registrations of the modified flows, driven by some sort of backend "build" job triggered by the successful merge. Other things I'm considering: a way for proposed changes to production flows to go through automated testing before being merged to production, and ideally a way for developers to test their flows in development without sullying the production environment. Bonus points for being able to write the tasks and flows so that they don't require editing between environments. Is there a recognized reference architecture for such a build strategy?
    👀 1
    6 replies · 4 participants
  •

    Madhup Sukoon

    03/21/2022, 4:28 AM
    Hi, I want to de-register some flows programmatically based on certain conditions. Is it possible to:
    1. get a list of all registered flows in Python (i.e. the Python equivalent of `prefect get flows`), and
    2. de-register some of these flows through Python?
    2 replies · 2 participants
  •

    Ryan Sattler

    03/21/2022, 5:51 AM
    Hi - is there an explanation of what exactly Workspaces in Prefect Cloud 2.0 do/are for? The docs I could find say how to create one but not really why you’d want to.
    3 replies · 3 participants
  •

    moti shakuri

    03/21/2022, 11:04 AM
    Hey everyone, we're looking to move our ETL flow from Celery alone to Prefect, and I came across this discussion on GitHub: https://github.com/PrefectHQ/prefect/issues/1689 We'd like the easiest possible transition into Prefect with minimal changes, but I couldn't find a way to use a CeleryExecutor the way there is one for Dask. Am I missing something? Is it supported, or should I implement it on my own? Thank you 🙂
    2 replies · 2 participants
  •

    Andreas Nord

    03/21/2022, 1:32 PM
    Hi! What is the equivalent of a JSON Prefect secret in Cloud if I want to define it as a local secret (.prefect/config.toml)?
    4 replies · 2 participants
  •

    Azer Rustamov

    03/21/2022, 2:05 PM
    Hi! How do I mount AWS credentials to `KubernetesFlowRunner`?
    4 replies · 2 participants
  •

    Jared Robbins

    03/21/2022, 2:33 PM
    Lots of good fixes in 2.0b2
    :thank-you: 3
    :marvin: 4
  •

    Jared Robbins

    03/21/2022, 2:44 PM
    https://orion-docs.prefect.io/concepts/tasks/#configuring-concurrency-limits This page says to use `prefect concurrency_limit`; however, the actual command is `prefect concurrency-limit`.
    🙏 2
    2 replies · 3 participants
  •

    Jared Robbins

    03/21/2022, 3:10 PM
    Also, is it possible to limit concurrency at the deployment level? I'm implementing a job that I don't want running twice at the same time. It seems tags in a DeploymentSpec don't work for concurrency limits. For now, I might move the tag to the main flow.
    8 replies · 3 participants

Kevin Kho

03/21/2022, 3:26 PM
Tags should work on the deployment. I’ll try this in a bit

Anna Geller

03/21/2022, 3:48 PM
@Jared Robbins there are two levels of concurrency limits.
#1 Currently, using the `prefect concurrency-limit` command, you can set task-level concurrency limits.
#2 The alternative to task-level concurrency limits is work-queue concurrency limits, which allow you to set limits at the deployment level. This seems to be exactly what you want! To set that up, you can create a work queue for a specific deployment UUID:
    prefect work-queue create -d 'uuid' test_queue
This will return a WORK_QUEUE_ID, which you can use to set that queue's concurrency limit to 1:
    prefect work-queue set-concurrency-limit WORK_QUEUE_ID 1
More on that: https://orion-docs.prefect.io/concepts/work-queues/
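The effect of a work-queue concurrency limit of 1 can be pictured with a toy model: runs from the queue are only handed out while fewer than `limit` runs from that queue are in progress, so a second scheduled run waits until the first finishes. This illustrates the idea under that assumption and is not Prefect's implementation; `runs_to_submit` is a hypothetical name.

```python
from typing import List, Sequence


def runs_to_submit(scheduled: Sequence[str], in_progress: int, limit: int) -> List[str]:
    """Toy model of a work-queue concurrency limit.

    Returns the scheduled runs that may start now, given how many runs
    from the same queue are already in progress.
    """
    open_slots = max(limit - in_progress, 0)
    return list(scheduled[:open_slots])
```

With one queue per deployment and a limit of 1, that deployment never has two runs in progress at once; the cost, as raised in the next reply, is a work queue and agent per job.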

Jared Robbins

03/21/2022, 3:51 PM
I suppose that will work... I'll need a work queue and agent for every job, though.
@Kevin Kho Was that possible? I think it would be a really useful feature. I'm sure I'm not the only one who doesn't want the same job to run twice.

Kevin Kho

03/21/2022, 4:40 PM
That I know for sure will be a built-in feature, where the current run will just wait for the previous run to finish. It just hasn't been developed yet.

Jared Robbins

03/21/2022, 5:20 PM
Dang. I'll keep an eye on the changelogs. I'll put this on the back burner for now.