prefect-community

    Omar Sultan

    10/16/2022, 4:09 PM
    Hi everyone, is there a way to run an agent that executes work queues as a local process using a virtual env? The agent seems to be picking up the default Python on the machine it is running on.

    chicago-joe

    10/16/2022, 5:26 PM
    I'm getting a 404 not found error when trying to add a collaborator to the workspace. Anyone else experiencing this?

    chicago-joe

    10/16/2022, 10:27 PM
    S.O.S. what is happening
    ```
      File "/usr/local/lib/python3.9/site-packages/prefect/utilities/dispatch.py", line 186, in lookup_type
        raise KeyError(
    KeyError: "No class found for dispatch key 'ecs-task' in registry for type 'Block'."
    ```

    Santhosh Solomon (Fluffy)

    10/17/2022, 3:25 AM
    Can someone suggest how to run an agent in remote server?

    YD

    10/17/2022, 4:14 AM
    Need tech advice… Strange error on a flow that used to work… I get `Finished task run for task with final state: 'TimedOut'` after the task has been running for 3 seconds, even though the task is set to `@task(timeout=600, **params)`. It was running fine until Friday, and it also runs fine when using `flow.run()`. I am using https://cloud.prefect.io/. Any suggestions? I deleted the flow and re-registered it, but it did not help.

    Darren Fleetwood

    10/17/2022, 4:58 AM
    Hey, is Prefect Cloud 2.0 down? I've logged into the UI and can't see anything. The API/client still seems to be working though.

    Mark

    10/17/2022, 7:54 AM
    Can I run Prefect without the UI? I am looking at running some local testing using serverless Fargate, but currently we would not be allowed to connect to the Prefect Cloud service, nor are we allowed to run servers. Unless I can get away with running the UI inside Fargate, which may be an option?

    SULEMAN KHAN

    10/17/2022, 8:43 AM
    Hello, Prefect is not validating the `cache` when running it with Celery using this command:
    celery -A app.tasks.celery_app worker --max-tasks-per-child=1 --loglevel=ERROR
    Everything works fine when I use this command:
    celery -A app.tasks.celery_app worker --loglevel=ERROR

    Jared Robbins

    10/17/2022, 1:00 PM
    Thanks for the ability to not persist results! Now I can let prefect run without wiping out its storage every day :)

    vk

    10/17/2022, 1:02 PM
    Hi all, is it possible to use a module name instead of the full file name in Orion deployments when using “native” storage? In Prefect 1 it was possible to specify just the module name, e.g.:
    flow.storage = Module('samples.flow_orion')
    Now I'm kind of forced to specify the exact full filename, even though the file is on the Python path. That's very inconvenient when developers have no control over the final image layout. This is how I create deployments now:
    ```python
    import prefect.utilities.importtools
    from prefect.deployments import Deployment
    from prefect.infrastructure import KubernetesJob

    # `image` and `k8s_template_orion` are defined elsewhere in our code
    entrypoint = 'samples.flow_orion:entrypoint'  # this is just a module, available on the path
    flow = prefect.utilities.importtools.import_object(entrypoint)

    # here I need to specify path/entrypoint; could I use the module name instead?
    deployment = Deployment.build_from_flow(
        name=flow.name,
        flow=flow,
        work_queue_name="kubernetes",
        skip_upload=True,
        infrastructure=KubernetesJob(
            image=image,
            namespace='prefect2',
            job=k8s_template_orion(flow)
        ),
        path='/app/lib/python3.9/site-packages/samples/',  # TODO
        entrypoint='flow_orion.py:entrypoint'
    )
    deployment.apply()
    ```
    Did I miss how to use a module name instead of a path?
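A possible workaround, sketched under assumptions: resolve the module's file location with `importlib.util.find_spec` and derive the `path`/`entrypoint` pair that `Deployment.build_from_flow` currently needs. The helper name `module_entrypoint_to_path` is hypothetical, and the result is only correct if the module sits at the same location in the environment that builds the deployment as in the image the agent runs.

```python
import importlib.util
import os


def module_entrypoint_to_path(entrypoint: str) -> tuple[str, str]:
    """Split 'package.module:object' into (directory, 'file.py:object').

    Hypothetical helper: it locates the module in the *current* interpreter,
    so the answer only matches the runtime image if the layouts are the same.
    """
    module_name, _, object_name = entrypoint.partition(":")
    spec = importlib.util.find_spec(module_name)
    if spec is None or spec.origin is None:
        raise ImportError(f"cannot locate module {module_name!r}")
    directory, filename = os.path.split(spec.origin)
    return directory + os.sep, f"{filename}:{object_name}"


# Example with a standard-library module standing in for samples.flow_orion
path, entrypoint = module_entrypoint_to_path("json.decoder:JSONDecoder")
print(path, entrypoint)
```

The two returned strings would then be passed as `path=` and `entrypoint=` in the `Deployment.build_from_flow(...)` call, instead of hard-coding the site-packages directory.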

    Nic

    10/17/2022, 1:42 PM
    When using String blocks, the value extracted from the block is String(value='thisisthevalue'). Is there a built-in way to get only the value of the actual string, thisisthevalue?

    Mark Li

    10/17/2022, 1:43 PM
    Hello everyone, we're currently running Prefect 2.0 on AKS. We've been working on deploying flows via Azure DevOps and have successfully deployed them to a default work queue. However, when testing a flow run, the agent (which is started for the default work queue) doesn't pick up the flow run. Does anyone have any ideas on what the issue might be?

    Justin Trautmann

    10/17/2022, 2:35 PM
    Hello community, hello Prefect team! One of our Prefect 2 flows contains a large number of parallel tasks that access a database with limited capacity. To avoid putting too much load on the DB, I tried to apply tag-based task concurrency limits, observed some unexpected behavior, and would love to get some feedback on whether I am using concurrency limits correctly.
    1. When submitting a batch of tasks in a loop, the specified concurrency limits do not seem to apply immediately: there are more running tasks than active slots for the tag. Only if I introduce an artificial delay between submit calls does the limit take effect. I assume this is due to some race condition when accessing the Orion DB, and I was wondering whether there is a preferred way of submitting tasks so that limits are respected.
    2. When increasing the task concurrency limit while a flow is executing (this would be very useful with long-running flows and a dynamically scalable DB), the number of "Active Tasks Runs" doesn't return to zero after all flows have completed. Instead, a certain number of tasks is still listed as active by the `prefect concurrency-limit inspect` command but is in fact in status Completed. This effectively lowers the concurrency limit for all subsequent tasks, as these pseudo-active tasks keep occupying a slot for the tag. Are we supposed to adapt the concurrency limit at all while tagged tasks are being executed, or is this considered bad practice?
    Please find in the thread a self-contained example flow that can be used to reproduce the behavior. Any advice is much appreciated. Thank you.
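The poster suspects a race condition. As a generic illustration only (this is not how Prefect or the Orion database actually account for slots), the sketch below shows how a non-atomic check-then-increment lets more callers through than a limit allows, while doing the check and the increment under one lock does not. `SlotCounter` and `peak_usage` are hypothetical names.

```python
import threading
import time


class SlotCounter:
    """Toy stand-in for a tag's concurrency slots (not Prefect's implementation)."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.active = 0
        self.peak = 0
        self._lock = threading.Lock()

    def try_acquire_racy(self) -> bool:
        # Check-then-act with no lock: several callers can pass the check
        # before any of them has incremented the counter.
        if self.active < self.limit:
            time.sleep(0.001)  # widen the gap between the check and the update
            self.active += 1
            self.peak = max(self.peak, self.active)
            return True
        return False

    def try_acquire_atomic(self) -> bool:
        # Check and increment under one lock, so the limit cannot be exceeded.
        with self._lock:
            if self.active < self.limit:
                self.active += 1
                self.peak = max(self.peak, self.active)
                return True
            return False


def peak_usage(atomic: bool, limit: int = 2, callers: int = 20) -> int:
    """Start `callers` threads that each try to take a slot once (never released)."""
    counter = SlotCounter(limit)
    acquire = counter.try_acquire_atomic if atomic else counter.try_acquire_racy
    threads = [threading.Thread(target=acquire) for _ in range(callers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.peak


print("racy peak:", peak_usage(atomic=False))   # often above the limit of 2
print("atomic peak:", peak_usage(atomic=True))  # always 2
```

This would also be consistent with the observation that an artificial delay between `submit` calls makes the limit hold: with enough spacing, each check sees the previous caller's increment.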

    redsquare

    10/17/2022, 2:57 PM
    Hi, I am trying to run a Prefect 2.6.1 KubernetesJob, but once created the job keeps failing with the error below. Any ideas? I have tried changing the path and entrypoint settings numerous times but am now stuck.
    /usr/local/bin/python: No module named prefect.engine.__main__; 'prefect.engine' is a package and cannot be directly executed

    Evan Curtin

    10/17/2022, 3:17 PM
    Just saw the release notes :gratitude-thank-you: 🎉 to the addition of results!

    Kelvin DeCosta

    10/17/2022, 5:03 PM
    Hey everyone! I'm so close to deploying my first flow with `ECSTask` infrastructure. I'm programmatically creating a `Deployment` for a simple flow, and it seems to have registered on our Prefect Cloud. However, the deployment run fails without log messages in Prefect Cloud. The agent logs `botocore.exceptions.ParamValidationError`, which makes me think this might be out of my reach. I'm currently in what seems to be the exact same situation as @Anthony Desmier (link to thread). Not really sure where to go next. Any help is appreciated a lot, thank you for your time!

    Ben Ayers-Glassey

    10/17/2022, 5:15 PM
    Seems like Prefect's `prefect deployment apply` command has a typo in its output:
    ```
    Successfully loaded 'sampleflow_dev'
    Successfully uploaded 4 files to gcs://dev-prefect-work/flow_storage/
    Deployment 'sampleflow/sampleflow_dev' successfully created with id 'c3b605e4-ca96-45eb-b51a-b048ca6d2e08'.
    View Deployment in UI: https://app.prefect.cloud/account/a9c0f124-ca06-4646-a501-57a405ebf3c7/workspace/43f1b8a7-ed9c-46d2-a88a-55eef95b8ef7/deployment/c3b605e4-ca96-45eb-b51a-b048ca6d2e08

    To execute flow runs from this deployment, start an agent that pulls work from the 'kubernetes' work queue:
    $ prefect agent start -q 'kubernetes'
    ```
    ...the link it shows is incorrect: https://app.prefect.cloud/account/a9c0f124-ca06-4646-a501-57a405ebf3c7/workspace/43f1[…]55eef95b8ef7/deployment/c3b605e4-ca96-45eb-b51a-b048ca6d2e08
    ...it should be: https://app.prefect.cloud/account/a9c0f124-ca06-4646-a501-57a405ebf3c7/workspace/43f1[…]/deployments/deployment/c3b605e4-ca96-45eb-b51a-b048ca6d2e08
    ...in other words, it's missing a "/deployments".

    Kevin Wang

    10/17/2022, 5:32 PM
    How should we think about job resiliency during agent stoppage and restart? When an agent stops or a flow run executor stops, how can we ensure the flow run is properly rerun, and NOT stuck in the Running state (with no execution)? I'm using a local execution block, but this GitHub issue suggests remote runs on Kubernetes (and maybe ECS) are also a concern, for 'missing' jobs that don't get rerun. https://prefect-community.slack.com/archives/C03D12VV4NN/p1664554177863929?thread_ts=1664525816.785439&cid=C03D12VV4NN

    Sean Davis

    10/17/2022, 5:56 PM
    When running using async, there are a number of Pythonic ways to approach concurrency limits, including queues and semaphores. The examples I have seen with Prefect 2 (and I may not be looking hard enough) are associated with process concurrency and not async concurrency. Is there a recommended way of limiting async concurrency? Are we free to use `asyncio.Semaphore` or `asyncio.Queue` and the like? Thanks for any suggestions or pointers.
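A minimal plain-asyncio sketch of the semaphore approach the question mentions: `asyncio.Semaphore(limit)` caps how many coroutines are inside the `async with` block at once. `fetch` is a hypothetical stand-in for the real async work, and whether this composes cleanly with Prefect async tasks and task runners is an assumption not confirmed in this thread.

```python
import asyncio


async def fetch(i: int, sem: asyncio.Semaphore, stats: dict) -> int:
    # At most `limit` coroutines can be inside this block at the same time.
    async with sem:
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
        await asyncio.sleep(0.01)  # placeholder for real async I/O
        stats["running"] -= 1
        return i * 2


async def main(limit: int = 3, n: int = 10) -> tuple[list[int], int]:
    # Create the semaphore inside the running event loop.
    sem = asyncio.Semaphore(limit)
    stats = {"running": 0, "peak": 0}
    results = await asyncio.gather(*(fetch(i, sem, stats) for i in range(n)))
    return results, stats["peak"]


results, peak = asyncio.run(main())
print(results, peak)
```

All ten coroutines are scheduled at once, but the recorded peak never exceeds the semaphore's limit of 3; an `asyncio.Queue` with a fixed number of worker coroutines would be the queue-based alternative.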

    Xavier Lesperance

    10/17/2022, 6:34 PM
    Good afternoon, I managed to fix my problem

    Jarvis Stubblefield

    10/17/2022, 8:52 PM
    I'm trying to contribute to Prefect… I downloaded the repository and ran the tests (without modifications). This is the summary of the output I received:
    10 failed, 4486 passed, 39 skipped, 3 xfailed, 11 errors in 1173.85s (0:19:33)
    I wasn't expecting to have failures… I've added my code and am now running the 20 minutes of tests to ensure it's working as expected. I'm assuming that as long as I don't get to `11 failed` in the summary, I should be okay, right?

    Cameron Chafetz

    10/17/2022, 10:21 PM
    Hey all, I'm in the process of evaluating whether prefect 2.0 is a viable solution to replace an internal tool at work. I've gone through setting up an ECS cluster and have prefect agents listening and able to spin up a dask scheduler and dask cuda worker. However, I'm running into some prefect issues, referenced by someone else here: https://github.com/PrefectHQ/prefect/issues/6759 Does anyone have a workaround for this issue or an ETA on when this will be fixed? I've already tried setting an environment variable in the docker image (PREFECT_API_REQUEST_TIMEOUT=60), but to no avail. Exact error I'm seeing in the dask-cuda-worker in the 🧵

    kent

    10/18/2022, 1:11 AM
    Hi, the Prefect schedule does not seem to work. Here is the code and the commands I ran.
    flows/tutorial.py:
    ```python
    from prefect import flow, get_run_logger


    @flow
    def my_favorite_function():
        logger = get_run_logger()
        logger.info("aaaaaaaaaaaaaaaaaaaaaaaaaa")
        print("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
        return 42
    ```
    ```shell
    prefect deployment build flows/tutorial.py:my_favorite_function -n etl --cron "*/1 * * * *" -q etl --apply
    prefect agent start -q 'etl'
    ```
    Is there a problem?

    John Ramey

    10/18/2022, 2:03 AM
    Hey, I'm in the process of switching to Prefect 2.x. I'm trying to figure out how to map over multiple arguments and generate child subflows. I realize that `Task.map` can do something similar to generate tasks, but I'd prefer subflows for my project. Is this possible? If so, any docs or resources y'all can point me to? Thanks!

    Shane Hu

    10/18/2022, 2:13 AM
    "Can Prefect execute a flow in a single thread? See comments for more details and context. I'm trying to get Prefect to run Playwright, but it is single-threaded only." Hi, can Playwright be combined with Prefect?
    ```toml
    [tool.poetry.dependencies]
    python = "^3.10"
    prefect = "^2.6.1"
    playwright = "^1.27.1"
    ```
    ```python
    import prefect
    from playwright.sync_api import Page, sync_playwright


    @prefect.task
    def go_to_a_web(page: Page):
        page.goto("https://formulae.brew.sh/cask/slack")


    @prefect.flow
    def main_flow():
        with sync_playwright() as pw:
            browser = pw.chromium.launch(headless=False)
            page = browser.new_page()
            go_to_a_web(page)


    main_flow()
    ```
    I got
    greenlet.error: cannot switch to a different thread
    Any help would be much appreciated.
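The error suggests that the `Page` created in the flow is being used from another thread. Assuming (not confirmed in this thread) that Prefect 2.6 runs the synchronous task on a worker thread rather than on the flow's thread, the stdlib sketch below reproduces the same failure mode with a hypothetical `ThreadBoundPage` object that, much like objects from Playwright's greenlet-based sync API appear to, refuses calls from any thread other than the one that created it.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ThreadBoundPage:
    """Hypothetical stand-in for an object that only works on its creating thread."""

    def __init__(self) -> None:
        self._owner = threading.get_ident()

    def goto(self, url: str) -> str:
        if threading.get_ident() != self._owner:
            raise RuntimeError("cannot switch to a different thread")
        return f"visited {url}"


page = ThreadBoundPage()
print(page.goto("https://formulae.brew.sh/cask/slack"))  # same thread: works

# Simulate handing the object to code that runs on a worker thread.
with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(page.goto, "https://formulae.brew.sh/cask/slack")
    try:
        future.result()
    except RuntimeError as exc:
        print("worker thread:", exc)  # mirrors the greenlet error
```

If that assumption holds, creating the browser and using the page inside the same function (a single task, or plain code in the flow body) would keep every Playwright call on one thread.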

    Shrikkanth

    10/18/2022, 5:27 AM
    Hi all, I'm trying to post a message in Slack after the flow reaches a success state, but for some reason the message shows None. I have used the same function for failure and it works fine there. The flow code is below. First attempt:
    ```python
    def post_to_slack_on_success(task, old_state, new_state):
        if new_state.is_successful():
            flow_run = FlowRunView.from_flow_run_id(prefect.context.get("flow_run_id"))
            task_run = flow_run.get_task_run(task_run_id=prefect.context.get("task_run_id"))
            msg = task_run.get_result()
        SlackTask(message=msg).run()
        flow.state_handlers.append(slack)
    ```
    Second attempt:
    ```python
    def post_to_slack_on_success(task, old_state, new_state):
        if new_state.is_successful():
            print(new_state)
            print(old_state)
            print("Result  :  ", new_state.result, new_state.message)
    ```
    Screenshot of the alert: (image not available)

    Shruti Hande

    10/18/2022, 5:58 AM
    Hello, I'm trying to run multiple Python scripts in Prefect. My use case is that each task should represent an individual Python script, and all of these scripts/tasks can be combined to form a single flow. How is this achievable? Can we run bash commands as a single task in Prefect version 2? #prefect-contributors #prefect-community #prefect-dbt
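One way to picture this with only the standard library: each script or shell command is run by a function via `subprocess.run`, and the flow is a sequence of those calls. In Prefect 2 such a function could presumably be decorated with `@task` so that each script becomes its own task run; that part is left out so the sketch runs without Prefect. `run_script`, `pipeline`, and the inline `-c` scripts are hypothetical placeholders.

```python
import subprocess
import sys


def run_script(args: list[str]) -> str:
    """Run one script or command as a subprocess and return its stdout.

    check=True raises CalledProcessError on a non-zero exit code, so a failing
    script surfaces as a failure of this step.
    """
    completed = subprocess.run(args, capture_output=True, text=True, check=True)
    return completed.stdout


def pipeline() -> list[str]:
    # Hypothetical stand-ins for real scripts; these could be replaced with
    # [sys.executable, "extract.py"] or ["bash", "-c", "..."].
    steps = [
        [sys.executable, "-c", "print('extract done')"],
        [sys.executable, "-c", "print('transform done')"],
    ]
    return [run_script(step).strip() for step in steps]


print(pipeline())  # ['extract done', 'transform done']
```

Running the scripts as subprocesses keeps each one isolated in its own interpreter, at the cost of passing data between steps through files or stdout rather than Python return values.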

    Andreas Nigg

    10/18/2022, 6:04 AM
    Hey, I've a question about Results storage. (prefect 2.6) Currently, I do not configure any results storage - neither on flows nor tasks. According to the docs, the results then are stored in the PREFECT_LOCAL_STORAGE_PATH (~/.prefect/storage). This is also what I see - a lot of blobs in this folder. Does prefect sometimes delete these blobs/results? Or am I responsible for cleaning up the result storage location?
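This thread does not settle whether Prefect prunes these blobs itself. If cleanup does turn out to be the user's job, a minimal stdlib sketch could delete result files older than a cutoff. `prune_old_results` is a hypothetical helper; deleting results that caching or retries still rely on could break those features, so the age threshold is an assumption to tune.

```python
import os
import tempfile
import time
from pathlib import Path


def prune_old_results(storage_dir: Path, max_age_days: float) -> list[Path]:
    """Delete regular files in storage_dir older than max_age_days; return them."""
    cutoff = time.time() - max_age_days * 86400
    removed: list[Path] = []
    for path in storage_dir.iterdir():
        if path.is_file() and path.stat().st_mtime < cutoff:
            path.unlink()
            removed.append(path)
    return removed


# Demo on a throwaway directory instead of the real result storage.
with tempfile.TemporaryDirectory() as tmp:
    old = Path(tmp, "old-result")
    new = Path(tmp, "new-result")
    old.write_text("x")
    new.write_text("y")
    two_weeks_ago = time.time() - 14 * 86400
    os.utime(old, (two_weeks_ago, two_weeks_ago))  # backdate the "old" blob
    removed = prune_old_results(Path(tmp), max_age_days=7)
    print([p.name for p in removed])  # ['old-result']
```

Against the default location named in the question, it would be called as `prune_old_results(Path("~/.prefect/storage").expanduser(), max_age_days=7)`.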

    Angelika Tarnawa

    10/18/2022, 8:30 AM
    Hi, I'm trying to run a flow on Prefect 1.0. Every task ends in Success, but at the end the flow fails. When I check the logs, there is an error:
    prefect.exceptions.ClientError: [{'path': ['get_or_create_task_run_info'], 'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 101}], 'path': None}}}]
    I have tried re-registering the flow, and I'm sure that the flow run and the registered flow are the same.