Powered by Linen
prefect-server
  • Stef van Dijk

    06/02/2021, 10:12 AM
    Hello! I'm using Prefect Cloud and I like its features. I'm struggling with some security issues at the moment. Is it possible to restrict an agent to executing only flows that use local storage? Let's say I have a machine that should never execute harmful flows. Can I make sure that if someone accidentally obtains my Prefect credentials, that person can never make the agent on my machine execute a flow that they just registered using my credentials? I want to make this restriction at the agent level, because otherwise it seems to me that a hacker could always register a flow living in some GitHub repo and make the agent execute it. Restricting the agent to only execute flows with local storage would prevent this from happening. I hope my question is clear, thanks in advance!
    2 replies · 2 participants
  • Snehotosh

    06/02/2021, 11:45 AM
    Can anyone please clarify which open-source license Prefect Server uses? Is there any licensing impact of using it in production?
    2 replies · 3 participants
  • Mckelle

    06/02/2021, 9:13 PM
    Hi all. I've been working on setting up Prefect Server in GCP and wanted to see if anyone had successfully secured it behind Identity-Aware Proxy. I've set it up on a VM and created a load balancer that directs traffic from my domain to port 8080 on the VM to access the UI. Initially, when connecting GraphQL (port 4200) to the UI, I tried to just set it to the VM's IP address:4200, but that was denied because it is HTTP while the domain is HTTPS, so Chrome throws an error saying it's not secure. To fix that error, I made a second backend service on the load balancer pointing to port 4200 and created a subdomain for that port. I set the subdomain as my Prefect Server GraphQL endpoint in the UI, and that resolved the first issue. However, as soon as I turn on Identity-Aware Proxy for both the UI and the GraphQL backend services, I get a CORS error.
    Access to manifest at 'https://accounts.google.com/o/oauth2/v2/auth?…' (redirected from 'maindomain.com') from origin 'maindomain.com' has been blocked by CORS policy: No 'Access-Control-Allow-Origin' header is present on the requested resource.
    Does anyone have any experience with this or anything similar?
    4 replies · 3 participants
  • Chohang Ng

    06/03/2021, 1:56 AM
    flow_1_flow = StartFlowRun(flow_name='flow_1', project_name='tester', wait=True)
    flow_2_flow = StartFlowRun(flow_name='flow_2', project_name='tester', wait=True)
    flow_3_flow = StartFlowRun(flow_name='flow_3', project_name='tester', wait=True)
    flow_4_flow = StartFlowRun(flow_name='flow_4', project_name='tester', wait=True)
    flow_5_flow = StartFlowRun(flow_name='flow_5', project_name='tester', wait=True)
    
    
    with Flow("main-flow", schedule=weekday_schedule, executor=LocalExecutor(),
              run_config=LocalRun()) as flow:
        flow_3 = flow_3_flow()
        flow_1_flow().set_upstream(flow_2_flow())
        step_2 = flow_4_flow(upstream_tasks=[flow_1_flow(), flow_3])
        step_3 = flow_5_flow(upstream_tasks=[step_2])
    
    flow.register(project_name='tester')
  • Chohang Ng

    06/03/2021, 1:57 AM
    I am confused about how upstream_tasks works. Here I want to execute them in this order (2>(1,3)>4>5). Does this look right? For some reason, only flows 1 and 2 got executed.
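For reference, the order described here (flow 2 first, then flows 1 and 3, then 4, then 5) is a small dependency graph. The stdlib sketch below uses Python's `graphlib.TopologicalSorter` (3.9+) as a stand-in for Prefect's dependency resolution, only to show which flows become runnable at each stage; the flow names are taken from the snippet above. One Prefect 1.x detail matters for that snippet: each call such as `flow_1_flow()` inside the `with Flow(...)` block adds a new copy of the task, so the two `flow_1_flow()` calls create two separate tasks, and only one of them waits on flow 2.

```python
from graphlib import TopologicalSorter

# Dependencies as described in the message: each flow maps to the flows it waits on.
deps = {
    "flow_1": {"flow_2"},
    "flow_3": {"flow_2"},
    "flow_4": {"flow_1", "flow_3"},
    "flow_5": {"flow_4"},
}


def execution_stages(graph):
    """Group nodes into stages; nodes within one stage do not depend on each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        stages.append(ready)
        sorter.done(*ready)
    return stages


print(execution_stages(deps))
# [['flow_2'], ['flow_1', 'flow_3'], ['flow_4'], ['flow_5']]
```

In the flow itself, the equivalent wiring would be to call each `StartFlowRun` task once, keep the returned reference, and pass those references through `upstream_tasks`.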
    2 replies · 2 participants
  • Chohang Ng

    06/03/2021, 4:25 PM
    If I include LocalAgent().start() below where my flow is registered in code, will Cloud start up an agent to look for flows when they are scheduled to run?
    3 replies · 2 participants
  • Ismail Cenik

    06/03/2021, 5:34 PM
    Hello, our flows have been running successfully for 1 month and have made more than 200 successful runs. A new error started just yesterday. Of the last 6 runs, half did not complete and got stuck in one of our tasks. There is no log information. According to the task logic, the Kinesis Data Analytics application must be checked every 60 seconds. After a while, the task just waits without doing anything. There is still an active flow in that state, and I have not killed it so that I can show you. Could you please check it?
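Polling like this (check the Kinesis Data Analytics application every 60 seconds) is easier to reason about with an explicit overall deadline, so a status that never changes surfaces as an error rather than an indefinite wait. A minimal sketch under stated assumptions: `check` and `is_done` are hypothetical callables (e.g. one that fetches the application's status), the status strings are made up, and `clock`/`sleep` are injectable only so the example runs instantly. It is not the task code from this thread, which isn't shown.

```python
import time


def wait_until_done(check, is_done, interval=60.0, timeout=3600.0,
                    clock=time.monotonic, sleep=time.sleep):
    """Call check() every `interval` seconds until is_done(status) is true.

    Raises TimeoutError once `timeout` seconds have passed, instead of waiting forever.
    """
    deadline = clock() + timeout
    while True:
        status = check()
        if is_done(status):
            return status
        remaining = deadline - clock()
        if remaining <= 0:
            raise TimeoutError(f"status still {status!r} after {timeout} s")
        sleep(min(interval, remaining))


class FakeClock:
    """Test double so the example runs instantly instead of sleeping for real."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


clock = FakeClock()
statuses = iter(["STARTING", "RUNNING", "READY"])  # made-up status sequence
print(wait_until_done(lambda: next(statuses), lambda s: s == "READY",
                      interval=60, timeout=600, clock=clock.time, sleep=clock.sleep))  # READY
print(clock.now)  # 120.0
```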
    75 replies · 2 participants
  • Chohang Ng

    06/03/2021, 8:18 PM
    Here, Drop_tmp was executed after create tmp, and thus the dependency was lost. Drop_tmp is supposed to execute last according to my flow. What am I missing? If I don't set upstream and downstream tasks, might they execute out of order? But when I look at my other flows, most of them are still fine.
    12 replies · 3 participants
  • Chohang Ng

    06/03/2021, 8:44 PM
    Does the LocalDaskExecutor use all the cores of the local computer where the flows are registered, so that it finds all the independent flows and executes them in parallel, whereas the LocalExecutor uses just one? Is that correct?
    1 reply · 2 participants
  • Daniel Davee

    06/03/2021, 9:39 PM
    I'm using a Dask executor and KubernetesRun to run a flow on a Kubernetes cluster, but it doesn't seem to import my submodules when it runs on the cluster. Do I need to include them on the workers, or is there a way they can be uploaded with the flow?
    27 replies · 3 participants
  • Joël Luijmes

    06/04/2021, 8:43 AM
    Hey there! Is the newly introduced KV Store also coming to Prefect Server, or will it be a Cloud-only feature?
    11 replies · 3 participants
  • Johan Wåhlin

    06/04/2021, 11:32 AM
    Does anyone have experience with registering Prefect flows from Azure DevOps pipelines to a Prefect Server? The command 'prefect register flow --file flows/test_flow.py --project batteriskip --skip-if-flow-metadata-unchanged' keeps getting the error "requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200)", despite setting the environment variable PREFECT__SERVER__ENDPOINT="http://myserverip:myport". The same issue occurs when using the variables PREFECT__SERVER__HOST and PREFECT__SERVER__POST.
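For context on these variable names: Prefect 1.x reads environment variables that begin with `PREFECT__` and treats each further double underscore as a nesting level, so `PREFECT__SERVER__ENDPOINT` overrides the `server.endpoint` setting. The stdlib sketch below is a simplified model of that mapping, not Prefect's own loader (which handles more cases, such as value interpolation). Running the variables through it is one way to spot a misspelled key: `PREFECT__SERVER__POST`, for example, maps to `server.post` rather than the `server.port` setting.

```python
def prefect_env_to_keys(environ, prefix="PREFECT"):
    """Map PREFECT__SECTION__KEY-style variable names to dotted, lowercase keys.

    Simplified model only: double underscores separate nesting levels.
    """
    marker = prefix + "__"
    keys = {}
    for name, value in environ.items():
        if name.startswith(marker):
            parts = name[len(marker):].split("__")
            keys[".".join(part.lower() for part in parts)] = value
    return keys


env = {
    "PREFECT__SERVER__ENDPOINT": "http://myserverip:myport",
    "PREFECT__SERVER__POST": "4200",
    "HOME": "/root",  # ignored: no PREFECT__ prefix
}
print(prefect_env_to_keys(env))
# {'server.endpoint': 'http://myserverip:myport', 'server.post': '4200'}
```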
    2 replies · 2 participants
  • ll

    06/04/2021, 1:43 PM
    Are there any plans to add another type of executor for distributed environments? Our use case requires a large batch of tasks. Each task just invokes a C++ executable, in an embarrassingly parallel fashion, i.e., if done serially by hand:
    # task 1
    ./my_cpp_executable file1.xyz
    
    ...
    
    # task 1000
    ./my_cpp_executable file1000.xyz
    Each task takes about 4-8 compute hours on 4 CPUs and ~32 GB of memory, and our scheduled workloads take about 20,000-40,000+ compute hours per day. From what I can tell, the only supported strategy for running a large batch of embarrassingly parallel tasks right now is to use Dask. We have it working, but I feel Dask is more oriented to (i) interactive analysis workloads, (ii) pure Python tasks, and (iii) small jobs that fit onto the local disk of each Dask node. It feels awkward to invoke a Dask executor for a one-line shell execution in a high-throughput, long-running, queued (num_tasks >> num_cluster_nodes) workload. We prefer not to support Dask on our infrastructure, as it adds a whole other set of things that our sysengs have to maintain. It would seem more suitable if you supported the job queueing systems typically found in HPC environments, like SGE, Slurm, or HTCondor. I figure many of your target users in the fintech, scientific computing, and meteorological spaces will already have an SGE or Mesos cluster set up in their environment, but not a Dask cluster.
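The workload described here is a queue of independent shell invocations, one per input file, with far more tasks than workers. As a single-machine illustration of that shape only (it does not replace SGE, Slurm, HTCondor, or Dask), the stdlib sketch below runs commands through a bounded `ThreadPoolExecutor` and collects exit codes in input order. `run_shell_batch` is a hypothetical helper, and the commands are stand-ins for `./my_cpp_executable fileN.xyz`.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor


def run_shell_batch(commands, max_workers=4):
    """Run independent commands, at most `max_workers` at a time; return exit codes in input order."""

    def run_one(cmd):
        # Each worker thread only waits on its child process, so threads are cheap here.
        return subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(run_one, commands))


# Stand-in commands: each one just exits with n % 2 instead of running a real executable.
commands = [[sys.executable, "-c", f"import sys; sys.exit({n} % 2)"] for n in range(1, 6)]
print(run_shell_batch(commands, max_workers=2))  # [1, 0, 1, 0, 1]
```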
    11 replies · 2 participants
  • Michael Brown

    06/04/2021, 3:23 PM
    My agent is running, but it seems like it is not picking up any flows. They are all behind schedule. I believe I configured my API and the agent's API correctly.
    9 replies · 3 participants
  • Garret Cook

    06/04/2021, 8:01 PM
    I'm running flows with a Docker agent. What's the easiest way to store intermediate task results (a <10 KB CSV/JSON file per task)? What's the best result type for storing those for retries?
    2 replies · 2 participants
  • Tomás Emilio Silva Ebensperger

    06/05/2021, 2:39 PM
    Hello, one question. I set up an 1800-second timeout for a task, but it got stuck and didn't finish after that timeout; it kept waiting for hours and hours.
    @task(log_stdout=True, state_handlers=[handler], timeout=1800)
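One general Python caveat may be relevant when reasoning about a timeout like this, although whether it explains this particular hang is not established here: when a timeout is enforced by waiting on a worker thread, the waiting side gives up, but the thread itself cannot be forcibly stopped. The stdlib sketch below is a generic illustration, not Prefect's timeout implementation; `call_with_thread_timeout` and `long_running_task` are hypothetical names. It shows the caller regaining control after 0.1 s while the task is still running.

```python
import concurrent.futures
import threading

# Set later to let the simulated task finish.
release = threading.Event()


def long_running_task():
    # Stands in for work that ignores any timeout: it ends only when `release`
    # is set (or after 30 s, as a safety net for this demo).
    release.wait(30)
    return "finished"


def call_with_thread_timeout(fn, seconds):
    """Wait at most `seconds` for fn() in a worker thread; return (result, future)."""
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn)
    try:
        return future.result(timeout=seconds), future
    except concurrent.futures.TimeoutError:
        # The caller stops waiting, but Python has no way to kill the worker
        # thread, so fn() keeps running in the background.
        return None, future
    finally:
        pool.shutdown(wait=False)


result, future = call_with_thread_timeout(long_running_task, 0.1)
still_running = not future.done()
print(result, still_running)  # None True
release.set()  # let the background thread finish
print(future.result(timeout=5))  # finished
```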
    21 replies · 3 participants
  • Colin

    06/05/2021, 3:48 PM
    Hi, I am running on Kubernetes using Docker storage and Kubernetes hosting. Sometimes, just sometimes, my flows don't start at the scheduled time: they start 15 minutes later and run successfully. When I look at the logs, I can see that they are not submitted for execution at the scheduled time but about 15 minutes later. Can anyone help me find the logs?
    78 replies · 2 participants
  • Kamil Gorszczyk

    06/05/2021, 9:57 PM
    Hi everyone. Is there a way to get a list of all registered flow names?
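One way to approach this, assuming the GraphQL API that Prefect Server and Cloud expose has a `flow` type with a `name` field: query every flow's name and de-duplicate, since each registered version of a flow is its own `flow` row. The sketch below only handles the response side; the sample data is made up for illustration, and sending the query (for example through the Prefect client's `graphql` method) is left out.

```python
import json

# Assumed query shape: a `flow` type with a `name` field, one row per registered version.
FLOW_NAMES_QUERY = "query { flow { name } }"


def unique_flow_names(response):
    """Return sorted, de-duplicated flow names from a GraphQL response body."""
    return sorted({row["name"] for row in response["data"]["flow"]})


# Made-up response for illustration: "etl" appears twice because it has two versions.
sample = json.loads(
    '{"data": {"flow": [{"name": "etl"}, {"name": "report"}, {"name": "etl"}]}}'
)
print(unique_flow_names(sample))  # ['etl', 'report']
```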
    5 replies · 3 participants
  • Robert Hales

    06/07/2021, 9:02 AM
    Hi there, I'm having a slight issue with the ECS agent, StartFlowRun, and running in parallel. Because the ECS agent registers and deregisters a task definition every time it starts a flow, I get the following AWS error when starting multiple flows in parallel:
    An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Too many concurrent attempts to create a new revision of the specified family.
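The error comes from several flow runs registering a new revision of the same task definition family at once. A common generic mitigation for this kind of AWS throttling is to retry with exponential backoff and jitter; the sketch below shows that pattern in plain Python. It is not a Prefect or ECS agent setting, `ThrottledError` and `flaky_register` are hypothetical stand-ins, and `sleep`/`rng` are injectable only for testing.

```python
import random
import time


class ThrottledError(Exception):
    """Hypothetical stand-in for the AWS ClientException raised under concurrency."""


def call_with_backoff(fn, attempts=5, base=1.0, cap=30.0, sleep=time.sleep, rng=random.random):
    """Call fn(), retrying on ThrottledError with exponential backoff and full jitter."""
    for attempt in range(attempts):
        try:
            return fn()
        except ThrottledError:
            if attempt == attempts - 1:
                raise
            # Wait a random time in [0, min(cap, base * 2**attempt)) so parallel callers spread out.
            sleep(rng() * min(cap, base * 2 ** attempt))


attempts_made = []


def flaky_register():
    # Hypothetical operation that is throttled twice, then succeeds.
    attempts_made.append(1)
    if len(attempts_made) < 3:
        raise ThrottledError("Too many concurrent attempts")
    return "registered"


print(call_with_backoff(flaky_register, base=0.01))  # registered
```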
    4 replies · 2 participants
  • Fabrice Toussaint

    06/07/2021, 11:50 AM
    Hi, I am trying to set up a Dask executor, but when I try to deploy a flow using the DaskExecutor, it says that Prefect cannot be found:
    Unexpected error: ModuleNotFoundError("No module named 'prefect'")
    I did specify Prefect in the environment variables of the pod specification, so I do not know why it cannot be found. If anyone can help me, please let me know 🙂. EDIT: Also I am using the KubeCluster class (dask_kubernetes.KubeCluster)
    2 replies · 3 participants
  • ...disabled account...

    06/07/2021, 4:36 PM
    How do teams manage failover to another region/datacenter with Prefect? For AWS, the registered flows seem to be regionalized via the S3 bucket. And the bucket name is hardcoded at creation, so you can't just fail over to another region and use a different bucket (one that gets the data synchronized automatically). Looking at the storage options, I only see Webhook as an option (which can use replicated S3 under the hood).
    5 replies · 2 participants
  • Daniel Davee

    06/07/2021, 7:41 PM
    I'm running a Kubernetes agent on GKE and trying to connect to a VM in GCP, and I am getting this error: "405 Client Error: Not Allowed for url:". What should the default value for PREFECT__CLOUD__API be?
    18 replies · 4 participants
  • Kao Phetchareun

    06/07/2021, 7:59 PM
    I'm having some trouble with the Email Task and Cloud Secrets. We recently upgraded our Prefect local agent to the current version, and now we are getting the "Local Secret not found" message for the email credentials when initiating a flow run from Cloud (screenshot attached). However, when we run the flow locally from Python, Prefect is able to get the email credentials that are stored in Prefect Cloud (we assume this because we never stored the email credentials anywhere but Cloud). Any ideas? cc: @jack
    18 replies · 3 participants
  • Damien Ramunno-Johnson

    06/07/2021, 11:26 PM
    I have a quick question about Cloud. Is the traffic from the endpoint to the agent one-way? That is, does the agent just check the endpoint every 10 seconds and then tell the endpoint that it is running the job?
    5 replies · 2 participants
  • Lukas N.

    06/08/2021, 5:03 PM
    Hello 👋, I've hit an issue while retrying a mapped task. It always re-runs the first mapped task. Code and more details in the thread.
    17 replies · 3 participants
  • Garret Cook

    06/08/2021, 10:05 PM
    Will a ShellTask save its return value into the flow’s S3Result? If I have return_all=True set for the shell task, will all the logs get pickled and saved to S3?
    3 replies · 2 participants
  • Raed

    06/09/2021, 7:15 AM
    Hello, what would be the best way to pass a Bitbucket access token to prefect-server (i.e., set a secret in prefect-server)? I set up the server using the instructions in the most recent stream.

    When I try the following for a given flow:
    flow.run_configs = KubernetesRun(
        image=<image>,
        env={
            "PREFECT__CONTEXT__SECRETS__BITBUCKET_ACCESS_TOKEN": os.environ[
                "BITBUCKET_ACCESS_TOKEN"
            ]
        },
        image_pull_policy="Always",
    )
    flow.storage = Bitbucket(
        project=<project>,
        repo=<repo>,
        path="flows/example_flow.py",
        access_token_secret="BITBUCKET_ACCESS_TOKEN",
    )
    I get the following error in the UI:
    Failed to load and execute Flow's environment: ValueError('Local Secret "BITBUCKET_ACCESS_TOKEN" was not found.')
    Wouldn't the access token secret have been set in the run config? I also have the same environment variable in a custom Docker image being used by the run config.
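One detail in the snippet above is worth checking: it assigns `flow.run_configs` (plural), while the attribute Prefect 1.x's `Flow` uses is `run_config`. Python accepts an assignment to a misspelled attribute silently, so in that case the `KubernetesRun` (and its `env`) would never be attached to the flow; whether that explains the error in this thread isn't established here. The stdlib sketch below uses hypothetical `FlowLike` and `StrictFlowLike` classes, not Prefect's `Flow`, to show the effect, and how `__slots__` turns such a typo into an error.

```python
class FlowLike:
    """Hypothetical stand-in for an object with a `run_config` attribute."""

    def __init__(self):
        self.run_config = None


flow = FlowLike()
flow.run_configs = {"image": "my-image"}  # typo: silently creates a brand-new attribute
print(flow.run_config)  # None: the intended setting was never changed


class StrictFlowLike:
    """Same idea, but __slots__ makes unknown attribute names an error."""

    __slots__ = ("run_config",)

    def __init__(self):
        self.run_config = None


strict = StrictFlowLike()
try:
    strict.run_configs = {"image": "my-image"}
except AttributeError as exc:
    print("caught:", exc)
```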
    48 replies · 2 participants
  • Florian Kühnlenz

    06/09/2021, 1:41 PM
    Hi. I have an issue that when I change the default parameters in the settings page of the UI, the changes do not apply to already scheduled flows. I believe there was a fix for this issue in the past, but maybe I misremember. Anyway, is this expected behavior or should I file a bug?
    6 replies · 2 participants
  • Krapi Shah

    06/10/2021, 5:04 PM
    Hi. From what I understand, Prefect marks a task as failed only in case of an error or exception. But what if I want to set the state based on the return value? Can I do that?
    2 replies · 2 participants
  • Garret Cook

    06/10/2021, 7:47 PM
    How do I get the flow name at runtime? I was trying prefect.context.get('flow_name').
    18 replies · 3 participants

Garret Cook

06/10/2021, 7:47 PM
How do I get the flow name at runtime? I was trying prefect.context.get('flow_name').
Kevin Kho

06/10/2021, 7:48 PM
Hi @Garret Cook, did you try prefect.context.flow_name?
Garret Cook

06/10/2021, 7:49 PM
well, that seems obvious, lemme try that
Traceback (most recent call last):
  File "client_backup_enrollment.py", line 160, in <module>
    key="{flow_run_name}/report.html".format(flow_run_name=prefect.context.flow_name),
AttributeError: 'Context' object has no attribute 'flow_name'
Kevin Kho

06/10/2021, 7:50 PM
Oh ok I’ll try this
Michael Adkins

06/10/2021, 7:53 PM
Are you looking for the flow_run_name or the flow_name?
Garret Cook

06/10/2021, 7:54 PM
oh, I'd like the flow_run_name, I think: the dynamically generated one, like 'exhausted-mongoose'
Michael Adkins

06/10/2021, 7:54 PM
It should be available under that key then 🙂
Garret Cook

06/10/2021, 7:54 PM
prefect.context.flow_run_name. << that one?
Michael Adkins

06/10/2021, 7:55 PM
Yep!
Garret Cook

06/10/2021, 7:56 PM
key="{flow_run_name}/report.html".format(flow_run_name=prefect.context.flow_run_name) yields:
[root@prefect production-flows]# python3.6 client_backup_enrollment.py
Traceback (most recent call last):
  File "client_backup_enrollment.py", line 160, in <module>
    key="{flow_run_name}/report.html".format(flow_run_name=prefect.context.flow_run_name),
AttributeError: 'Context' object has no attribute 'flow_run_name'
Michael Adkins

06/10/2021, 7:56 PM
When are you calling this?
If the flow run isn't running at the time, it won't exist yet
Garret Cook

06/10/2021, 7:57 PM
I thought perhaps, but why doesn’t the call get deferred then? Do I need to template that string differently?
Michael Adkins

06/10/2021, 7:58 PM
If you call it from within a task, it'll be deferred, but if you call it elsewhere, it may not be.
Can you share a little more context of where you're setting this?
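The behaviour described here can be illustrated with a stdlib analogy (Python's `contextvars`, not Prefect's actual context implementation): a value that only exists while a run is in progress has to be read by code that executes during the run, such as a task body, not at module level when the script starts. `run_flow` and `build_report_key` are hypothetical names; the key template and the run name come from this thread.

```python
import contextvars

# Analogy only: a context variable that is filled in just for the duration of a run.
flow_run_name = contextvars.ContextVar("flow_run_name")


def build_report_key():
    # Looked up when the function runs, not when the module is imported.
    return "{flow_run_name}/report.html".format(flow_run_name=flow_run_name.get())


def run_flow(name):
    token = flow_run_name.set(name)  # the "runner" populates the context...
    try:
        return build_report_key()  # ...so code called during the run can read it
    finally:
        flow_run_name.reset(token)


try:
    build_report_key()  # outside a run there is nothing to read
except LookupError:
    print("no flow run in progress")

print(run_flow("exhausted-mongoose"))  # exhausted-mongoose/report.html
```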
Garret Cook

06/10/2021, 7:58 PM
ah, I bet that is the issue
Michael Adkins

06/10/2021, 7:58 PM
https://docs.prefect.io/core/concepts/templating.html#where-can-i-use-templating may be useful
Garret Cook

06/10/2021, 7:59 PM
Lemme try that