Powered by Linen
prefect-community
  • m

    Michael Warnock

    07/17/2021, 4:12 PM
    Dependency best practices question: I have an existing repo
    feature-generator
    which contains both worker/orchestration logic and the code for doing the work. I added task and flow definitions to it, but with github storage, the flow can't find the other modules in that repo (I've seen https://github.com/PrefectHQ/prefect/discussions/4776 and understand this is intentional). My question is how best to structure things so that my flow can use that repo's code, but also execute a parameterized run from
    feature-generator
    , on commit, through CI (because that's how we start the job right now). Obviously, I can make
    feature-generator
    a package and depend on it from a new
    flows
    repo, but to have
    feature-generator
    start the run would create a circular dependency. Would you split it into three repos, with one of them just being responsible for executing the flow? I don't love that idea, but maybe that's best practices?
    k
    • 2
    • 24
  • m

    Michael Warnock

    07/17/2021, 4:21 PM
    Scaling question: forgive me if I missed this in the docs/GitHub issues (I have looked), but do flows executed with ECSRun auto-scale in some way? I.e., do mapped tasks turn into ECS tasks? Or does some ECS service/task setting do the job based on resource consumption? Or do I need a Dask cluster for that? Does a Dask cluster even do this directly?
    k
    • 2
    • 3
  • h

    haven

    07/18/2021, 6:00 AM
    KubernetesSecret question: currently it seems that it's designed such that
    KubernetesSecret
    should not rely on any upstream task/parameters. However, I think it might not be a great assumption. I'm using an
    env
    parameter that denotes
    "DEV"
    or
    "PROD"
    and then would decide what secret I want to retrieve from k8s later (usually a database secret). Any plan to allow
    KubernetesSecret
    to be a dynamic task and interpreted naturally in the flow without having to manually use
    flow.set_dependencies
    ?
    ✅ 1
    d
    • 2
    • 11
  • a

    Austen Bouza

    07/18/2021, 5:59 PM
    Is there a way to use a
    DictCursor
    with the existing
    SnowflakeQuery
    task? The source shows
    try:
        with conn:
            with conn.cursor() as cursor:
                executed = cursor.execute(query, params=data).fetchall()
        conn.close()
        return executed
    while what I would want to use is:
    try:
        with conn:
            with conn.cursor(DictCursor) as cursor:
                executed = cursor.execute(query, params=data).fetchall()
        conn.close()
        return executed
    Has anyone else dealt with this before? I’d like to simply inject the DictCursor class if possible, but from the way this block of code is written it looks like the only way to do it is to subclass
    SnowflakeQuery
    and overwrite the entire
    run
    method.
    ✅ 1
    k
    • 2
    • 2
  • s

    Seth Coussens

    07/18/2021, 11:03 PM
    @Kevin Kho When mounting a volume using the prefect docker agent, it all seems to work, but the docker container flows don't see anything in the mounted volume:
    prefect agent docker start --token <token> --name "EMG-DCK-01" --volume company:/mnt/company --label production --log-level DEBUG --show-flow-logs
    Any flow using this agent then shows that directory as empty. When I manually run the container (on the same account, terminal windows, etc as the agent) and mount it with -v using the same syntax it tests out fine and the volume has the expected files:
    docker run -it -v company:/mnt/company node:lts bash
    
    cd /mnt/company
    ls
    
    **shows all files**
    Any ideas why the volume when mounted with the agent configuration would not be working? This is what the company volume configuration looks like:
    PS C:\Users\azure.automation> docker volume inspect company
    [
        {
            "CreatedAt": "2021-07-17T22:43:31Z",
            "Driver": "local",
            "Labels": {},
            "Mountpoint": "/var/lib/docker/volumes/company/_data",
            "Name": "company",
            "Options": {
                "device": "//192.168.1.2/Company",
                "o": "user=myusername,domain=mydomain,password=mypassword",
                "type": "cifs"
            },
            "Scope": "local"
        }
    ]
    k
    m
    • 3
    • 18
  • z

    Zach Schumacher

    07/19/2021, 2:53 PM
    Is there a context date equivalent to Airflow's execution date? E.g., if a job fails, I want to rerun it with the date that job failed on. I could be doing something wrong, but it looks like when I reran it, it still used today's date.
    k
    • 2
    • 2
  • k

    Kevin Kho

    07/19/2021, 3:26 PM
    Hey everyone, in case you missed it, I will be speaking at Dremio Subsurface this week (July 21 1:45 pm ET).
    🚀 6
  • f

    Fabrice Toussaint

    07/19/2021, 4:24 PM
    Hey all, Can someone explain to me why I am getting this error:
    Traceback (most recent call last):
      File "runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "flow_runner.py", line 569, in get_flow_run_state
        executors.prepare_upstream_states_for_mapping(
      File "\executors.py", line 537, in prepare_upstream_states_for_mapping
        value = upstream_state.result[i]
    KeyError: 0
    [2021-07-19 18:23:21+0200] ERROR - prefect.ETL | Unexpected error occured in FlowRunner: KeyError(0)
    k
    • 2
    • 3
  • h

    Hugo Shi

    07/19/2021, 6:19 PM
    Is there a way to easily get the result location for a task so that we can re-run failed tasks? I can see the result location when I mouse over the field in the prefect cloud UI, but I can't figure out how to cut and paste it?
    k
    • 2
    • 24
  • l

    Leon Kozlowski

    07/19/2021, 7:15 PM
    Hi all, is there any concept of stopping a flow after a certain task to examine outputs + proceed with user selected results as parameters for a downstream task? Example that is coming to mind is result of say a grid search
    k
    • 2
    • 7
  • m

    Matthew Alhonte

    07/19/2021, 8:43 PM
    Thinking of using Coiled for our Prefect stuff - how does it work to have different Docker images for different flows with Coiled? Can you just set the Execution environment as the Dask cluster and the Run_Config as DockerRun with the image file?
    k
    • 2
    • 13
  • h

    Harry Baker

    07/19/2021, 8:47 PM
    Is there a way to pass in a parameter for retry_delay in a task? I have a task that imports Google Sheets, and I want to set up some kind of exponential backoff for my attempts whenever I run into the API limit.
    k
    • 2
    • 5
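A note on the backoff question above: here is a minimal sketch of how exponentially growing retry delays could be computed in plain Python. The helper name `backoff_delays`, the base delay, and the cap are all hypothetical and not part of Prefect. How such delays would be wired into a task's retry settings depends on the Prefect version and is not shown here.

```python
from datetime import timedelta
from typing import List


def backoff_delays(
    retries: int, base_seconds: float = 2.0, cap_seconds: float = 60.0
) -> List[timedelta]:
    """Return one delay per retry attempt, doubling each time up to a cap."""
    delays = []
    for attempt in range(retries):
        # 2, 4, 8, ... seconds, never exceeding cap_seconds
        seconds = min(base_seconds * (2 ** attempt), cap_seconds)
        delays.append(timedelta(seconds=seconds))
    return delays
```

Capping the delay keeps a long run of rate-limit errors from pushing the wait time out indefinitely.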
  • e

    Erik Amundson

    07/19/2021, 10:17 PM
    Has anyone run into this issue before? Running
    DaskExecutor
    on GKE with cluster class
    dask_kubernetes.KubeCluster
    and it seems to be dropping 1-2 mapped children per run. It's like the scheduler doesn't realize they exist, or is losing track somehow - there is no error message in the logs. This prevents the flow from proceeding to the downstream tasks, so I end up having to cancel the flow. So far it's shown the same behavior on all four test runs. If it matters, we're running on prefect 0.14.16.
    k
    • 2
    • 12
  • h

    Hugo Kitano

    07/19/2021, 10:21 PM
    I’m using the ECSRun as my run_config for my ECS flows, and am switching over from making my agent a lone ec2 instance to an ecs service to ensure it stays alive. Now, when I try to start a new flow run, I’m running into a credentials error. Any idea of where the error might be coming from? I changed nothing in my flow.
    logs.txt
    k
    m
    • 3
    • 4
  • n

    Nishtha Varshney

    07/20/2021, 8:06 AM
    Hello, I have a basic question: why should we use Prefect Cloud beyond the Prefect library, which is free and open source?
    j
    • 2
    • 2
  • b

    Bruno Murino

    07/20/2021, 11:02 AM
    Hi everyone — is there a way to use a task output on the state handler ?
    j
    • 2
    • 1
  • a

    An Hoang

    07/20/2021, 1:26 PM
    Maybe I'm missing something obvious. Can't seem to persist my task/flow result. The folder
    ./test_prefect
    is created but nothing is outputted. Here's my very simple code:
    ## test_prefect.py
    from prefect import Flow, task
    from prefect.engine.results import LocalResult

    result = LocalResult(dir="./test_prefect")

    @task(target="func_task_target.txt", checkpoint=True, result=result)
    def test(val):
        return val

    with Flow("simple_test", result=result) as test_flow:
        result = test(99)

    test_flow.run()
    I did set
    PREFECT__FLOWS__CHECKPOINTING=true
    before doing
    python test_prefect.py
    k
    • 2
    • 11
  • m

    Michael Warnock

    07/20/2021, 2:38 PM
    Error grabbing logs: unexpected EOF
    <- Has anyone seen this in their (Docker) agent output? It happened after two frames of stack trace for me, on a mapped task (149 of 876), and the agent stopped processing anything (or sending anything to stdout anyway, and the rest of the tasks didn't run). Grepping for "grabbing logs" turns up nothing in the Prefect repo, our own code, or any of the dependencies in my virtualenv. It's a Cloud-backed flow run; is this coming from Cloud, maybe?
    k
    • 2
    • 3
  • a

    An Hoang

    07/20/2021, 2:58 PM
    Any usage documentation/tutorial of "Artifacts"? Saw it on cloud but I could only find mentioning of it in the API docs
    j
    • 2
    • 1
  • l

    Laura Vaida

    07/20/2021, 3:08 PM
    Hi there! Does anybody have experience with how to fit a Python/Prefect run into a dbt workflow, so that some of the tables run before that Python job and some after it?
    ✅ 1
    k
    • 2
    • 7
  • s

    Sean Talia

    07/20/2021, 3:38 PM
    Hi all – are there any known issues with Cron clocks? I'm having some difficulty porting a job from the old scheduling tool my team uses to a new one. We have a job that runs every day, every 8 hours starting at 7am. In moving this job to Prefect and setting the flow schedule, I'm using a cron schedule that seems to me like it should achieve what I want, and the Prefect UI tells me in plain English that the flow's schedule is correct (within the flow's Overview pane), but when I look at the scheduled upcoming flows, they don't seem to match.
    j
    k
    m
    • 4
    • 23
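For the schedule described above (every 8 hours starting at 7am), one thing worth checking is which hours a step expression in the cron hour field actually selects. The sketch below uses plain Python ranges to mimic two common forms. The helper names are hypothetical, and the cron string used in the original flow is not shown in the message, so this is illustrative only. Time zones are not modeled.

```python
from typing import List


def hours_for_range_step(start: int, step: int) -> List[int]:
    """Hours of the day matched by a cron hour field like `start-23/step`."""
    return list(range(start, 24, step))


def hours_for_star_step(step: int) -> List[int]:
    """Hours matched by `*/step`, which always counts from hour 0."""
    return list(range(0, 24, step))
```

With these helpers, `hours_for_range_step(7, 8)` selects hours 7, 15, and 23, while `hours_for_star_step(8)` selects 0, 8, and 16, so the two forms produce different daily run times.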
  • d

    Daniel Burkhardt

    07/20/2021, 3:45 PM
    Hi, I'm interested in better understanding how to use persistent caching to help debug failed flows comprising mapped tasks. Because there doesn't seem to be any way to cache input to disk, I want to know the preferred way to identify the input to a failed task. I'm currently using
    {map_index}
    in the Result location, and it seems like the output of
    first_map_fn[0]
    always gets passed to
    second_map_fn[0]
    even if
    first_map_fn[1]
    finishes first. Is this guaranteed? Is there a better way to identify which Result is used as input to a specific task run? Please find example flow in thread.
    k
    • 2
    • 15
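To make the `{map_index}` idea above concrete, here is a small sketch of how a templated location string expands for each mapped child, using plain `str.format`. The template, the helper `location_for`, and the directory layout are assumptions for illustration. Whether Prefect guarantees that index `i` of the first mapped task feeds index `i` of the second is the open question in the message, and this sketch does not answer it.

```python
# Hypothetical location template; the real one is not shown in the message.
TEMPLATE = "{task_name}/{map_index}.prefect"


def location_for(template: str, task_name: str, map_index: int) -> str:
    """Expand a location template for one mapped child."""
    return template.format(task_name=task_name, map_index=map_index)


def candidate_input_location(template: str, upstream_task: str, map_index: int) -> str:
    """Location of the upstream result, ASSUMING indices line up one-to-one."""
    return location_for(template, upstream_task, map_index)
```

Under that alignment assumption, the input to a failed `second_map_fn[3]` would be looked up at the location produced for `first_map_fn` with index 3.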
  • j

    Jared Noynaert

    07/20/2021, 3:57 PM
    Is there a good way to have k8s runs inherit env vars from the agent?
    k
    • 2
    • 6
  • c

    Clemens

    07/20/2021, 4:26 PM
    Hi, I am debugging a mapped flow where I have two independent mapped pipelines and want the second to start only once the first is finished. It seems that a (mapped) task that depends on an upstream (mapped) task does not get started if an empty array is passed into the upstream mapped task. Is this wanted behaviour? I guess it can be debated if a task should be considered “finished” if it is never triggered, but on the other hand I expect the downstream task with a
    run_always
    trigger to run, well, always. Example code in thread
    k
    • 2
    • 14
  • m

    Mehdi Nazari

    07/20/2021, 4:41 PM
    Hello all, I’m working on Hybrid model flow execution with a Docker Agent that executes each flow run in an individual Docker container. Currently I have the Agent up and running on our local Docker platform and have successfully connected it to the Cloud UI with the proper Service Account Key.
    prefect agent docker start --key "SERVICE_ACCOUNT_KEY" --name "AGENT_NAME"
    I was also able to write a simple Flow and register it with Docker storage. The Cloud UI also recognizes the Flow as a registered flow, but upon executing it, I’m getting the error below on the Agent:
    docker.errors.APIError: 400 Client Error for <http+docker://localhost/v1.40/containers/create?name=nickel-okapi>: Bad Request ("invalid IP address in add-host: "host-gateway"")
    What am I missing? any troubleshooting I can do?
    k
    • 2
    • 11
  • v

    Vincent

    07/20/2021, 4:51 PM
    I noticed that when using
    wait=True
    for
    StartFlowRun
    tasks, the result objects are not stored at the target store. I want to kick off a flow of flows, such that the
    StartFlowRun
    tasks saves a result object to avoid resubmission of the flow.
    wait=False
    restores the expected behavior. Are there any suggestions for running dependent flows with checkpointing?
    k
    • 2
    • 3
  • s

    Sean Harkins

    07/20/2021, 5:14 PM
    We have a requirement to create a Prefect
    Cloud Hook
    for the Flow
    success
    event. This needs to be done programmatically via the api at registration time. The documentation states
    Psst! We have recently added Automations which offer more functionality than Cloud Hooks and will eventually replace them.
    There does not seem to currently be programmatic api access to Automations. 1. Is there a way to register an Automation via an API and if so is it documented somewhere? 2. When is the planned deprecation of Cloud Hooks? We will need to support a mix of self hosted Prefect Server instances and Prefect Cloud usage so we would like a solution which supports both but it appears that Automations will not be available via Prefect Server.
    j
    • 2
    • 29
  • h

    Hugo Kitano

    07/20/2021, 7:20 PM
    do cloud agents need unique names?
    j
    k
    • 3
    • 5
  • d

    Dave Nielsen

    07/20/2021, 7:42 PM
    Reminder: @Kevin Kho's talk at Subsurface is tomorrow at 10:45 am Pacific. "Orchestrating Data Validation Workflows with Prefect". It's free.
    🚀 2
    :marvin: 1
  • t

    Thomas Nakamoto

    07/20/2021, 8:20 PM
    Hello community! I am trying to integrate slack notifications into my workflow. Is there a good way to post to Slack if all tasks are successful? Here is my state_handler:
    import os
    import requests

    def post_to_slack(task, old_state, new_state):
        # https://github.com/PrefectHQ/prefect/blob/d61fa6aac9330c5817cc8e8b8f8cca2d634ea7e1/src/prefect/engine/state.py
        if new_state.is_retrying():
            msg = f"<@XXXX> {os.path.basename(__file__)} | {task} | {new_state}"
            requests.post("https://hooks.slack.com/services/", json={"text": msg})

        if new_state.is_failed():
            msg = f"<@XXXX> {os.path.basename(__file__)} | {task} | {new_state}"
            requests.post("https://hooks.slack.com/services/", json={"text": msg})

        # if new_state.is_successful():
        #     msg = f"<@XXXX> {os.path.basename(__file__)} | {task} | {new_state}"
        #     requests.post("https://hooks.slack.com/services/", json={"text": msg})

        return new_state
    I would like to avoid sending a success message for each task, and instead send one only if every task ran successfully. Thanks!
    b
    k
    • 3
    • 6
b

Ben Muller

07/20/2021, 8:29 PM
If you use cloud, there is an inbuilt "automations" in the UI so you don't have to write any code. It's awesome ☺️
t

Thomas Nakamoto

07/20/2021, 8:31 PM
I'm just using core at the moment, cloud would be great.
k

Kevin Kho

07/20/2021, 8:31 PM
Hey @Thomas Nakamoto, I assume you have tasks that can Fail without the flow Failing, so you want a notification only if everything succeeds? Or do you just want a way to know if the Flow succeeds?
t

Thomas Nakamoto

07/20/2021, 8:34 PM
Hi @Kevin Kho just a way to know the Flow succeeded. I have a workflow that will run hourly and I would like a confirmation that it ran.
k

Kevin Kho

07/20/2021, 8:37 PM
Oh. You should just use the SlackTask or Slack Notifier at the end of your Flow. The SlackTask is more configurable, as I think the Slack Notifier requires that you store the Secret on Prefect Cloud.
t

Thomas Nakamoto

07/20/2021, 8:41 PM
@Kevin Kho thank you I will try out the SlackTask
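
As an alternative to the SlackTask suggestion in this thread, here is a rough sketch of a handler that posts a single message on success, meant to be attached to the flow rather than to each task. It reuses the `(obj, old_state, new_state)` handler signature from the original message. The factory `make_success_handler` and the injected `post` callable are hypothetical. In practice `post` would wrap the `requests.post` call to the Slack webhook, and the sketch assumes the flow accepts a list of state handlers in the same way tasks do.

```python
from typing import Any, Callable


def make_success_handler(post: Callable[[str], None]) -> Callable[[Any, Any, Any], Any]:
    """Build a state handler that posts only when the new state is successful."""

    def handler(obj: Any, old_state: Any, new_state: Any) -> Any:
        # Only the success transition triggers a message; other states pass through.
        if new_state.is_successful():
            post(f"{obj} | {new_state}")
        # State handlers return the (possibly modified) new state.
        return new_state

    return handler
```

Because the handler is attached to the flow rather than to each task, it reacts to the flow's own state changes, so a run that finishes successfully produces one message instead of one per task.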