prefect-community

    Andreas Nord

    09/05/2022, 3:13 PM
    Hi! Can I set a task name dynamically at runtime (as was possible in Prefect 1)?

    Rhys Mansal

    09/05/2022, 3:29 PM
    Hi everyone. Did something happen with Prefect Cloud automations on Sunday? All our Slack messages stopped being sent without any changes on our end. The webhook is fine, and when I send a test message via the UI it succeeds. Despite recreating all the automations, they still do nothing when their triggers fire.

    Tim Helfensdörfer

    09/05/2022, 3:40 PM
    Hi, we have two work queues, Staging and Production, with exactly the same deployment files except for the work_queue_name and tags. Both work queues are running at the moment, e.g. with prefect agent start --work-queue "Production", but only the Staging work queue picks up jobs. The Production work queue does not. It is correctly assigned in the Cloud UI, i.e. I can see the late jobs in the work queue tab, but the queue never receives any runs: (moved to thread)
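    For reference, a minimal sketch of how the two deployments might be defined in Python, assuming a hypothetical my_flow and the Deployment.build_from_flow helper; only work_queue_name and tags differ:
    from prefect.deployments import Deployment
    from my_project.flows import my_flow  # hypothetical flow import

    # Staging deployment: picked up by the agent started with --work-queue "Staging"
    Deployment.build_from_flow(
        flow=my_flow,
        name="staging",
        work_queue_name="Staging",
        tags=["staging"],
    ).apply()

    # Production deployment: identical apart from the queue name and tags
    Deployment.build_from_flow(
        flow=my_flow,
        name="production",
        work_queue_name="Production",
        tags=["production"],
    ).apply()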

    eddy davies

    09/05/2022, 3:56 PM
    Wondering if the "try it now" button on this page should link to the v2 docs rather than the v1 docs.

    F. Nikita Thomas

    09/05/2022, 4:54 PM
    Hi all! Quick question: previously, in Prefect version 0.14.17, a flow could be visualized as long as the package prefect[viz] was installed, but with the current version (2.3.1) I'm having difficulties getting some basic code to work. Could someone please assist? Thanks!
    import requests
    from pprint import pprint as pp
    from prefect import flow, task, Flow
    import pandas as pd
    import json
    
    ## extract
    
    @task(name="extract")
    def get_data():
        r = requests.get("https://rickandmortyapi.com/api/character/")
        return r.json()
    
    @task
    def transform(r: dict):
        df = pd.json_normalize(r["results"])
        print(df)
    
    @task(name="load")
    def write_frame(df: pd.DataFrame):
        pass
    
    """
    ## This is the way to visualize flow before current version
    
    with Flow("ETL") as flow:
        e = get_data()
        t = transform(e)
        l = write_frame(t)
    flow.visualize()
    """
    
    @flow
    def flow_test():
        e = get_data()
        t = transform(e)
        l = write_frame(t)
        
    flow_test().visualize() # This doesn't work...

    Arun Giridharan

    09/05/2022, 4:58 PM
    Hi all, I'm trying to schedule a flow but the scheduler isn't turned on when I view it in the UI. When I try to turn the scheduler on using the slider, I get this error: "Something went wrong. Please wait a few moments and try again". Any idea how I can debug this?

    Tom Kaszemacher

    09/05/2022, 5:20 PM
    Hello! I'm trying to set up dependent flows on ECS with Dask following the example at https://docs-v1.prefect.io/core/idioms/flow-to-flow.html. I expected each child flow to run on a dedicated Dask worker, but they run on the same machine as the parent flow. (code in thread) Can anyone point me to the proper approach?
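    For context, a minimal flow-of-flows sketch following the idiom linked above; the child flow and project names are placeholders. Note that where each child run executes is determined by that child flow's own run config and agent, not by the parent's Dask workers:
    from prefect import Flow
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    with Flow("parent") as parent_flow:
        # Kick off a registered child flow and wait for it to finish
        child_run_id = create_flow_run(flow_name="child-flow", project_name="my-project")
        wait_for_flow_run(child_run_id)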

    Esdras Lopes Nani

    09/05/2022, 8:21 PM
    Hi all! Currently trying to use S3 as my deployment storage. Is it possible to add a subfolder in the deploy? Example:
    bucket/
        flows/
            flow_a/
                "files uploaded by prefect"
            flow_b/
                "files uploaded by prefect"
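    One possible approach, sketched below with hypothetical block names and assuming the prefect.filesystems S3 block is used for storage, is to include the subfolder in the block's bucket_path:
    from prefect.filesystems import S3

    # bucket_path may include a prefix, so each flow can get its own subfolder
    S3(bucket_path="bucket/flows/flow_a").save("flow-a-storage", overwrite=True)
    S3(bucket_path="bucket/flows/flow_b").save("flow-b-storage", overwrite=True)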

    Ian Andres Etnyre Mercader

    09/06/2022, 12:33 AM
    Hi everyone, while running my flow I got some of these messages in the agent logs:
    orion_agent-orion_server-1  | 2022-09-06 00:21:29,528 - distributed.worker - WARNING - Compute Failed
    orion_agent-orion_server-1  | Key:       1843a132-66f7-45c1-9c76-9736ca31cd4a
    orion_agent-orion_server-1  | Function:  begin_task_run
    orion_agent-orion_server-1  | args:      ()
    orion_agent-orion_server-1  | kwargs:    {'task': <prefect.tasks.Task object at 0x7f28a2f18a60>, 'task_run': TaskRun(id=UUID('9ac621e9-cac9-4ff7-813e-da0ce5d2bda3'), created=DateTime(2022, 9, 6, 0, 15, 11, 280979, tzinfo=Timezone('+00:00')), updated=DateTime(2022, 9, 6, 0, 15, 12, 995201, tzinfo=Timezone('+00:00')), name='download_xml-39058d84-636', flow_run_id=UUID('258498db-6397-412e-a79a-39f252b8316a'), task_key='lib.dask_context.dask_context.<locals>._dask_task', dynamic_key='636', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0, retries=15, retry_delay=2), tags=['map_process'], state_id=UUID('79733d18-24d6-43b5-9f5f-1b04f3022edd'), task_inputs={'args': [TaskRunResult(input_type='task_run', id=UUID('fb87e646-0e3b-4647-abe5-8dea70156bbd'))], 'kwargs': []}, state_type=StateType.PENDING, state_name='Pending', run_count=0, expected_start_time=DateTime(2022, 9, 6, 0, 15, 11, 280780, tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_
    orion_agent-orion_server-1  | Exception: "RuntimeError('The connection pool was closed while 1 HTTP requests/responses were still in-flight.')"
    Are these the attempts of the task that failed and are going to be retried, or are they something else?

    Faheem Khan

    09/06/2022, 1:09 AM
    prefect 2.3.1 with prefect-dask 0.2.0, I am getting the following error

    Emon Li

    09/06/2022, 2:25 AM
    Hey guys, has anyone ever used Prefect to manage AWS DMS tasks before?

    Clovis

    09/06/2022, 8:55 AM
    Hi everyone! Since yesterday, all my Airbyte tasks have been failing with this kind of error:
    ######## ERROR 1  
    <...>
      File "/root/.local/lib/python3.9/site-packages/prefect/tasks/airbyte/airbyte.py", line 345, in run
        job_id, job_created_at = self._trigger_manual_sync_connection(
    TypeError: cannot unpack non-iterable NoneType object 
    
    ######## ERROR 2 
    <...>
      File "/root/.local/lib/python3.8/site-packages/prefect/tasks/airbyte/airbyte.py", line 333, in run
        job_status, job_created_at, job_updated_at = self._get_job_status(
    TypeError: cannot unpack non-iterable NoneType object
    The task fails even though the Airbyte sync is correctly launched. That's odd because I have not made any modifications to either Prefect or Airbyte since yesterday. I first encountered this issue with Prefect core v1.1.0, and I see the same behavior after upgrading my core version to v1.3.0. Have other people experienced this behavior?

    Barada Sahu

    09/06/2022, 8:59 AM
    Hey folks, what's the expected behaviour when there's a cleanup task waiting on a set of tasks and one of the upstream tasks fails? How does the overall state of the flow get resolved? Code below to clarify.
    @flow
    def test_flow(nums: list = None):
        test_futures = [square.submit(num) for num in nums]
        return do_cleanup.submit(wait_for=test_futures)
    In the above, does do_cleanup get invoked, or does the flow go directly into a failed state?
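    For context, a self-contained version of the snippet above with hypothetical square and do_cleanup tasks filled in; the wait_for behaviour itself is what the question is about:
    from prefect import flow, task

    @task
    def square(num):
        return num * num

    @task
    def do_cleanup():
        print("cleaning up")

    @flow
    def test_flow(nums: list = None):
        test_futures = [square.submit(num) for num in (nums or [])]
        # do_cleanup is ordered after every square future via wait_for
        return do_cleanup.submit(wait_for=test_futures)

    test_flow([1, 2, 3])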

    Hamza Naanani

    09/06/2022, 11:53 AM
    Hello, when we build a deployment, the deployment gets the name of either the function that starts the flow or the flow name argument in the function itself. But it doesn't take the deployment name that we specify with the --name argument, and there isn't a CLI argument to specify the flow name (unless we edit it by hand after the file is created). I believe that's not good behaviour, since it complicates managing a lot of flows. For instance, let's assume we have this structure:
    - Flows
    --| - flow_1.py
    --| - flow_2.py
    If both files look like this:
    from prefect import flow, task

    @task
    def test_task():
        return 'hello'

    @flow
    def main():
        message = test_task()
        print(message)

    if __name__ == '__main__':
        main()
    the flow name in the yaml file will be the same (main), and that creates confusion in the Prefect UI: we will see what looks like the same flow with different deployment names, even though it is not the same flow code under the hood. Is there a better way to handle this apart from changing the name after the creation of the yaml file?
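    One way to keep the UI unambiguous, sketched under the assumption that editing the flow files is acceptable, is to give each flow an explicit name via the decorator's name argument:
    # flow_1.py
    from prefect import flow, task

    @task
    def test_task():
        return 'hello'

    @flow(name="flow-1")  # explicit flow name, independent of the function name
    def main():
        print(test_task())

    # flow_2.py would declare @flow(name="flow-2") while keeping the same main() function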

    Vlad Tudor

    09/06/2022, 12:22 PM
    Hi, does this look familiar?
    Failed to load and execute flow run: ModuleNotFoundError("No module named '/root/'")
    I am running the Prefect Agent on a separate machine from the Prefect Server, but the communication works (the flow starts on the agent). Thanks!

    Matt Delacour

    09/06/2022, 1:07 PM
    Hi 👋 I cannot access the logs of any job since this morning ...

    Rajvir Jhawar

    09/06/2022, 1:20 PM
    Hi Prefect team, I can't seem to figure out how to override the customizations using the CLI; I always end up with an error. Is it even possible to override the customizations using the CLI?
    deployment build --override customizations = resource_object

    Timo

    09/06/2022, 1:39 PM
    Hi there, can I run an async task like prefect-dbt's trigger_dbt_cli_command() inside another task? This does not work for async tasks. Thanks
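    For reference, a sketch of awaiting the task from a flow instead of from inside another task, assuming prefect-dbt's documented import path, that the task is async as in the prefect-dbt examples, and that a dbt project is available locally:
    from prefect import flow
    from prefect_dbt.cli.commands import trigger_dbt_cli_command

    @flow
    async def dbt_flow():
        # Await the async task directly from the (async) flow rather than nesting it in a task
        return await trigger_dbt_cli_command("dbt run")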

    Kyle McChesney

    09/06/2022, 3:11 PM
    okay, here is an interesting question, I have the following example flow
    from prefect import case, task, Flow
    from prefect.executors import LocalDaskExecutor


    @task
    def required():
        return [
            1,
            2,
            3,
        ]
    
    
    @task
    def optional_case():
        return True
    
    
    @task
    def optional(data):
        data.append(4)
        return data
    
    
    @task
    def report_case():
        return False
    
    
    @task(skip_on_upstream_skip=False)
    def report():
        print('all done!')
    
    
    with Flow(
        'test',
        executor=LocalDaskExecutor(),
    ) as flow:
    
        data = required()
    
        with case(optional_case, True):
            opt_data = optional(data)
    
        with case(report_case, True):
            report(upstream_tasks=(data, opt_data,))
    The idea is that there are 2 variables, summarized as:
    • variable 1: should the optional data processing be run
    • variable 2: should reporting be run at the end (note that reporting does not take data as an explicit input)
    I am trying to achieve this in Prefect 1 using case statements. The issue is that I need to set skip_on_upstream_skip on report so that it runs even if the optional data processing is not run. I just want to ensure that report is run if its case is True and otherwise not run, but it must only be run AFTER data and optional have run.

    Andreas Nord

    09/06/2022, 3:12 PM
    Hi! I want to set the runtime name of a task depending on one of its arguments, without having to set the name on every function call. Basically, what is the Prefect 2 equivalent of the Prefect 1 code below?
    from prefect import task, Flow
    
    
    def foo(x):
        return _foo(x, task_args=dict(name=x))
    
    
    @task(task_run_name="{x}")
    def _foo(x):
        print(x)
    
    
    with Flow("flow") as flow:
        foo('a') # should be named 'a' 
        foo('b') # should be named 'b'
    
    flow.run()
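    A possible Prefect 2 counterpart, sketched with Task.with_options (not confirmed in the thread), overrides the task name per call so the run name follows the argument:
    from prefect import flow, task

    @task
    def _foo(x):
        print(x)

    def foo(x):
        # with_options returns a copy of the task with the given name,
        # so each call produces a run named after its argument
        return _foo.with_options(name=x)(x)

    @flow
    def my_flow():
        foo('a')  # run name based on 'a'
        foo('b')  # run name based on 'b'

    my_flow()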

    Igor Morgunov

    09/06/2022, 4:57 PM
    Hi All, I have a question about setting upstream tasks. Here’s some pseudo code:
    @task()
    def task1():
    	# do stuff 
    
    @task()
    def task2():
    	# do other stuff 
    
    ids = ['aaa', 'bbb', 'ccc'] 
    
    for id in ids:
    	x = task1()
    
    y = task2(upstream_tasks=[x])  
    
    flow.run(executor=LocalDaskExecutor)
    My problem is that task2() fires as soon as the first instance of task1() completes successfully. I need task2() to fire only once all instances of task1() have completed. What am I doing wrong here?
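    In the loop above only the last x survives, so a sketch of the usual Prefect 1 pattern, assuming the calls happen inside a Flow context, collects every task1 call and passes the whole list as upstream_tasks:
    from prefect import Flow
    from prefect.executors import LocalDaskExecutor

    with Flow("example") as flow:
        xs = [task1() for _ in ids]       # keep a reference to every task1 copy
        y = task2(upstream_tasks=xs)      # task2 waits for all of them

    flow.run(executor=LocalDaskExecutor())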

    Barada Sahu

    09/06/2022, 5:16 PM
    If there's a task failure in an upstream task and I am waiting on it to complete (using wait_for), the downstream task is never triggered; instead it goes into an undocumented NotReady state. To overcome this, I have to call .result(raise_on_failure=False) on the upstream task. See the screengrabs below 👇 for the states: the left one fails, the right one works. This seems like a common DAG use-case that Prefect should handle well: how do we wait on a set of upstream tasks to complete and then perform a cleanup / notification at the end (with an aggregate of results from all upstream tasks)?
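    A sketch of the workaround described above, with hypothetical work and cleanup tasks: resolving each future with raise_on_failure=False returns the value or the exception instead of raising, so cleanup can still be submitted:
    from prefect import flow, task

    @task
    def work(n):
        if n == 2:
            raise ValueError("boom")
        return n

    @task
    def cleanup(outcomes):
        print("upstream outcomes:", outcomes)

    @flow
    def pipeline():
        futures = [work.submit(n) for n in range(4)]
        outcomes = [f.result(raise_on_failure=False) for f in futures]
        cleanup.submit(outcomes)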

    Sam Garvis

    09/06/2022, 6:59 PM
    It would be nice if the schedule didn't get wiped every time you re-apply a deployment.

    Krishnan Chandra

    09/06/2022, 7:00 PM
    Hey everyone! Trying to figure something out: in Prefect 1, is there a way to restart flow runs from the API? I am able to cancel flows via the API using the cancel_flow_run function, but I don't see a way to restart the run once it is cancelled.
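    One possible approach, sketched below and not confirmed in the thread: use the Prefect 1 Client to push the run back into a Scheduled state (the run id is a placeholder):
    from prefect import Client
    from prefect.engine.state import Scheduled

    client = Client()
    # Re-schedule an existing flow run by setting its state back to Scheduled
    client.set_flow_run_state(
        flow_run_id="<flow-run-id>",
        state=Scheduled("Restarted via API"),
    )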

    Nathaniel Russell

    09/06/2022, 8:11 PM
    Error downloading Flow from S3: An error occurred (AccessDenied) when calling the GetObject operation: Access Denied
    I keep getting this error when flows are supposed to download their definitions from storage. I have both a prefect user and prefect role specified in the bucket's policy.

    Bradley Hurley

    09/06/2022, 9:07 PM
    I have a question about dynamically generating tasks. 🧵

    John Kang

    09/06/2022, 9:09 PM
    Anyone else experiencing an error trying to pip install the latest prefect release (2.3.2)? https://github.com/PrefectHQ/prefect/blob/main/RELEASE-NOTES.md#232. Maybe not available yet?
    pip install "prefect==2.3.2"
    ERROR: Could not find a version that satisfies the requirement prefect==2.3.2 (from versions: 0.5.0, 0.5.1, 0.5.2, 0.5.3, 0.5.4, 0.5.5, 0.6.0, 0.6.1, 0.6.2, 0.6.3, 0.6.4, 0.6.5, 0.6.6, 0.6.7, 0.7.0, 0.7.1, 0.7.2, 0.7.3, 0.8.0, 0.8.1, 0.9.0, 0.9.1, 0.9.2, 0.9.3, 0.9.4, 0.9.5, 0.9.6, 0.9.7, 0.9.8, 0.10.0, 0.10.1, 0.10.2, 0.10.3, 0.10.4, 0.10.5, 0.10.6, 0.10.7, 0.11.0, 0.11.1, 0.11.2, 0.11.3, 0.11.4, 0.11.5, 0.12.0, 0.12.1, 0.12.2, 0.12.3, 0.12.4, 0.12.5, 0.12.6, 0.13.0, 0.13.1, 0.13.2, 0.13.3, 0.13.4, 0.13.5, 0.13.6, 0.13.7, 0.13.8, 0.13.9, 0.13.10, 0.13.11, 0.13.12, 0.13.13, 0.13.14, 0.13.15, 0.13.16, 0.13.17, 0.13.18, 0.13.19, 0.14.0, 0.14.1, 0.14.2, 0.14.3, 0.14.4, 0.14.5, 0.14.6, 0.14.7, 0.14.8, 0.14.9, 0.14.10, 0.14.11, 0.14.12, 0.14.13, 0.14.14, 0.14.15, 0.14.16, 0.14.17, 0.14.18, 0.14.19, 0.14.20, 0.14.21, 0.14.22, 0.15.0, 0.15.1, 0.15.2, 0.15.3, 0.15.4, 0.15.5, 0.15.6, 0.15.7, 0.15.8, 0.15.9, 0.15.10, 0.15.11, 0.15.12, 0.15.13, 1.0rc1, 1.0.0, 1.1.0, 1.2.0, 1.2.1, 1.2.2, 1.2.3, 1.2.4, 1.3.0, 2.0a1, 2.0a2, 2.0a3, 2.0a4, 2.0a5, 2.0a6, 2.0a7, 2.0a8, 2.0a9, 2.0a10, 2.0a11, 2.0a12, 2.0a13, 2.0b1, 2.0b2, 2.0b3, 2.0b4, 2.0b5, 2.0b6, 2.0b7, 2.0b8, 2.0b9, 2.0b10, 2.0b11, 2.0b12, 2.0b13, 2.0b14, 2.0b15, 2.0b16, 2.0.0, 2.0.1, 2.0.2, 2.0.3, 2.0.4, 2.1.0, 2.1.1, 2.2.0, 2.3.0, 2.3.1)
    ERROR: No matching distribution found for prefect==2.3.2

    Darin Douglass

    09/06/2022, 9:38 PM
    It looks like fastapi has a way to override the default operationId, which helps give endpoints better function names. Any chance Orion could implement this? As they're currently implemented, the operation ids (though descriptive) are pretty bad from a usability perspective 😕
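    For reference, a sketch of the FastAPI mechanism being referred to; the route and names here are made up:
    from fastapi import FastAPI
    from fastapi.routing import APIRoute

    def route_name_as_operation_id(route: APIRoute) -> str:
        # Use the endpoint function's name as the generated operationId
        return route.name

    app = FastAPI(generate_unique_id_function=route_name_as_operation_id)

    @app.get("/flows/{flow_id}", operation_id="read_flow")  # explicit per-route override
    async def read_flow(flow_id: str):
        return {"id": flow_id}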

    Chris Goddard

    09/06/2022, 10:07 PM
    Is anyone else having issues with app.prefect.cloud? No workspaces are showing up, 422 errors in the console, and I'm also getting 422 errors from the API.

    Alexey Stoletny

    09/06/2022, 10:09 PM
    Having issues and our flows are now late.