prefect-community
  • s

    Suresh R

    07/11/2022, 10:07 AM
    Hi! When I register the same flow (stored in Git) from a different system, the version gets bumped, but when I register from the same system it gets skipped. Why does this happen, and do we have an option to check what changed that creates a new version?
    ✅ 1
    1 reply · 2 participants
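    For reference, a minimal sketch of the Prefect 1.x idempotency-key pattern that controls when registration bumps the version; the flow and project names are illustrative. Registration skips the version bump when the key is unchanged, and serialized_hash() covers flow metadata such as storage, which can legitimately differ between machines.
    from prefect import Flow, task

    @task
    def say_hello():
        print("hello")

    with Flow("versioned-flow") as flow:
        say_hello()

    # a new version is created only when the serialized metadata (and hence
    # the hash) changes; storage details are part of that metadata
    flow.register(
        project_name="my-project",
        idempotency_key=flow.serialized_hash(),
    )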
  • i

    Ievgenii Martynenko

    07/11/2022, 10:14 AM
    Hello, is there an option not to store versions in Prefect at all? We have Git versioning instead.
    ✅ 1
    7 replies · 2 participants
  • i

    Ievgenii Martynenko

    07/11/2022, 10:30 AM
    Hello, also a few questions about retention and maintenance (non-Cloud): 1) Which tables in the DB do you recommend maintaining? Are there any purge plans? 2) Can any of the services in K8s be scaled? I.e. we have agent, hasura, apollo, graphql, towel, and ui, each with a single pod in K8s. What if we add one more graphql pod, for example? 3) Can we control the job run history retention period?
    ✅ 1
    1 reply · 2 participants
  • b

    Bogdan Serban

    07/11/2022, 10:51 AM
    Hello everyone! I am building a flow where inference with a PyTorch model is mapped over a series of images read from disk. Currently, I am loading the model within a dedicated task and passing it as an unmapped argument to the downstream map call. I am receiving a warning about the size of the ML model being shared in the task graph (more details in the thread). Is it a problem that I am receiving that warning? And is there a better way to share the ML model across the tasks?
    ✅ 1
    12 replies · 2 participants
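    For context, the pattern described above looks roughly like this in Prefect 1.x; the task bodies, checkpoint path, and image list are placeholders, not the poster's actual code.
    from prefect import Flow, task, unmapped

    @task
    def load_model():
        import torch
        return torch.load("model.pt")  # illustrative checkpoint path

    @task
    def list_images():
        return ["img_0.png", "img_1.png"]  # stand-in for the real disk scan

    @task
    def predict(image_path, model):
        return image_path  # stand-in for the real inference call

    with Flow("inference") as flow:
        model = load_model()  # loaded once, in its own task
        images = list_images()
        predictions = predict.map(images, model=unmapped(model))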
  • m

    Mathieu Cayssol

    07/11/2022, 11:20 AM
    Hey guys, I'm totally new to Prefect. I'm trying to use it, but right after pip install prefect in my conda environment I get a module import error.
    ✅ 1
    2 replies · 2 participants
  • j

    Jan

    07/11/2022, 11:56 AM
    Hi community, I seem to be completely misunderstanding how results work within flows. My Python code below does not print "just a string" to my prompt. If I inspect the vars, I can dissect it down to mijn_flow.result, which gives me a non-iterable object.
    from prefect import flow

    @flow
    def mijn_flow():
        return "just a string"

    print(mijn_flow())
    The output is:
    13:54:51.255 | INFO | prefect.engine - Created flow run 'brainy-parakeet' for flow 'mijn-flow'
    13:54:51.271 | INFO | Flow run 'brainy-parakeet' - Using task runner 'ConcurrentTaskRunner'
    13:54:51.302 | WARNING | Flow run 'brainy-parakeet' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
    13:54:51.443 | INFO | Flow run 'brainy-parakeet' - Finished in state Completed()
    Completed()
    Can you point me in the right direction? I'm trying to access the return object of my flow (which is a string in this case).
    ✅ 1
    8 replies · 2 participants
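    For context, in the 2.0 betas calling a flow returns its final State rather than the raw return value, so the string has to be extracted with .result(); a minimal sketch against that beta API:
    from prefect import flow

    @flow
    def mijn_flow():
        return "just a string"

    state = mijn_flow()    # returns a State such as Completed(), not the string
    print(state.result())  # prints: just a string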
  • a

    Alexandru Anghel

    07/11/2022, 12:58 PM
    Hello, I am trying to run flows on a private Kubernetes cluster using the Dask executor and GCS storage. I am coming from a GKE test cluster where I was able to run flows using this approach. The catch is that the private cluster runs behind a corporate proxy. I've set the HTTPS_PROXY env variable inside the KubernetesRun job template, and Prefect is able to download the flow metadata from GCS. The problem is that the same pod creates the Dask cluster, and that fails with this error:
    RuntimeError(f"Cluster failed to start: {e}") from e RuntimeError: Cluster failed to start: 503, message='Service Unavailable', url=URL('<http://proxy-ip-here>:proxy-port-here')
    Any ideas on how to fix this? I've tried adding a NO_PROXY env variable alongside HTTPS_PROXY, but it doesn't work. I am using Prefect 1.2. Thanks!
    👀 1
    3 replies · 2 participants
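    For reference, a sketch of wiring both proxy variables through the flow's run config in Prefect 1.x; the NO_PROXY entries are illustrative, the point being to exclude in-cluster addresses (such as the Dask scheduler service) from the proxy:
    from prefect import Flow, task
    from prefect.run_configs import KubernetesRun

    @task
    def noop():
        pass

    with Flow("proxied-flow") as flow:
        noop()

    flow.run_config = KubernetesRun(
        env={
            "HTTPS_PROXY": "http://proxy-ip-here:proxy-port-here",
            "NO_PROXY": "localhost,127.0.0.1,.svc,.cluster.local",
        }
    )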
  • b

    Black Spy

    07/11/2022, 1:06 PM
    from prefect import Flow
    from prefect.tasks.prefect import StartFlowRun

    with Flow("hello-world") as hello_world_flow:
        task1 = hello_task()
        task2 = test(task1)

    with Flow("Multiple Flow") as multiple_flow:
        db_table = create_table()
        raw = get_complete_date()
        parsed = parse_complaint_data(raw)
        populated_table = store_complaints(parsed)
        populated_table.set_upstream(db_table)

    data_engineering_flow = StartFlowRun(flow_name="hello-world", project_name='ETL - Prefect', wait=True)
    data_science_flow = StartFlowRun(flow_name="Multiple Flow", project_name='ETL - Prefect', wait=True)

    with Flow("main-flow") as flow:
        result = data_science_flow(upstream_tasks=[data_engineering_flow])

    flow.run()
    flow.register(project_name="ETL - Prefect")
    I am facing some issues with this flow: after executing the code we can see the flow in the UI, but we can't see the task results, and it's taking a lot of time to process. Can anyone help me out with this?
    5 replies · 2 participants
  • b

    Binoy Shah

    07/11/2022, 1:42 PM
    Background. Existing infrastructure:
    • We have full-fledged Kubernetes clusters running on top of AWS/EKS.
    • We have stable Jenkins CI/CD pipelines to build and deploy Docker images and Helm charts.
    • We support multiple environments, separated at the namespace level.
    • Our observability is via New Relic.
    • Our credential stores are wired very well into Helm charts via custom charts and annotations.
    • Our data warehouse is Snowflake, and the data sources are a plethora of DBs and API services.
    • ELT is carried out by Meltano + dbt or Python + Celery.
    • Everything runs on Kubernetes.
    I am a future user of a workflow engine and am asking around for evaluations; I have constructed this so far. I have to confess I spent more time in the #dagster-* channels than in other communities, and it shows in the recommendations chart below in light of our existing infra setup. But I wanted to add more “fairness” to the evaluation ratings, and I’d highly appreciate some constructive feedback from the community on where I can improve this rating.
    👀 1
    11 replies · 3 participants
  • f

    Florian Guily

    07/11/2022, 1:55 PM
    hey, I have a flow that always gets re-registered even when there is no change to the code. Any idea why this is happening?
    4 replies · 2 participants
  • j

    Joshua Greenhalgh

    07/11/2022, 2:06 PM
    @Kevin Kho Thanks for all your help! Good luck with whatever you are doing next!
    :upvote: 1
    1 reply · 2 participants
  • o

    Octopus

    07/11/2022, 2:14 PM
    [v1.2.1] Hi, I would like to run multiple sub-flows from a task (e.g. for each ref I would like to run each action: "read", "write", "delete"). I have the parent flow, which holds the basic data, and the child flow, which executes an action on a ref. With my code I can only trigger 3 of the 9 subflows. I think it's because I use StartFlowRun (I get the same behavior with create_flow_run), because if I call a plain task instead of StartFlowRun, all 9 subflows get executed.
    from prefect import Flow, Parameter, task, unmapped
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
    from prefect.executors import LocalDaskExecutor
    
    from prefect.tasks.prefect import StartFlowRun
    
    import time
    from datetime import timedelta
    
    # @task
    # def wait_and_succeed(ref, action_id):
    #     time.sleep(10)
    
    #     print(f"children task success for ref {ref} and action {action_id}")
    
    #     if action_id == "write":
    #         print(f"[SUCCESS] {ref} Second level reached !!!")
    #     if action_id == "delete":
    #         print(f"[SUCCESS] {ref} Third level reached !!!")
    
    @task
    def call_children_flow(ref):
        print(f"{ref} ref")
    
        actions_id = ["read","write","delete"]
    
        for action_id in actions_id:
            start_flow_run = StartFlowRun(flow_name="Generic Children flow")
            print(f"start_flow_run {start_flow_run}")
    
            child_id = start_flow_run.run(parameters={
                    "reference": ref,
                    "action_id": action_id,
            }, project_name="Playground")
    
            wait_for_flow_run.run(child_id)
    
    
    @task
    def run_action(action_id, ref):
        start_flow_run = StartFlowRun(flow_name="Generic Children flow")
    
        print(f"start_flow_run {start_flow_run}")
    
        child_id = start_flow_run.run(parameters={
                "reference": ref,
                "action_id": action_id,
        }, project_name="Playground")
    
        return child_id
    
    
    with Flow("Generic Parent flow") as parent_flow:
        fake_refs = ["ref1", "ref2", "ref3"]
        call_children_flow.map(fake_refs)
    
    if __name__ == "__main__":
        parent_flow.register(
            project_name="Playground"
        )
    
        parent_flow.executor = LocalDaskExecutor(num_workers=20)
    
        parent_flow.run()
    14 replies · 2 participants
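    For comparison, a sketch that maps create_flow_run directly over the full (ref, action) grid instead of looping inside a task, so every child run is a first-class mapped task; the flow and project names match the snippet above, the rest is illustrative:
    from prefect import Flow, unmapped
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    with Flow("Generic Parent flow") as parent_flow:
        # one mapped child per (ref, action) pair: 3 refs x 3 actions = 9 runs
        params = [
            {"reference": ref, "action_id": action}
            for ref in ["ref1", "ref2", "ref3"]
            for action in ["read", "write", "delete"]
        ]
        run_ids = create_flow_run.map(
            flow_name=unmapped("Generic Children flow"),
            project_name=unmapped("Playground"),
            parameters=params,
        )
        wait_for_flow_run.map(run_ids)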
  • i

    Ievgenii Martynenko

    07/11/2022, 2:16 PM
    Has anyone noticed how many DB sessions Prefect 1.0 holds at any point in time? I see it creating a session per request, leading to enormous numbers: 100-200+ active sessions.
    ✅ 1
    4 replies · 3 participants
  • a

    Abin Joseph

    07/11/2022, 2:22 PM
    When will the stable release of Prefect 2 be?
    5 replies · 2 participants
  • m

    Madison Schott

    07/11/2022, 2:40 PM
    Hi all, for some reason my pipeline has been failing for a few days, but I never received an alert about it in our Slack channel. Any ideas why that would be? I want to make sure this doesn't happen going forward.
    7 replies · 2 participants
  • j

    Jehan Abduljabbar

    07/11/2022, 3:37 PM
    What is the docker build command for the Dockerfile at https://github.com/PrefectHQ/prefect/blob/master/Dockerfile?
    4 replies · 2 participants
  • m

    Mars

    07/11/2022, 4:06 PM
    Hi all, is the Prefect Context object intended to be used for all pipeline configuration, including bespoke settings like custom API endpoints, environment names, and such? The examples in the Context concept docs don’t make it clear whether the context object is just for the Prefect framework’s configuration or for my pipeline’s custom configuration too.
    1 reply · 2 participants
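    For what it's worth, user-defined keys can live alongside the framework's own context values in Prefect 1.x; a small sketch of reading and injecting a custom key (the key name and URLs are illustrative):
    import prefect
    from prefect import Flow, task

    @task
    def show_endpoint():
        # falls back to a default when the key is absent from context
        print(prefect.context.get("api_endpoint", "https://prod.example.com"))

    with Flow("ctx-demo") as flow:
        show_endpoint()

    # custom values can also come from the [context] section of config.toml
    with prefect.context(api_endpoint="https://staging.example.com"):
        flow.run()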
  • a

    Amol Shirke

    07/11/2022, 5:38 PM
    Hello, I have Prefect Server behind a load balancer interface. Deployments are created successfully with the CLI, but they don't show up in the UI. Here is how I am running Prefect Orion (Prefect version 2.0b8):
    $ prefect orion start --host 0.0.0.0 --port 8065
    $ prefect config set PREFECT_API_URL=http://0.0.0.0:8065/api
    To access that host, the address looks like http://hostname:some_random_port. The UI loads, but the deployments page is blank, while the CLI shows them. I also tried setting the config to the hostname with the random port, but that didn't work. Any thoughts? Thanks
    8 replies · 2 participants
  • m

    Michał Augoff

    07/11/2022, 6:00 PM
    hi all, a 2.0 question: is it possible to set certain k8s job properties at the agent level (e.g. service account, namespace), as is possible in 1.0, or does everything need to be configured via the Deployment’s KubernetesFlowRunner properties?
    5 replies · 3 participants
  • j

    Joe Goldbeck

    07/11/2022, 7:15 PM
    Hi all: using Prefect 1.x, is it possible to have a timeout/expiration for late flow runs? We run an export job every 15 minutes, so if our agent goes down, late runs can accumulate rather quickly. When the agent comes back up, we do not want to go back and run all the missed scheduled runs; we just want to pick back up from wherever we are.
    17 replies · 3 participants
  • c

    Connor Parish

    07/11/2022, 7:43 PM
    Hi all, I am trying to run a sub-flow on a different Docker image than my main flow and pass data back from the sub-flow to the main flow, using prefect 2.0b7-python3.8. I'm trying to orchestrate the images through DeploymentSpecs with DockerFlowRunners as the flow runners. Currently the sub-flow runs on the same image as the main flow unless it is deployed independently. Would greatly appreciate any ideas or insights into current feasibility!
    ✅ 1
    4 replies · 3 participants
  • v

    Vaikath Job

    07/11/2022, 8:29 PM
    Hi, I'm trying to attach storage to my flows (GitLab with an on-prem host). Prefect Server is hosted on an on-prem K8s cluster. I get an error when trying to use an OAuth token with the Secrets API; the config.toml is located on my local machine and has a [context.secrets] section with my token stored in a variable named GITLAB="<OAuth Token>".
  • f

    flurven

    07/11/2022, 8:34 PM
    Getting an error when importing the library prefect_gcp on Prefect 2.0: NameError: name 'SecretManagerServiceClient' is not defined.
    6 replies · 3 participants
  • v

    Vaikath Job

    07/11/2022, 8:36 PM
    Hi, I'm trying to attach storage to my flows (GitLab with an on-prem host). Prefect Server (1.x) is hosted on an on-prem K8s cluster. I get the following error
    Failed to load and execute flow run: ValueError('Local Secret "<prefect.client.secrets.Secret object at 0x00000221D3B7B100>" was not found.')
    when trying to use an OAuth token with the Secrets API. The config.toml is located on my local machine and has this section:
    [context.secrets]
    GITLAB="<OAuth Token>"
    The code that registers the flow is similar to this:
    secret = Secret("GITLAB")
    flow.storage = GitLab(host="path/to/host", repo="repo/address", path="flow/sample_flow.py", access_token_secret=secret)
    flow.register(project_name="test-project-name")
    I assume this is happening because the config.toml is not on the K8s cluster. If this is the case, is there a way I can attach this storage to the flow without storing OAuth tokens on the cluster itself?
    3 replies · 2 participants
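    Judging from the error text, the storage received the Secret object's repr rather than a secret name: access_token_secret expects the name of the secret, which is then looked up wherever the flow actually runs. A sketch of that variant (host/repo/path copied from the snippet above):
    from prefect.storage import GitLab

    # `flow` as in the registration snippet above
    flow.storage = GitLab(
        host="path/to/host",
        repo="repo/address",
        path="flow/sample_flow.py",
        access_token_secret="GITLAB",  # name only; resolved at runtime
    )
    The runtime environment still needs the secret available, e.g. as a PREFECT__CONTEXT__SECRETS__GITLAB environment variable on the agent/job or as a server-side secret, since the local config.toml never leaves the registering machine.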
  • k

    Kevin Grismore

    07/11/2022, 9:05 PM
    Every time I run a flow, a weird cloudpickle-encoded JSON blob gets uploaded to my GCS flow storage bucket. 🤔 Is that supposed to happen? I thought my storage Block was just for deploying a flow or reading it from an Agent, but I probably misunderstood what else goes in there.
    ✅ 1
    4 replies · 2 participants
  • m

    Mars

    07/11/2022, 9:09 PM
    Hi, is there an easy way to use .env files to load secrets from os.environ after prefect module import time? I want to use a PrefectSecret instead of an EnvVarSecret in my code, and I don’t want to hack the code to switch between Prefect/EnvVar for local dev. Local context secrets should work well for overriding the PrefectSecret values, but it’s not working the way I expect. My debugger is telling me the context and secrets are set once, during import prefect, which means the secrets are fixed before I can load a dotenv file using my library of choice. The following pseudocode doesn’t work:
    import prefect
    import environs  # .env support. .env not loaded yet.
    
    with Flow() as flow:
      PrefectSecret("MY_SECRET")
    
    if __name__ == "__main__":
      # For local testing
      env = environs.Env()
      env.read_env(".env")  # Load my custom PREFECT__CONTEXT__SECRETS into os.environ
      flow.run()  # Ignores new os.environ
    ✅ 1
    1 reply · 2 participants
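    One workaround consistent with that observation is to load the .env file before prefect is ever imported, so the PREFECT__CONTEXT__SECRETS__* variables are already in os.environ when the config snapshot is taken; a sketch (the secret name is illustrative):
    import environs

    env = environs.Env()
    env.read_env(".env")  # must run before the prefect import below

    import prefect  # noqa: E402 -- deliberately imported after the dotenv load
    from prefect import Flow
    from prefect.tasks.secrets import PrefectSecret

    with Flow("secret-demo") as flow:
        PrefectSecret("MY_SECRET")

    if __name__ == "__main__":
        flow.run()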
  • l

    Laxman Singh Tomar

    07/12/2022, 5:01 AM
    Hello everyone. I have multiple microservices/projects (see the attached image) for use cases like Q&A generation, search, data ingestion, etc. If we wanted to give devs the ability to combine these individual components and stitch them together into a service, would Prefect be of help here?
    3 replies · 2 participants
  • a

    Andreas Nigg

    07/12/2022, 8:41 AM
    Hi all, I've encountered a Prefect 2.0 (Cloud) problem: I have a simple flow with a single task, which looks as follows:
    @task(name="get_subscriptions", retries=2, retry_delay_seconds=5)
    def get_subscriptions(paper_code, logger: Logger):
        response = requests.get("my_url")
        return response
    The request itself works fine if I run it manually. However, as soon as I use Prefect 2.0 (with Prefect 2.0 Cloud) to run the flow/task, I run into the following exception. The GET request in the task takes about 1 minute and 10 seconds to return. The exception itself is not coming from the server or my client: I changed my requests.get() call in the task to an http.client request and still get the request exception below, so I have a strong feeling it's somehow related to Prefect. Exception summary: • requests.exceptions.ConnectionError: ('Connection aborted.', timeout('The write operation timed out')) • followed by: 10:36:55.875 | ERROR | Flow run 'chocolate-starling' - Crash detected! Request to https://api-beta.prefect.io/api/accounts/bd169b15-9cf0-41df-9e46-2233ca3fcfba/workspaces/f507fe51-4c9f-400d-8861-ccfaf33b13e4/task_runs/29d89dc3-4d92-4c69-a143-44f164303819/set_state timed out. Exception details: see thread. Is there something wrong with how I use the requests module? Or is there a "hidden" timeout in Prefect when a Prefect-scheduled task runs for more than 1 minute? Edit: I currently run the flow only locally, by running "python name_of_script.py". Edit 2: I'm running the Python env in WSL2. Edit 3: I use GCS storage as my default storage; maybe this causes the problem? Edit 4: I was able to work around the issue by zipping the content of the response before returning it in my flow. So if I change my flow to the following, it works. To me it really looks as if the upload to GCS has a timeout of 1 minute, and the whole flow therefore breaks if the upload takes longer than that minute. I can live with this workaround for the moment; however, I'd be happy to know if my "theory" about GCS being the problem is correct.
    @task(name="get_subscriptions", retries=2, retry_delay_seconds=5)
    def get_subscriptions(paper_code, logger: Logger):
        response = requests.get("my_url")
        return zlib.compress(response.content)
    ✅ 1
    4 replies · 3 participants
  • e

    Emil Østergaard

    07/12/2022, 10:01 AM
    Hello, I'm having a problem with Prefect Cloud 2.0. We use the Kubernetes flow runner and a Dask task runner. On Friday (8/7/2022) I had a flow run that I wanted to abort. I attempted to use the delete functionality in the UI, thinking it would delete all resources related to the flow run, including the Kubernetes job etc. It did not remove the Kubernetes job, so I removed it manually. The issue is concurrency limits: the tasks launched by this flow have a tag with a concurrency limit, and it appears the task data associated with the deleted flow run was not removed from Prefect storage. For instance, if I try:
    prefect concurrency-limit inspect my-tag
    it shows a bunch of active task ids, even though nothing is running in K8s. This causes an unfortunate issue where new flow runs for this flow never start their tasks, because Prefect thinks the concurrency limit is hit due to these zombie tasks. However, I cannot find a way to manually clean up these task ids, which means this flow is dead. Any help is appreciated!
    ✅ 1
    6 replies · 3 participants
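    If the tag can tolerate being reset, one escape hatch is to delete and recreate the limit via the same CLI shown above, which should also drop the recorded active task ids (the limit value here is illustrative):
    prefect concurrency-limit delete my-tag
    prefect concurrency-limit create my-tag 10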
  • s

    Slackbot

    07/12/2022, 10:16 AM
    This message was deleted.