prefect-server
  • Dennis Hinnenkamp

    03/23/2022, 10:00 AM
    Hi all, for our customer we have set up Prefect Server 1.1.0 on an EC2 instance. The customer would like SSO with a load balancer in front of it, but as soon as this is activated, the Prefect UI no longer connects to GraphQL, even though GraphQL is started on the same instance and with prefect server start --expose. Can anyone suggest how best to implement this, or why GraphQL does not connect? Thanks in advance!
    6 replies · 2 participants
  • David Charles

    03/23/2022, 11:23 AM
    Hi - we have the “mono-repo” layout - we intend to keep all our flows in one package and leverage shared code in a common package. We are able to register flows with our AWS-hosted instance, but only a very simple “stand-alone” flow executes; we cannot get ones that reference common code to run. Details follow in thread… Any help much appreciated!
    12 replies · 4 participants
  • Lana Dann

    03/23/2022, 3:43 PM
    Is it possible to use a LocalDaskExecutor with an ECS run config? I’m a bit confused about how I can leverage Dask, and whether it requires setting up any additional AWS IAM policies or resources to run in conjunction with my ECS run config. Basically, right now I have a complex flow with ~200 tasks and I’d like to increase the number of tasks that can be executed at one time.
    19 replies · 2 participants
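    For reference, pairing the two in Prefect 1.x looks roughly like this (a minimal sketch with illustrative names): the LocalDaskExecutor parallelizes tasks inside the single container that runs the flow, so the ECS run config itself is unchanged and no extra IAM policies or Dask infrastructure should be needed.

    from prefect import Flow
    from prefect.executors import LocalDaskExecutor
    from prefect.run_configs import ECSRun

    with Flow("parallel-ecs-flow") as flow:
        ...  # the ~200 tasks go here

    # Run tasks on local threads (or processes) within the flow-run container.
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)
    flow.run_config = ECSRun()  # the existing ECS run config, unchanged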
  • Vaibhav Shetye

    03/24/2022, 5:40 AM
    Hi, we are seeing double scheduling on Prefect 0.15.3. Is there an easy way to identify all double schedules?
    1 reply · 2 participants
  • Wieger Opmeer

    03/24/2022, 9:42 AM
    Hi, as far as I know Prefect Cloud is using Okta. Is there a statement on the Okta Lapsus$ incident, and on whether and how it impacts Prefect Cloud?
    3 replies · 2 participants
  • Tim Wright

    03/24/2022, 12:49 PM
    Does anyone know if there is a way to adjust the Lazarus process polling interval? We are running our flows in an AKS (Kubernetes) cluster that is set to scale out automatically. I've noticed that during periods of high Azure traffic, it can take some time for the scaled-out nodes to actually show up. What I've observed is that a flow run will schedule and then trigger a scale-up event on the cluster, and the next node becomes available to schedule flows >10 minutes later. It appears that Lazarus schedules another flow run in the meantime, and when the scaled-up node becomes available, two versions of the flow (the original and the Lazarus-scheduled one) are both running. Has anyone else run into this? Or does anyone have guidance on how we can handle it? Currently we've disabled the Lazarus process; ideally we could stretch the interval beyond 10 minutes.
    9 replies · 2 participants
  • Ari Hershowitz

    03/24/2022, 7:17 PM
    Hi all, I'm working on a project using document processing pipelines for a US government client. We need to deploy to Red Hat servers, which (probably) means using podman instead of Docker. My initial attempts to install Prefect with podman didn't work. Is there a good guide for CentOS/Red Hat OS?
    8 replies · 3 participants
  • Mariusz Olszewski

    03/25/2022, 5:28 AM
    Is there any change when it comes to security options on Prefect Server? Any authentication?
    2 replies · 2 participants
  • Dennis Hinnenkamp

    03/25/2022, 7:02 AM
    Hi all, is there a way to schedule a flow to start as soon as the previous run of the same flow finishes?
    7 replies · 3 participants
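    One way to express this in Prefect 1.x (a minimal sketch, with hypothetical flow and project names): have the last task of the flow kick off the next run of the same flow, so each run starts as soon as the previous one finishes.

    from prefect import Flow, task
    from prefect.tasks.prefect import create_flow_run

    @task
    def do_work():
        ...

    with Flow("my-flow") as flow:
        work = do_work()
        # Schedule the next run of this same flow once the work is done.
        create_flow_run(
            flow_name="my-flow",
            project_name="my-project",
            upstream_tasks=[work],
        )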
  • Niels Prins

    03/28/2022, 1:21 PM
    Hi all, short question. I have a flow stored in Git which I want to run on K8s. I have given the agent access to the env variable PREFECT__CONTEXT__SECRETS__GITEA for my Git key, which is populated via a secret in K8s. When running the flow I get the following message:
    Failed to load and execute Flow's environment: ValueError('Local Secret "GITEA" was not found.')
    When trying the same thing with a Docker agent, everything works fine. What am I missing in my configuration? We are still on Prefect 0.15.3.
    7 replies · 3 participants
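    One common cause, offered here as an assumption: with the Kubernetes agent the flow executes in its own job pod rather than in the agent's pod, so environment variables set on the agent are not inherited. A minimal sketch of passing the secret through the run config instead:

    import os
    from prefect import Flow
    from prefect.run_configs import KubernetesRun

    with Flow("git-stored-flow") as flow:  # illustrative flow
        ...

    # Forward the secret from the registering environment into the flow-run job pod.
    flow.run_config = KubernetesRun(
        env={"PREFECT__CONTEXT__SECRETS__GITEA": os.environ["PREFECT__CONTEXT__SECRETS__GITEA"]}
    )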
  • Nikita Samoylov

    03/28/2022, 1:22 PM
    Hi there, we have a flow run frequently by schedule with different input params. All flows started by schedule get names such as groovy-woodpecker, poetic-lorikeet, etc., and it's really hard to navigate these runs in the UI, because to check the input params you need to open every link in the flow's RUNS tab. I would like to include input params in the flow run names. Is there a way to define these names myself instead of using the default ones? PS: We use Prefect v1.0.0
    2 replies · 2 participants
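    One option in Prefect 1.x is the built-in RenameFlowRun task, which renames the run from inside the flow; a minimal sketch, where the parameter names and naming scheme are illustrative:

    from prefect import Flow, Parameter, task
    from prefect.tasks.prefect import RenameFlowRun

    rename_run = RenameFlowRun()

    @task
    def build_run_name(lang, region):
        return f"extract-{lang}-{region}"

    with Flow("scheduled-flow") as flow:
        lang = Parameter("lang", default="en")
        region = Parameter("region", default="us")
        # Rename the current flow run so its parameters show up in the UI.
        rename_run(flow_run_name=build_run_name(lang, region))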
  • Christian Nuss

    03/28/2022, 5:37 PM
    Hey everyone! I'm trying to add the Prefect Server Helm chart as a dependency to an existing Helm chart. If I want to prefix/rename EVERYTHING that is in the Prefect Server Helm chart, is there an easy way to do that?
    35 replies · 2 participants
  • Egil Bugge

    03/29/2022, 12:58 PM
    Hi all! Wondering if anyone has had success with hosting Prefect Server and the Prefect UI on Google Cloud App Engine? I know I could host them both on a Compute Engine VM, but if I could get them running on App Engine I could, for example, use IAP for authentication. Pretty new to both GCP and Docker, so there might be some obvious stuff I'm missing 🙂 Using Prefect Cloud is an option, but right now we're looking to keep as much of the project infrastructure in one place as possible.
    4 replies · 2 participants
  • Chu Lục Ninh

    03/29/2022, 1:09 PM
    We have batch jobs written in other languages and packed into containers, and we write our tasks mainly using RunKubernetesJob. But every time I run the flow using KubernetesAgent, the agent always spins up a new job, which in my case is useless and a waste of resources. Since the flow is mainly about spinning up new k8s jobs, I want KubernetesAgent to run it directly. Please advise on the way to do that.
    23 replies · 3 participants
  • Aniruddha Sengupta

    03/30/2022, 10:57 AM
    Hi all, I have a flow registered called news_url_extraction which is run in the UI with the arguments lang, region and root_dir in the run configuration. I want to be able to run this flow from another flow called news_pipeline, and I have created the following code:
    import os

    from prefect import Client, Flow
    from prefect.run_configs import LocalRun
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    # Project-local helper modules
    import general
    import prefect_utils
    import settings

    # Set the working directory
    working_directory = general.home_dir() + settings.WORKING_DIR

    # Functions
    def execute_pipeline() -> Flow:
        """Build the parent flow that triggers the child flows."""
        with Flow(
            "news_pipeline_flow",
            state_handlers=[prefect_utils.set_run_name],
        ) as flow:

            # Perform the news url extraction flow
            news_url_extraction_flow = create_flow_run(
                flow_name="news_url_extraction_flow",
                project_name="scraper_pipeline",
                parameters={
                    "root_dir": os.environ.get("root_dir"),
                    "lang": os.environ.get("lang"),
                    "region": os.environ.get("region"),
                },
            )
            # wait_for_flow_news_url_extraction_flow = wait_for_flow_run(
            #     news_url_extraction_flow,
            #     raise_final_state=True,
            # )

            # # Perform the news spider flow
            # news_spider_flow = create_flow_run(
            #     flow_name="news_spider_flow",
            #     project_name="scraper_pipeline",
            # )
            # wait_for_flow_news_spider_flow = wait_for_flow_run(
            #     news_spider_flow,
            #     raise_final_state=True,
            # )

        return flow


    if __name__ == "__main__":
        """
        The main process to get the Simple News pipeline registered
        in the Prefect UI.
        """
        print("Creating flow")
        flow = execute_pipeline()

        # Register news pipeline parent flow
        print("Creating labels")
        labels = [general.retrieve_hostname()]

        print("Setting the configuration")
        env = {
            "root_dir": "the path where the results of the scraped urls should go",
            "lang": "the language param",
            "region": "the region param",
        }
        flow.run_config = LocalRun(working_dir=working_directory, labels=labels, env=env)

        project_name = "scraper_pipeline"

        print(f"Checking project {project_name} exists")
        project_exists = prefect_utils.check_project_exists(project_name)

        if not project_exists:
            print(f"Project {project_name} doesn't exist, creating now")
            client = Client()
            client.create_project(project_name=project_name)

        print("Registering the flow")
        flow.register(project_name=project_name)

        print(f"Update the agent labels for agent: {settings.PREFECT_AGENT_NAME}")
        prefect_utils.update_agent_labels(agent_name=settings.PREFECT_AGENT_NAME, new_labels=labels)
    When I run the child flow news_url_extraction on its own, the process completes within a few seconds. But when I run the parent flow news_pipeline, the flow remains in a continuous Running state. Is there anything I am missing here? I should also say that the reason I am doing this in the first place is that I have two flows, news_url_extraction and news_spider, that I want to run one after the other. At the moment I have to register both flows manually and then run them separately, but I want to run them sequentially in one flow instead. The interesting thing to note here is that news_spider is not able to run on multiple threads whereas news_url_extraction is, so the two flows have slightly different configurations; namely, news_url_extraction uses LocalDaskExecutor but news_spider does not. Thanks, and I hope this all makes sense.
    6 replies · 2 participants
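    For reference, the usual Prefect 1.x orchestrator pattern pairs each create_flow_run with a wait_for_flow_run so the parent blocks on each child and finishes once the children do; the flow_name passed to create_flow_run must also match the registered child flow's name exactly. A minimal sketch using the names from the question:

    from prefect import Flow
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    with Flow("news_pipeline_flow") as flow:
        extraction = create_flow_run(
            flow_name="news_url_extraction", project_name="scraper_pipeline"
        )
        extraction_done = wait_for_flow_run(extraction, raise_final_state=True)

        spider = create_flow_run(
            flow_name="news_spider", project_name="scraper_pipeline"
        )
        # Start the spider only after the extraction run has finished.
        spider.set_upstream(extraction_done)
        wait_for_flow_run(spider, raise_final_state=True)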
  • Michał Augoff

    03/30/2022, 1:43 PM
    Hi all! I see some inconsistencies in task results when using S3 storage (script-based) vs Docker storage, and wanted to double-check whether I’m missing something here. My flow looks roughly like this:
    def get_flow(...):
      with Flow(...) as f:
        tasks
      return f
    then I use it in another file for deployment and set some env-specific configuration:
    from module_x import get_flow
    
    f = get_flow()
    f.result = S3Result(bucket=dev or prod bucket here)
    
    # no results with this storage configuration
    f.storage = S3(bucket & key, stored_as_script=True, local_script_path=<this file>)
    
    # results work with this storage
    f.storage = Docker(...)
    
    f.register()
    With S3 storage I don’t get any results saved in S3, whereas everything works fine when using Docker storage. I wonder if this has anything to do with Docker using serialization and S3 using the “as script” approach.
    10 replies · 3 participants
  • Patrick Wyatt

    03/30/2022, 3:39 PM
    This is a pretty minor problem as these things go, but it would be great if there was a way to disable the ASCII logo in the Prefect Agent and Prefect Server, since it pollutes our server logs in Datadog. I can go hack the code in Prefect, but it seems inelegant to have to re-apply such a change after every version upgrade. Thanks!
    4 replies · 3 participants
  • Thomas Opsomer

    03/30/2022, 5:26 PM
    Hello 🙂 For a week or two, every time a flow reaches a step that requires manual validation, it gets marked as failed after 10 minutes with a message "A Lazarus process attempted...". We manage to continue the flows by restarting them after the validation, but it's a bit annoying ^^
    6 replies · 3 participants
  • Michał

    03/31/2022, 2:54 PM
    Hey, I made some updates in ./prefect/config.toml; how do I reload the changes? For example, I added a slack_web_hook URL.
    3 replies · 2 participants
  • Christian Nuss

    03/31/2022, 4:40 PM
    Heyo! Does there exist a single Docker image with all of the prefect-server components, started using something like supervisord?
    5 replies · 3 participants
  • Egil Bugge

    04/01/2022, 9:58 AM
    Hey all! I tested Tyler's Terraform script, https://gist.github.com/TylerWanner/0b4b00f4701dae6ad0a98978efe01966, for setting up Prefect Server in a Compute Engine VM, and I'm having real problems getting the startup script to run correctly. I changed the config.toml so that it uses "apollo_url", so no worries there. There seem to be some problems with getting Docker and Docker Compose up and running. Has anyone tested this recently and got it working?
    6 replies · 2 participants
  • Michael Smith

    04/01/2022, 10:06 AM
    Morning, I'm looking into Prefect 1 and running into a few startup issues. Is there a guide on how to resolve them, as this must be a common problem?
    19 replies · 2 participants
  • Shuchita Tripathi

    04/01/2022, 3:49 PM
    I want to add Azure storage for my flows, but I got an error: No module named azure. So I am trying to run pip install prefect[azure], but this is taking too long; it has been running for about an hour. Is there anything I can do to get this resolved?
    53 replies · 3 participants
  • Jullian Bellino

    04/04/2022, 8:38 AM
    Hello all, I'm quite new to Prefect and I'm looking for the best way to trigger a task based on an external event. In my case the external event is a new S3 partition becoming available and ready to be processed. To achieve that I've tried to rely on the RETRY signal. Basically I defined a task in charge of checking the partition and raising the following signal in case it's not available yet:
    raise RETRY("Partition not available yet, retrying in 1 hour.", start_time=pendulum.now().add(hours=1))
    However it looks like this could block the other running tasks because the runner is not released. Now imagine we rely on a single local agent with at most 3 tasks in parallel: if some S3 partitions are late for any reason, this could quickly block all of the other tasks. Do you have any better way to achieve this?
    4 replies · 2 participants
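    For reference, the polling pattern described above reads roughly like this in Prefect 1.x; a minimal sketch, where partition_exists is a hypothetical stand-in for the actual availability check:

    import pendulum
    from prefect import task
    from prefect.engine.signals import RETRY

    @task
    def wait_for_partition(partition: str):
        if not partition_exists(partition):  # hypothetical helper
            # Put the task into a Retrying state scheduled one hour from now.
            raise RETRY(
                "Partition not available yet, retrying in 1 hour.",
                start_time=pendulum.now().add(hours=1),
            )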
  • Chris Duke

    04/04/2022, 4:27 PM
    I've been trying to register flows locally and programmatically, and I'm getting an error on one flow:
    Traceback (most recent call last):
      File "/helix_orchestration/flows/reporting/fhir_rpt_reporting_pipeline.py", line 191, in <module>
        flow.register(project_name="helix", idempotency_key=flow.serialized_hash())
      File "/usr/local/lib/python3.7/site-packages/prefect/core/flow.py", line 1734, in register
        idempotency_key=idempotency_key,
      File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 1211, in register
        variables=dict(input=inputs),
      File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 554, in graphql
        retry_on_api_error=retry_on_api_error,
      File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 458, in post
        retry_on_api_error=retry_on_api_error,
      File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 738, in _request
        session=session, method=method, url=url, params=params, headers=headers
      File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 660, in _send_request
        raise ClientError(f"{exc}\n{graphql_msg}") from exc
    prefect.exceptions.ClientError: 400 Client Error: Bad Request for url: http://apollo:4200/
    
    The following error messages were provided by the GraphQL server:    GRAPHQL_VALIDATION_FAILED: Unknown type "register_tasks_input". Did you mean
            "register_agent_input", "delete_agent_input", "delete_flow_input",
            "delete_tenant_input", or "log_insert_input"?
        GRAPHQL_VALIDATION_FAILED: Cannot query field "register_tasks" on type
            "Mutation". Did you mean "register_agent", "delete_task", "insert_task", or "update_task"?
    I'm not sure what this error is indicating, as I have other flows that register fine using this same register call and a similar setup. I'm running this locally using a Docker image on version 0.15.10.
    14 replies · 3 participants
  • Lana Dann

    04/05/2022, 12:17 AM
    Is there a way to customize Slack notifications via Prefect? For example, if I wanted to tag people on Slack in the alert message based on which flow is running, can I pass in a custom message/alert?
    2 replies · 2 participants
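    One common Prefect 1.x approach is a custom state handler that posts to a Slack incoming webhook; a minimal sketch, where the webhook URL, the owner mapping, and the flow wiring are all illustrative assumptions:

    import requests
    from prefect import Flow

    OWNERS = {"critical-flow": "<@U012ABCDEF>"}  # hypothetical Slack user ID

    def slack_on_failure(flow, old_state, new_state):
        if new_state.is_failed():
            mention = OWNERS.get(flow.name, "")
            requests.post(
                "https://hooks.slack.com/services/...",  # your webhook URL
                json={"text": f"{mention} Flow {flow.name} failed: {new_state.message}"},
            )
        return new_state

    with Flow("critical-flow", state_handlers=[slack_on_failure]) as flow:
        ...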
  • Egil Bugge

    04/05/2022, 6:16 AM
    Hey all! Tested out Prefect Orion in the cloud yesterday and had the following scenario:
    • Made a simple hello world flow with a deployment spec:
    from prefect import flow
    from prefect.deployments import DeploymentSpec

    @flow
    def hello_world(name="world"):
        print(f"Hello {name}!")

    DeploymentSpec(
        flow=hello_world,
        name="hello-world",
    )
    • Made a work queue and connected the work queue to this deployment
    • Spun up an agent locally and connected it to this work queue
    • Clicked into the Deployments tab in the UI and selected "Quick run", which scheduled a run of my flow
    Then nothing happened; my local agent didn't pick up the flow. I then made a new work queue, which I did not connect to the deployment, and connected my agent to this new work queue instead. Now it immediately picked up the flow that I had started earlier and ran it. Is there something more I need to do to get an agent to pick up flows connected to a deployment?
    2 replies · 2 participants
  • Dennis Hinnenkamp

    04/05/2022, 7:41 AM
    Hi, I have set up a cloud hook that is supposed to notify me via MS Teams when a run is successful. Unfortunately, I don't get any notification, and the test call doesn't seem to work either. I also use the same webhook URL in our Airbyte installation, where the message does arrive in MS Teams, so that can't be the problem. Can anyone help here? By the way, I use Prefect Server. Thanks in advance
    15 replies · 5 participants
  • Ray Tang

    04/05/2022, 8:08 AM
    Hi all, my team is designing a deployment pipeline (from GitLab to our local Prefect Server). The reason is that our developers are currently able to use flow.register to deploy to production Prefect servers, which is not ideal. So I would like to ask if there are any online guides on how to restrict access to flow.register, and any recommended deployment pipelines? Thanks
    5 replies · 3 participants
  • Liam England

    04/05/2022, 2:02 PM
    Hey folks, quick question about best practices. Let's say my team is using a flow to pull data from a vendor API daily at 8am. Typically the data for that day will be available by 8am, but sometimes there's a delay on the vendor's side and our API call gets an empty response. The vendor data might not be available until 30m - 1hr after the expected 8am pull. In this case, would it still be appropriate to add a retry_delay to the task and have it wait an hour, or would it be better practice to add some sort of handler to reschedule the flow for an hour later? Is the retry_delay basically a sleep in the backend, or does it handle scheduling the task?
    2 replies · 2 participants

Kevin Kho

04/05/2022, 2:04 PM
In my opinion, retry_delay is the easiest solution, and people do this to poll for an event happening by giving a high number of max_retries. I think it's harder to track the retry number if you use a subflow and reschedule, and I wonder whether that might not become an infinite loop, whereas retries are capped at a max.
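A minimal sketch (Prefect 1.x) of the retry-based polling described above: a high max_retries combined with an hour-long retry_delay effectively polls until the data shows up. The fetch_vendor_data helper and the empty-response check are illustrative.

from datetime import timedelta
from prefect import task

@task(max_retries=24, retry_delay=timedelta(hours=1))
def pull_vendor_data():
    data = fetch_vendor_data()  # hypothetical API call
    if not data:
        raise ValueError("Vendor data not available yet")  # triggers a retry
    return data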
Liam England

04/05/2022, 2:08 PM
Good point, I'll stick with it for now. Thanks