• Vaikath Job

    2 months ago
    Hi, I'm trying to attach storage to my flows (GitLab with an on-prem host). Prefect Server (1.x) is hosted on an on-prem K8s cluster. I get the following error
    Failed to load and execute flow run: ValueError('Local Secret "<prefect.client.secrets.Secret object at 0x00000221D3B7B100>" was not found.')
    when trying to use an OAuth token with the Secrets API, i.e. the config.toml is located on my local machine and has a section:
    [context.secrets]
    GITLAB="<OAuth Token>"
    The code that registers the flow is similar to this:
    from prefect.client import Secret
    from prefect.storage import GitLab

    secret = Secret("GITLAB")
    flow.storage = GitLab(host="path/to/host", repo="repo/address", path="flow/sample_flow.py", access_token_secret=secret)
    flow.register(project_name="test-project-name")
    I assume this is happening because the config.toml is not on the K8s cluster. If this is the case, is there a way I can attach this storage to the flow without storing OAuth tokens on the cluster itself?
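    A likely fix, as a sketch only: in Prefect 1.x, GitLab storage's access_token_secret parameter expects the *name* of the secret as a string, not a Secret object; the error message shows the object's repr being looked up as a secret name. The agent then resolves that name at runtime, so on Kubernetes the token can be supplied as a PREFECT__CONTEXT__SECRETS__GITLAB environment variable on the agent or job rather than in a config.toml on the cluster:
    from prefect import Flow
    from prefect.storage import GitLab

    flow = Flow("sample-flow")  # placeholder flow, for illustration only
    flow.storage = GitLab(
        host="path/to/host",
        repo="repo/address",
        path="flow/sample_flow.py",
        access_token_secret="GITLAB",  # the secret's name, not Secret("GITLAB")
    )
    flow.register(project_name="test-project-name")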
    Vaikath Job, Kevin Kho · 3 replies
  • Kevin Grismore

    2 months ago
    Every time I run a flow, a weird cloudpickle-encoded JSON blob gets uploaded to my GCS flow storage bucket. 🤔 Is that supposed to happen? I thought my storage Block was just for when I'm deploying a flow or when an Agent reads it, but I probably misunderstood what else goes in there.
    Kevin Grismore, Kevin Kho · 4 replies
  • Mars

    2 months ago
    Hi, is there an easy way to use .env files to load secrets from os.environ after prefect module import time? I want to use a PrefectSecret instead of an EnvVarSecret in my code, and I don't want to hack the code between Prefect/EnvVar for local dev. Local context secrets should work well for overriding the PrefectSecret values, but it's not working the way I expect. My debugger is telling me the context and secrets are set once, during import prefect, which means the secrets are fixed before I can load a dotenv file using my library of choice. The following pseudocode doesn't work:
    import prefect
    from prefect import Flow
    from prefect.tasks.secrets import PrefectSecret
    import environs  # .env support. .env not loaded yet.

    with Flow("example") as flow:
        PrefectSecret("MY_SECRET")

    if __name__ == "__main__":
        # For local testing
        env = environs.Env()
        env.read_env(".env")  # Load my custom PREFECT__CONTEXT__SECRETS into os.environ
        flow.run()  # Ignores new os.environ
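    One workaround sketch, assuming Prefect 1.x builds its config (including context.secrets) from os.environ at import time, as the debugger observation above suggests: load the dotenv file before the first import prefect, so the PREFECT__CONTEXT__SECRETS__* values are already present when the config is parsed.
    # Load .env *before* importing prefect, so PREFECT__CONTEXT__SECRETS__*
    # values are in os.environ when prefect builds its config at import time.
    import environs

    env = environs.Env()
    env.read_env(".env")

    import prefect  # noqa: E402 -- deliberately imported after the dotenv load
    from prefect import Flow
    from prefect.tasks.secrets import PrefectSecret

    with Flow("example") as flow:
        PrefectSecret("MY_SECRET")

    if __name__ == "__main__":
        flow.run()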
    Mars · 1 reply
  • Laxman Singh Tomar

    2 months ago
    Hello everyone. I have multiple microservices/projects (see the attached image) for use cases like Q&A generation, Search, Data Ingestion, etc. If we were to give devs the ability to stitch these individual components together into a service, would Prefect be of help here?
    Laxman Singh Tomar, Anna Geller · 3 replies
  • Andreas Nigg

    2 months ago
    Hi all, I've encountered a Prefect 2.0 (Cloud) problem: I have a simple flow with a single task, which looks as follows:
    from logging import Logger

    import requests
    from prefect import task

    @task(name="get_subscriptions", retries=2, retry_delay_seconds=5)
    def get_subscriptions(paper_code, logger: Logger):
        response = requests.get("my_url")
        return response
    The request itself works fine if I run it manually. However, as soon as I use Prefect 2.0 (with Prefect 2.0 Cloud) to run the flow/task, I run into the following exception. The GET request in the task takes about 1 minute and 10 seconds to return. The exception itself is not coming from the server or my client: I changed my requests.get() call in the task to an http.client request and still get the request exception below, so I have a strong feeling it's somehow related to Prefect.
    Exception summary:
    • requests.exceptions.ConnectionError: ('Connection aborted.', timeout('The write operation timed out'))
    followed by:
    10:36:55.875 | ERROR | Flow run 'chocolate-starling' - Crash detected! Request to https://api-beta.prefect.io/api/accounts/bd169b15-9cf0-41df-9e46-2233ca3fcfba/workspaces/f507fe51-4c9f-400d-8861-ccfaf33b13e4/task_runs/29d89dc3-4d92-4c69-a143-44f164303819/set_state timed out.
    Exception details: see in thread.
    Is there something wrong in how I use the requests module? Or is there a "hidden" timeout in Prefect when a Prefect-scheduled task runs for more than 1 minute?
    Edit: I currently run the flow only locally by running "python name_of_script.py"
    Edit 2: I'm running the Python env in WSL2.
    Edit 3: I use GCS storage as my default storage. Maybe this causes the problem?
    Edit 4: I was able to work around the issue by zipping the content of the response before returning it in my flow. So if I change my flow to the following, it works. To me it really looks as if the upload to GCS has a timeout of 1 minute, and the whole flow breaks if the upload takes longer than that. I can live with this workaround for the moment, but I'd be happy to know whether my "theory" about GCS being the problem is correct.
    import zlib
    from logging import Logger

    import requests
    from prefect import task

    @task(name="get_subscriptions", retries=2, retry_delay_seconds=5)
    def get_subscriptions(paper_code, logger: Logger):
        response = requests.get("my_url")
        return zlib.compress(response.content)  # compress before the result is persisted
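    For completeness, any downstream consumer of this task's result has to reverse the compression; a minimal sketch (parse_subscriptions is a hypothetical name, and UTF-8 text content is assumed):
    import zlib

    def parse_subscriptions(compressed: bytes) -> str:
        # Reverse the zlib.compress() applied in get_subscriptions.
        return zlib.decompress(compressed).decode("utf-8")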
    Andreas Nigg, Anna Geller · 4 replies
  • Emil Østergaard

    2 months ago
    Hello, I have problems with Prefect Cloud 2.0. We use the Kubernetes flow runner and a Dask task runner. On Friday (8 July 2022), I had a flow run which I wanted to abort. I attempted to use the delete functionality in the UI, thinking it would delete all resources related to the flow run, including the Kubernetes job etc. It did not remove the Kubernetes job, so I removed it manually. The issue is concurrency limits: the tasks launched by this flow have a tag with a concurrency limit, and it appears the task data associated with the deleted flow run was not removed from Prefect storage. For instance, if I try:
    prefect concurrency-limit inspect my-tag
    it shows a bunch of active task ids, even though nothing is running in k8s. This causes an unfortunate issue where any new flow runs for this flow will never start tasks, because Prefect thinks the concurrency limit is hit due to these zombie tasks. However, I cannot find a way to manually clean up these task ids, which means this flow is dead. Any help is appreciated!
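    One possible reset, assuming the active slots are tracked per tag and dropped when the limit is recreated (the limit value 10 is a placeholder for the original setting):
    prefect concurrency-limit delete my-tag
    prefect concurrency-limit create my-tag 10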
    Emil Østergaard, Anna Geller, +1 · 6 replies
  • iñigo

    2 months ago
    Hello, is it possible to have a global view of all scripts? In the calendar view you only see the schedules for one of them, but since I have scripts that run once a day, I'd love to see them chronologically to see how they are executing.
    iñigo, Anna Geller · 2 replies
  • iñigo

    2 months ago
    Hello again, I was wondering whether it is possible, and whether it is good practice, to create a flow that runs other flows inside it. For example, we have 3 flows that gather some data every night; what I do is schedule every flow independently and space them apart from one another. Would it be interesting to create a flow that manages all 3 of these flows, so I only have to schedule one? Thanks!
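    This pattern is often called a flow-of-flows. A minimal sketch, assuming Prefect 1.x and three already-registered child flows (the flow and project names are hypothetical); StartFlowRun with wait=True blocks until each child finishes, so the chain runs the three nightly flows in sequence under a single schedule:
    from prefect import Flow
    from prefect.tasks.prefect import StartFlowRun

    # One StartFlowRun task per registered child flow (names are hypothetical).
    run_a = StartFlowRun(flow_name="gather-a", project_name="nightly", wait=True)
    run_b = StartFlowRun(flow_name="gather-b", project_name="nightly", wait=True)
    run_c = StartFlowRun(flow_name="gather-c", project_name="nightly", wait=True)

    with Flow("nightly-orchestrator") as flow:
        a = run_a()
        b = run_b(upstream_tasks=[a])  # start b only after a finishes
        c = run_c(upstream_tasks=[b])  # start c only after b finishes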
    iñigo · 1 reply
  • JK

    2 months ago
    Hello, is there a way to get a message inserted into a queue upon the flow run's success or failure, without using a task? Using a task is susceptible to infra issues (pre-emptible cloud compute nodes, etc.) and hence may miss firing. Is there a way to configure each flow to send its status to AWS SQS or GCP Pub/Sub? Or, if not, is there an HTTP API to poll the status of all of the flows in a tenant account? Thanks!
    JK, Anna Geller · 6 replies