prefect-community
  • p

    Pedro Martins

    02/01/2021, 6:27 PM
    How would one trigger a prefect flow from a github event? Let's say merging to master/main should trigger a new run.
    m
    • 2
    • 3
  • m

    Matthew Blau

    02/01/2021, 7:40 PM
    Hello all, I am attempting to set up Slack Notifications for my flow and am following https://docs.prefect.io/core/advanced_tutorials/slack-notifications.html#installation-instructions. I have successfully installed the prefect app into my workspace and have been able to test the connection with success. However, I am unable to have it output information based on my actual flow. I receive the following error:
    Exception raised while calling state handlers: ClientError('400 Client Error: Bad Request for url: http://host.docker.internal:4200/graphql\n\nThe following error messages were provided by the GraphQL server:\n\n GRAPHQL_VALIDATION_FAILED: Cannot query field "secret_value" on type "Query".\n\nThe GraphQL query was:\n\n query($name: String!) {\n secret_value(name: $name)\n }\n\nThe passed variables were:\n\n {"name": "SLACK_WEBHOOK_URL"}\n')
    what am I doing wrong? I have @task(state_handlers=[slack_notifier]) set for the task I am testing out the notification for.
    k
    a
    • 3
    • 9
  • a

    Arun Giridharan

    02/01/2021, 8:44 PM
    Is there a way to write a prefect flow to snapshot a DB?
    k
    a
    • 3
    • 3
  • b

    BK Lau

    02/01/2021, 9:34 PM
    Has anyone here run an `Apache Beam` cluster using Prefect? If so, I'd like to get some input and lessons from your experience.
  • v

    Verun Rahimtoola

    02/02/2021, 1:10 AM
    hi, we're using prefect server as our backend, and using a local agent on our infrastructure to execute flows. we're noticing that for some flows, only the first level of the DAG (i.e., the tasks with no upstream requirements) gets executed, and then nothing happens... any clues as to what might be happening?
    k
    • 2
    • 7
  • g

    Giovanni Giacco

    02/02/2021, 1:35 AM
    Hello everyone, is there a way to use the temporary Dask cluster created by Prefect from inside a task, in order to execute an arbitrary workload on that Dask cluster?
    k
    • 2
    • 2
  • j

    Jimmy Le

    02/02/2021, 1:52 AM
    Hey folks! I’ve been struggling a bit with understanding how Prefect connects with the AWS ecosystem. Namely ECR, ECS, and Fargate. I’d be willing to pay for a few hours of tutoring. Please DM me your rates :)
    a
    k
    k
    • 4
    • 5
  • j

    Josh Pitts

    02/02/2021, 1:56 AM
    hi! I have a Task that is querying a slow API (10K times). I want to checkpoint this task each iteration (disk is okay), but I'm struggling with how to template the `location`, since the task argument is what makes the task unique, but that's not available to the context object as far as I can tell. Am I missing something obvious for this use case?
    c
    • 2
    • 3
  • e

    Eric

    02/02/2021, 3:52 AM
    Hi Prefect, I noticed that `prefect/cli/server.py` lets the user specify different ports when starting the Prefect server services (e.g. graphql_port='xxxx', server_port='xxxx', etc.). But when I call flow.register(project_name) in my script, `prefect/core/flow.py` seems to use the default port 4200 to register the flow, and `register()` has no parameter to change the registration port, so it raised an HTTPConnection error. Once I changed line 1666 of flow.py from `client = prefect.Client()` to `client = prefect.Client(api_server='http://localhost:MODIFIED_PORT/')`, it worked. Is this an issue? Thank you 🙂
  • j

    Jitesh Khandelwal

    02/02/2021, 10:49 AM
    Hello everyone, I have a question. Is it possible to rerun only a subset of the mapped tasks of a flow ? Like only the ones which have failed.
    p
    • 2
    • 8
  • d

    Dmitry Klionsky

    02/02/2021, 11:45 AM
    Hello, I'm trying to understand how applicable Prefect is as a deployment tool. In its simplest form, what I'm trying to achieve is this:
    1. Read an IP list of unknown length (2, 3, 42, etc.)
    2. For each IP, run a number of tasks sequentially (running them in parallel is forbidden):
       2.1 stop IP
       2.2 wait for new IP
       2.3 init new IP
    Additional requirements:
    * Some tasks should be retryable (done)
    * Some tasks should be manually recoverable (done)
    Here's what I came up with:
    import datetime
    import random
    from typing import Any

    import prefect
    from prefect import task, Flow
    from prefect import Parameter
    from prefect.engine.signals import FAIL, LOOP, PAUSE

    # Allow up to 3 retries, 5 seconds apart, when fetching the IP list fails
    @task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
    def get_ips() -> list:
        if random.random() > 0.8:
            raise FAIL("Failed to get IPs")
        count = random.randint(2, 6)
        ips = [f"IP-{i}" for i in range(1, count)]
        print(ips)
        return ips

    @task
    def seq_reduce(func, init: Any, lst: list) -> Any:
        loop_payload = prefect.context.get("task_loop_result", {})
    
        index = loop_payload.get("index", 0)
        acc = loop_payload.get("acc", init)
    
        if index >= len(lst):
            return acc
    
        acc = func(lst[index], acc)
        raise LOOP(message=f"acc={acc} index={index}", result=dict(acc=acc, index=index+1))
    
    def seq_map(func, lst: list) -> list:
        def aux(x, acc: list) -> list:
            acc.append(func(x))
            return acc
        return seq_reduce(aux, [], lst)
    
    @task
    def stop_ip(ip: str) -> None:
        print("stop_ip: " + ip)
    
    @task
    def wait_for_ip(ip: str) -> str:
        print("wait_for_ip: " + ip)
        return ip + "-new"
    
    @task
    def init_ip(ip: str) -> str:
        print("init_ip: " + ip)
        if random.random() > 0.5:
            raise PAUSE("Initing IP failed. Waiting for human input")
        return "Done"
    
    with Flow("process_ip") as process_ip_flow:
        old_ip = Parameter("old_ip")
        stop_ip(old_ip)
        new_ip = wait_for_ip(old_ip)
        init_ip(new_ip)
    
    def process_ip(old_ip):
        state = process_ip_flow.run(old_ip=old_ip)
        #process_ip_flow.visualize(flow_state=state)
        return state.result[new_ip].result
        
    with Flow("process_ips") as process_ips_flow:
        old_ips = get_ips()
        new_ips = seq_map(process_ip, old_ips)
    
    if __name__ == "__main__":
        state = process_ips_flow.run()
        #process_ips_flow.visualize(flow_state=state)
        print(state.result[new_ips].result)
    Is the above what Prefect is designed for? Are there some other approaches?
    j
    • 2
    • 3
  • a

    Arnoldas Bankauskas

    02/02/2021, 12:54 PM
    Hi, I want to try Prefect and have decided to set up a VM with Windows 10 for that. Are there any system requirements for Prefect (cores, RAM, etc.)?
    j
    • 2
    • 1
  • s

    Samuel Hinton

    02/02/2021, 1:55 PM
    Hey all! One of the useful things we can do in Airflow is to set the start date of a task far back in the past, and it will run through all the "missed" executions until it's up to date. Great for pulling down a ton of historical data. Is there functionality within Prefect for this? I've been trying to see if there's some way of configuring Clocks/schedules, etc., but at face value I'm not sure whether this will have the wanted behaviour.
    j
    • 2
    • 2
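The catch-up behaviour described in the message above can be pictured with plain Python: given a start date in the past and a fixed interval, enumerate every run time that would have been missed. This is only a sketch of the idea. `missed_runs` is a hypothetical helper, not a Prefect or Airflow API, the dates are made up, and whether Prefect schedules such runs itself is exactly the open question here.

```python
from datetime import datetime, timedelta
from typing import Iterator


def missed_runs(start: datetime, interval: timedelta, now: datetime) -> Iterator[datetime]:
    """Yield every scheduled time from `start` up to and including `now`."""
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    current = start
    while current <= now:
        yield current
        current += interval


if __name__ == "__main__":
    # Hypothetical dates chosen for illustration only.
    start = datetime(2021, 1, 30)
    now = datetime(2021, 2, 2)
    for run_time in missed_runs(start, timedelta(days=1), now):
        print(run_time.date())
```

Each yielded datetime could then be handed to whatever triggers a single run for that date.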
  • m

    Matthew Blau

    02/02/2021, 5:00 PM
    Hello all, I am working on setting up Slack notifications with Prefect and am running into some issues. The following code does not work, but if I replace my_secret in the requests.post(my_secret, ...) call with the Slack webhook address directly, it prints the is_failed() message as expected. What am I doing wrong with the configuration of the Secrets?
    import requests
    from prefect.tasks.secrets import PrefectSecret

    def post_to_slack(task, old_state, new_state):
        # Resolve the webhook URL from the Prefect Secret named SLACK_WEBHOOK_URL
        my_secret = PrefectSecret("SLACK_WEBHOOK_URL").run()
        if new_state.is_retrying():
            msg = "Task {0} failed and is retrying at {1}".format(task, new_state.start_time)
            requests.post(my_secret, json={"text": msg})
        elif new_state.is_failed():
            msg = "Task {0} failed".format(task)
            requests.post(my_secret, json={"text": msg})

        return new_state
    I specifically receive this message if I leave the code as it is above:
    Exception raised while calling state handlers: ClientError('400 Client Error: Bad Request for url: http://host.docker.internal:4200/graphql\n\nThe following error messages were provided by the GraphQL server:\n\n    GRAPHQL_VALIDATION_FAILED: Cannot query field "secret_value" on type "Query".\n\nThe GraphQL query was:\n\n    query($name: String!) {\n                secret_value(name: $name)\n    }\n\nThe passed variables were:\n\n    {"name": "SLACK_WEBHOOK_URL"}\n')
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/client/secrets.py", line 137, in get
        value = secrets[self.name]
    KeyError: 'SLACK_WEBHOOK_URL'
    Thank you in advance!
    m
    • 2
    • 29
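One way to take the Secret lookup out of the picture while debugging is to read the webhook URL from an environment variable inside the handler. The sketch below is an assumption-laden illustration, not the fix confirmed in this thread: `make_slack_handler` and `send_to_webhook` are hypothetical names, the environment variable name simply mirrors the secret name above, and the handler relies only on the `(task, old_state, new_state)` signature and the `is_retrying()` / `is_failed()` methods used in the original code.

```python
import json
import os
import urllib.request
from typing import Callable


def send_to_webhook(url: str, text: str) -> None:
    """POST a Slack-style JSON payload using only the standard library."""
    request = urllib.request.Request(
        url,
        data=json.dumps({"text": text}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(request)


def make_slack_handler(post: Callable[[str, str], None] = send_to_webhook,
                       env_var: str = "SLACK_WEBHOOK_URL"):
    """Build a state handler that reads the webhook URL from the environment."""

    def handler(task, old_state, new_state):
        url = os.environ.get(env_var)
        if url is None:
            # No URL configured: skip the notification instead of failing the task.
            return new_state
        if new_state.is_retrying():
            post(url, "Task {0} failed and is retrying at {1}".format(task, new_state.start_time))
        elif new_state.is_failed():
            post(url, "Task {0} failed".format(task))
        return new_state

    return handler
```

It would be attached the same way as in the messages above, for example `@task(state_handlers=[make_slack_handler()])`. Passing a different `post` callable makes the handler easy to exercise without contacting Slack.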
  • v

    Verun Rahimtoola

    02/02/2021, 5:32 PM
    hi, is there a quick way to check the health and status of the lazarus process/service? perhaps a REST endpoint we could query or a command line utility we can invoke?
    j
    m
    m
    • 4
    • 27
  • j

    Jan Marais

    02/02/2021, 5:43 PM
    Hi all, I am trying to determine what AWS IAM policies are required for running flows as ECS tasks. If my flow storage is in S3, where would I assign the appropriate role so that the ECSAgent can read them? These are the places I can think of:
    • on the server running `prefect agent ecs start`, with any of the AWS CLI ways
    • setting `task_role_arn` on either agent start or `ECSRun`
    • setting `execution_role_arn` on either agent start or `ECSRun`
    • in the task container
    Any insight on the differences between these would also be appreciated.
    j
    • 2
    • 6
  • a

    Adam Brusselback

    02/02/2021, 5:45 PM
    Hey all, just wondering if there are any good tips / links for Prefect project structure / how to save your tasks + flows for easy source control, etc
    j
    • 2
    • 3
  • m

    Mitchell Bregman

    02/02/2021, 8:46 PM
    Hi all, I am having issues with Prefect Cloud at the moment… getting an error trying to add a coworker to the team. It is saying that we have no additional space for users, when the account dashboard says that we do. Can someone message me offline to resolve?
    j
    • 2
    • 1
  • b

    BK Lau

    02/02/2021, 9:11 PM
    So is the `Prefect Agent` responsible for passing the output of one task as the input of another task in a Prefect flow DAG, since the Prefect server manages no code or input/output data? I'm trying to wrap my head around what the responsibility of the Agent is. Maybe an architecture diagram would help.
    m
    • 2
    • 5
  • m

    Mohammed Khan

    02/02/2021, 9:16 PM
    Hello all, fairly new to Prefect here, trying to set up a flow-of-flows following the docs here: https://docs.prefect.io/core/idioms/flow-to-flow.html Ultimately I'd like to kick off only one flow using a schedule and chain a bunch of dependent flows to run, I tried the attached code but the runner is getting stuck on "Flow a" and not proceeding. Could someone take a look and tell me what I'm doing wrong?
    python a_flow.py
    python b_flow.py
    python parentflow.py
    [2021-02-02 15:19:53-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'parent-flow'
    [2021-02-02 15:19:53-0500] INFO - prefect.TaskRunner | Task 'Flow a': Starting task run...
    Attachments: b_flow.py, parentflow.py, a_flow.py
  • b

    BK Lau

    02/02/2021, 10:34 PM
    Q: Can the same workflow be submitted concurrently? I.e., I want to run 2 instances of the same workflow at the same time from a Prefect server. So I assume that some UUID must be generated under the hood by Prefect to allow this? I know that in an `Argo Workflow`, a random string is generated and appended to the workflow name to disambiguate 2 workflows having the same name.
    j
    c
    • 3
    • 4
  • m

    Marwan Sarieddine

    02/02/2021, 11:25 PM
    Hi folks we came across a bug when chaining more than one decorator with the prefect task decorator
    👍 1
    c
    • 2
    • 7
  • i

    itay livni

    02/03/2021, 12:21 AM
    Hi - I'd like to confirm that this is indeed a bug, and not something on my end. Thanks in advance https://github.com/PrefectHQ/prefect/issues/4054
  • v

    Verun Rahimtoola

    02/03/2021, 2:26 AM
    hi, i noticed that when a flow that is registered with the Prefect Server gets picked up by an agent to run it, the `CloudTaskRunner` seems to use a `LocalResult` to persist the results of every task... can someone point me to where this result object gets created?
    c
    • 2
    • 3
  • v

    Verun Rahimtoola

    02/03/2021, 2:46 AM
    another question: is it possible to have lazarus kick in sooner? right now we're seeing it kick in to resubmit flow runs at around the 15-17 minute mark, is there some config value we can set to bring this down a bit?
    c
    • 2
    • 2
  • v

    Vitaly Shulgin

    02/03/2021, 6:21 AM
    Hello team, we changed the base image for our worker container from `prefecthq/prefect:all_extras-0.13.9` to `prefecthq/prefect:0.14.5-python3.8`, and now a lot of tasks are marked as failed because of
    State Message: No heartbeat detected from the remote task; marking the run as failed.
    j
    • 2
    • 1
  • j

    Jitesh Khandelwal

    02/03/2021, 8:00 AM
    Did anyone else come across a similar usecase ? It feels to me like a common problem.
  • l

    Laura Vaida

    02/03/2021, 12:33 PM
    hi folks! I'm currently looking for a solution to store a CSV file generated with Python in a Snowflake stage. Does anybody have experience with this? thanks!
  • j

    Jan Marais

    02/03/2021, 1:21 PM
    Is it possible to use S3 storage if your flow depends on code in external modules? I keep running into this error:
    Failed to load and execute Flow's environment: StorageError('An error occurred while unpickling the flow:\n  ModuleNotFoundError("No module named \'project\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
    Using ECSAgent and a custom image in ECR. I have no problem importing from the external module inside the container or in the environment where my agent is launched or where my flow is registered from.
    j
    • 2
    • 11
  • s

    Samuel Hinton

    02/03/2021, 1:50 PM
    Hi all! I'm trying to get a Prefect server deployed into a swarm, and can't use port 8080 for the UI. I notice in the docs that we can configure Prefect (https://docs.prefect.io/core/concepts/configuration.html) but I can't find a list of the actual variables available to customise. Is this online somewhere else?
    j
    • 2
    • 2
j

josh

02/03/2021, 1:53 PM
Hi @Samuel Hinton you can find a list of all available config options here https://github.com/PrefectHQ/prefect/blob/master/src/prefect/config.toml
s

Samuel Hinton

02/03/2021, 1:55 PM
Beautiful, thanks Josh! Would be great to include a link to this if it's not in the doco on that config page 🙂
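Once a key has been found in that config.toml, Prefect's configuration docs describe overriding it through an environment variable named `PREFECT__` followed by the section and key joined with double underscores. The helper below sketches that naming rule only. `config_env_var` is a hypothetical function, and the example key `server.ui.host_port` is an assumption that should be checked against the linked config.toml before use.

```python
def config_env_var(dotted_key: str) -> str:
    """Convert a dotted config key into the PREFECT__SECTION__KEY form."""
    parts = [part for part in dotted_key.split(".") if part]
    if not parts:
        raise ValueError("empty config key")
    return "PREFECT__" + "__".join(part.upper() for part in parts)


if __name__ == "__main__":
    # The key name is illustrative; confirm it exists in config.toml first.
    print(config_env_var("server.ui.host_port"))  # PREFECT__SERVER__UI__HOST_PORT
```

The resulting name would be exported in the environment of the process that starts the server, with the desired port as its value.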