prefect-community
  • Chris Gunderson
    01/10/2023, 2:11 PM
    Is there a way to put the deployment files in a different folder? Right now they are in the root folder of my project. I'd like to separate this by environment. Am I overthinking this? Should I just delete these files?
    11 replies · 3 participants
  • Christopher Boyd
    01/10/2023, 2:38 PM
    Hi Torstein, can you attempt the same but capture a .har file in developer tools to see the actual response being submitted / received from the UI?
    7 replies · 2 participants
  • James Sopkin
    01/10/2023, 3:25 PM
    Hi @Nikhil Jain, the difference between Cancelling and Cancelled is that a run in Cancelling is still in the process of being cancelled, whereas a Cancelled run has already been cancelled.
    4 replies · 2 participants
  • lialzm
    01/10/2023, 3:45 PM
    Hi team, can you help me? I get an error like:
    Downloading flow code from storage at None
    
    Flow could not be retrieved from deployment.
    My code:
    # imports added for completeness (Prefect 2.x paths)
    from prefect.deployments import Deployment
    from prefect.filesystems import RemoteFileSystem
    from prefect.orion.schemas.schedules import CronSchedule

    ali = RemoteFileSystem.load("ali")

    deployment = Deployment.build_from_flow(
        flow=download,
        name="test",
        storage=ali,
        schedule=CronSchedule(cron="0 16 * * *"),
    )
    deployment.apply()
    4 replies · 2 participants
  • James Phoenix
    01/10/2023, 3:46 PM
    I’m trying to get my Prefect agent running on GKE (autopilot)
  • James Phoenix
    01/10/2023, 3:47 PM
    I’m trying to get my Prefect agent running on GKE (autopilot). Currently I’m running into this error after running this command:
    prefect kubernetes manifest orion | kubectl apply -f -
    0/2 nodes are available: 2 Insufficient cpu, 2 Insufficient memory. preemption: 0/2 nodes are available: 2 No preemption victims found for incoming pod.
    7 replies · 2 participants
  • James Phoenix
    01/10/2023, 3:50 PM
    Failed to pull image "prefecthq/prefect:2.7.7-python3.9": rpc error: code = Unknown desc = failed to pull and unpack image "docker.io/prefecthq/prefect:2.7.7-python3.9": failed to resolve reference "docker.io/prefecthq/prefect:2.7.7-python3.9": failed to do request: Head "https://registry-1.docker.io/v2/prefecthq/prefect/manifests/2.7.7-python3.9": dial tcp 3.216.34.172:443: i/o timeout
    2 replies · 2 participants
  • Idan
    01/10/2023, 3:50 PM
    We ran into a weird edge case and I'm wondering what the Prefect way around this would be 🙂 We have a mounted drive that only legitimate users can write to, so in one flow, one task calls os.seteuid(priv_user_id) (and later resets to original_user_id). This is run in a container, so the original_user_id is root, which entails PREFECT_HOME is /root/.prefect/. Our tasks are long-running, so we also cache them. Then, every now and then, a task succeeds but fails with:
    Crash detected! Execution was interrupted by an unexpected exception: Traceback (most recent call last):
      File "/usr/lib/python3.10/pathlib.py", line 1175, in mkdir
        self._accessor.mkdir(self, mode)
    PermissionError: [Errno 13] Permission denied: '/root/.prefect/storage'
    
    During handling of the above exception, another exception occurred:
    
    PermissionError: [Errno 13] Permission denied: '/root/.prefect/storage'
    Any smart ideas how to facilitate both needs?
    31 replies · 2 participants
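    One way around this kind of permission clash is to point result/cache storage somewhere that stays writable regardless of the effective UID - either by setting PREFECT_LOCAL_STORAGE_PATH (or PREFECT_HOME) to a different directory, or by giving the flow and tasks an explicit result storage block. A minimal sketch, assuming Prefect 2.x; the block name and paths are hypothetical:
    from prefect import flow, task
    from prefect.filesystems import LocalFileSystem
    from prefect.tasks import task_input_hash

    # One-time setup: a storage block pointing at a directory that is writable
    # no matter which effective UID the task is currently running under.
    LocalFileSystem(basepath="/srv/shared/prefect-results").save("writable-results", overwrite=True)

    @task(
        cache_key_fn=task_input_hash,
        persist_result=True,
        result_storage="local-file-system/writable-results",  # slug of the block above
    )
    def long_running_step(x: int) -> int:
        # work that temporarily calls os.seteuid(...) can still persist results here
        return x * 2

    @flow(result_storage="local-file-system/writable-results")
    def my_flow() -> int:
        return long_running_step(21)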
  • James Phoenix
    01/10/2023, 3:50 PM
    Any ideas?
  • Paco Ibañez
    01/10/2023, 5:54 PM
    Hello! Are there any plans to include flow return annotations in the deployment info, the same way input parameters are included in parameter_openapi_schema?
    ✅ 1
    8 replies · 2 participants
  • Jean-Michel Provencher
    01/10/2023, 6:38 PM
    Hello, out of curiosity, were there any changes related to shutdown signals for flow runs in the last few releases? Unexpected exceptions that crash flows are no longer sending exceptions to my Sentry integration, and I'm trying to understand why. When enabling Sentry debug logs, I see that the exception does not have time to be sent before the pod shuts down, and it was working 4 days ago.
    30 replies · 2 participants
  • Ryan Peden
    01/10/2023, 7:09 PM
    Does your agent log any error messages to the console when this happens?
    3 replies · 2 participants
  • jack
    01/10/2023, 8:47 PM
    At runtime, how do we access artifacts stored in S3 block storage? (Prefect v2) Are all artifacts copied to some location when the flow run begins?
    4 replies · 2 participants
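    For reference, the flow code for a deployment is pulled from the storage block into a temporary working directory when the run starts, but other files in the block are not copied automatically - they can be read at runtime through the block itself. A minimal sketch, assuming Prefect 2.x; the block name and paths are hypothetical:
    from prefect import flow
    from prefect.filesystems import S3

    @flow
    async def use_stored_files() -> int:
        s3 = await S3.load("my-artifacts")

        # read a single object, relative to the block's bucket_path
        raw = await s3.read_path("models/latest.pkl")

        # or pull a whole prefix down next to the running flow
        await s3.get_directory(from_path="reference-data", local_path="./reference-data")
        return len(raw)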
  • apolisskaya
    01/10/2023, 10:10 PM
    Hi there! I have a flow which has been failing at an increasing frequency recently. The actual task logic always completes, but then I get
    Finished in state Failed('Flow run encountered an exception. MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.\n')
    at the end. Manually retrying the flow succeeds, but I do have retries=3 in the flow decorator and the flow doesn't appear to be retried.
    5 replies · 3 participants
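    For context, MissingResult usually points at result persistence being disabled: when the backend re-reads a state from the API (for example to decide on retries or the final state), there is no stored data to rehydrate. A minimal sketch of enabling persistence, assuming Prefect 2.x:
    from prefect import flow, task

    @task(persist_result=True)
    def compute() -> int:
        return 42

    @flow(retries=3, persist_result=True)
    def nightly_job() -> int:
        return compute()
    Setting PREFECT_RESULTS_PERSIST_BY_DEFAULT=true achieves the same thing globally without touching each decorator.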
  • Ilya Galperin
    01/11/2023, 12:32 AM
    Hi all - we’re seeing sporadic BrokenPipeError: [Errno 32] Broken pipe crashes in one of our flows on 2.7.7, running on DaskTaskRunner. This flow runs ~1000 tasks; occasionally one of them will enter a Crashed state with this error and cause our flow to enter a Failed state. Retries on these crashed tasks don’t seem to work (I’m guessing Crashed state tasks are excluded from retry logic). Full traceback in the thread.
    2 replies · 2 participants
  • Aaron Goebel
    01/11/2023, 1:40 AM
    I am using the create_flow_run_from_deployment / run_deployment APIs to compose my deployments on HTTP request. E.g. I get a payload defining a set of flows to chain together, that triggers a parent container flow, and that container flow does the orchestration of tying things together. Because there are dependencies, the way I'm thinking of running it is to have the run_deployment code wrapped in a task like this:
    @task
    async def run_deployment(depl_id: str, parameters: dict):
        async with prefect.context(**prefect.context.run_params):
            async with prefect.Client() as client:
                run = await client.create_flow_run_from_deployment(deployment_id=depl_id, parameters=parameters)
                run_state = await run.get_state()
                return run_state.result
    and then in an orchestration flow I think I want to do something like this where I create these tasks for each deployment.
    @flow
    async def container_flow(flow_graph: dict):
        results = {}
    
        ordered_flows = order(flow_graph)
    
        # create tasks
        tasks = {name: run_deployment.map(deployment_id=flow_params['deployment_id'], parameters=flow_params['inputs'])
                 for name, flow_params in flow_graph.items()}
    
        # set dependencies
        for flow_name, flow_params in flow_graph.items():
            for dependency in flow_params.get('dependencies', []):
                tasks[flow_name].set_upstream(tasks[dependency], flow=True)
    
        # run tasks concurrently
        flow_results = await prefect.engine.run(tasks, return_tasks=True)
    
        # store results
        for flow_name, flow_result in flow_results.items():
            results[flow_name] = flow_result.result
        return results
    1 reply · 1 participant
  • Aaron Goebel
    01/11/2023, 2:20 AM
    I am trying to use the run_deployment orchestrator pattern to chain together deployments dynamically. Some of these deployments depend on others. I've attempted to finagle a way around this by wrapping run_deployment in a task as such:
    @task
    async def run_deployment(depl_id: str, parameters: dict):
        async with prefect.context(**prefect.context.run_params):
            async with prefect.Client() as client:
                run = await client.create_flow_run_from_deployment(deployment_id=depl_id, parameters=parameters)
                run_state = await run.get_state()
                return run_state.result
    I then create tasks as such
    tasks = {name: run_deployment.map(deployment_id=flow_params['deployment_id'], parameters=flow_params['inputs'])
                 for name, flow_params in graph.items()}
    and set their dependencies:
    # set dependencies
        for flow_name, flow_params in graph.items():
            for dependency in flow_params.get('dependencies', []):
                tasks[flow_name].set_upstream(tasks[dependency], flow=True)
    The goal here is to dogfood the task runner to chain these together and kick off the flow with something like await task_runner.submit(tasks). Two issues I see with this, though: 1. Some of the parameters to downstream create_deployment runs are derived from upstream deployment runs. I wonder if anyone knows an elegant way of doing that? 2. Would await task_runner.submit(tasks) actually work as anticipated?
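    For context, the snippets above mix Prefect 1-style APIs (prefect.context, prefect.Client, set_upstream, prefect.engine.run) that don't carry over to Prefect 2. In 2.x, one way to express "deployment B needs the output of deployment A" is to wrap prefect.deployments.run_deployment in tasks and let data dependencies (or asyncio.gather for independent branches) drive the ordering. A minimal sketch; the deployment names and parameters are hypothetical:
    import asyncio

    from prefect import flow, task
    from prefect.deployments import run_deployment

    @task
    async def trigger(deployment_name: str, parameters: dict) -> dict:
        # run_deployment waits for the child flow run to reach a final state
        flow_run = await run_deployment(name=deployment_name, parameters=parameters)
        return {"flow_run_id": str(flow_run.id), "state": flow_run.state.name}

    @flow
    async def container_flow() -> dict:
        # upstream deployment
        extract = await trigger("extract-flow/prod", {"source": "s3://bucket/raw"})

        # downstream deployments take the upstream result as a parameter, so they
        # only start once it is available; independent ones run concurrently
        load_a, load_b = await asyncio.gather(
            trigger("load-a-flow/prod", {"upstream": extract}),
            trigger("load-b-flow/prod", {"upstream": extract}),
        )
        return {"extract": extract, "load_a": load_a, "load_b": load_b}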
  • wonsun
    01/11/2023, 6:43 AM
    Hi experts! I'm trying to migrate from Prefect 1.x to Orion, and some of it has already been migrated. Let me describe my situation: • until last week I had successfully set up a Secret block on the Orion server and used it via its get method • the secret contains the database connection information • last weekend our office network was completely reworked • then I edited the Secret block's value (changed the connection IP)
  • wonsun
    01/11/2023, 7:22 AM
    Hi experts! I'm trying to migrate from Prefect 1.x to 2.x (Orion) and hit an issue after our office network was rebuilt. My conditions: • Prefect version == 2.6.7 • until last week I had successfully set up a Secret block on the Orion server and used it via the .get method • the secret contains the database connection information • last weekend our office network was completely reworked • then I edited the Secret block's ('3f') value to the new IP address, and got:
    RuntimeWarning: coroutine 'Block.load' was never awaited
      secret_block = Secret.load('3f')
    RuntimeWarning: Enable tracemalloc to get the object allocation traceback
    • So I made a new Secret block, but that also doesn't work, failing with the error message below:
    C:\Users\user\AppData\Local\Temp\ipykernel_12716\3323813457.py:3: RuntimeWarning: coroutine 'Block.load' was never awaited
      secret_block = Secret.load("new-net")
    RuntimeWarning: Enable tracemalloc to get the object allocation traceback
    ---------------------------------------------------------------------------
    AttributeError                            Traceback (most recent call last)
    Cell In [27], line 6
          3 secret_block = Secret.load("new-net")
          5 # Access the stored secret
    ----> 6 secret_block.get()
    
    AttributeError: 'coroutine' object has no attribute 'get'
    Could the new network be causing this trouble? Thanks for your reply.
    ✅ 1
    4 replies · 2 participants
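    For context, that RuntimeWarning usually means the code is running in an environment with an active event loop (e.g. a Jupyter/IPython cell), where Block.load returns a coroutine that has to be awaited. A minimal sketch, reusing the block name from the message above:
    from prefect.blocks.system import Secret

    # In a Jupyter/IPython cell, top-level await is available:
    secret_block = await Secret.load("new-net")
    conn_info = secret_block.get()

    # In a plain synchronous script no await is needed:
    # secret_block = Secret.load("new-net")
    # conn_info = secret_block.get()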
  • Stéphan Taljaard
    01/11/2023, 9:18 AM
    Is there a way to rename a flow already registered with Orion/Cloud? I want to change its name and not lose run history.
    ✅ 1
    :sad-keanu: 1
    6 replies · 2 participants
  • PB
    01/11/2023, 9:35 AM
    Howdy. Is it possible to invoke an AWS Glue job using Prefect?
    ✅ 1
    4 replies · 2 participants
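    One straightforward approach is to start the Glue job from a task with boto3 and poll until it finishes. A minimal sketch; the job name is hypothetical and AWS credentials are assumed to come from the environment (prefect-aws's AwsCredentials block could supply a boto3 session instead):
    import time
    from typing import Optional

    import boto3
    from prefect import flow, task, get_run_logger

    @task(retries=2, retry_delay_seconds=30)
    def start_glue_job(job_name: str, arguments: Optional[dict] = None) -> str:
        glue = boto3.client("glue")
        run = glue.start_job_run(JobName=job_name, Arguments=arguments or {})
        return run["JobRunId"]

    @task
    def wait_for_glue_job(job_name: str, run_id: str, poll_seconds: int = 30) -> str:
        glue = boto3.client("glue")
        while True:
            state = glue.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]["JobRunState"]
            if state in {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR"}:
                return state
            time.sleep(poll_seconds)

    @flow
    def run_glue_job(job_name: str = "my-glue-job"):
        logger = get_run_logger()
        run_id = start_glue_job(job_name)
        state = wait_for_glue_job(job_name, run_id)
        logger.info("Glue job run %s finished in state %s", run_id, state)
        if state != "SUCCEEDED":
            raise RuntimeError(f"Glue job run {run_id} ended in state {state}")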
  • Vadym Dytyniak
    01/11/2023, 10:01 AM
    Hi. Can you explain what this means?
    State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
    4 replies · 2 participants
  • Hoàng Lâm
    01/11/2023, 10:02 AM
    I have an issue with the Prefect agent. Does anyone have a solution? Thank you 🙌
    D:\Athena\etl>prefect agent start "work_queue_for_test"
    Agents now support multiple work queues. Instead of passing a single argument, provide work queue names with the `-q` or`--work-queue` flag: `prefect agent start -q work_queue_for_test`
    
    Starting v2.7.4 agent with ephemeral API...
    
      ___ ___ ___ ___ ___ ___ _____     _   ___ ___ _  _ _____
     | _ \ _ \ __| __| __/ __|_   _|   /_\ / __| __| \| |_   _|
     |  _/   / _|| _|| _| (__  | |    / _ \ (_ | _|| .` | | |
     |_| |_|_\___|_| |___\___| |_|   /_/ \_\___|___|_|\_| |_|
    
    
    Agent started! Looking for work from queue(s): work_queue_for_test...
    16:18:20.487 | INFO    | prefect.agent - Submitting flow run 'fd08f02a-18ca-4538-bdf1-728dc7e88e22'
    16:18:20.604 | INFO    | prefect.infrastructure.process - Opening process 'beta736-bajor'...
    'C:\Users\Will' is not recognized as an internal or external command,
    operable program or batch file.
    16:18:20.622 | ERROR   | prefect.infrastructure.process - Process 'beta736-bajor' exited with status code: 1
    16:18:20.651 | INFO    | prefect.agent - Completed submission of flow run 'fd08f02a-18ca-4538-bdf1-728dc7e88e22'
    16:18:20.668 | INFO    | prefect.agent - Reported flow run 'fd08f02a-18ca-4538-bdf1-728dc7e88e22' as crashed: Flow run infrastructure exited with non-zero status code 1.
    5 replies · 2 participants
  • Satyasheel
    01/11/2023, 10:31 AM
    Hello people, I am looking for a way to create a PagerDuty notification via the Prefect Python API. Right now the only way I can create this notification is via Prefect Automations in the Prefect UI. Any docs on this would be really appreciated. More context: I have created the PagerDuty integration using Prefect actions, but I want to use this action programmatically and send the notification to PagerDuty rather than creating a Prefect automation via the UI.
  • max
    01/11/2023, 10:46 AM
    Hey Prefect people! Can you please tell me, is it possible to specify a schema when connecting Prefect to a Postgres database?
    1 reply · 1 participant
  • Tim-Oliver
    01/11/2023, 12:39 PM
    Hi, is there a difference between task runs of a sub-flow and task runs of a stand-alone flow? I observe that a flow run does not use cached results if the cached results were created by the same flow executed as a sub-flow.
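    For context, cache keys produced by task_input_hash depend on the task and its inputs, not on which flow invokes the task, so a cached result written by a sub-flow run should in principle be reusable by a stand-alone run of the same flow - provided both runs can read the same result storage, since the cache only points at a persisted result. A small sketch for testing that, assuming Prefect 2.x:
    from datetime import timedelta

    from prefect import flow, task
    from prefect.tasks import task_input_hash

    @task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1), persist_result=True)
    def expensive(x: int) -> int:
        print("computing...")  # should only print on the first, uncached run
        return x * x

    @flow
    def child(x: int) -> int:
        return expensive(x)

    @flow
    def parent() -> int:
        # runs `child` as a sub-flow and populates the cache
        return child(4)

    if __name__ == "__main__":
        parent()
        child(4)  # expected to hit the cache written by the sub-flow run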
  • Kirill Egorov
    01/11/2023, 1:42 PM
    Hi, we're trying to set up Prefect 1 with Great Expectations, a PostgreSQL data backend, and the Dask flow executor. Has anyone tried a similar setup? We're facing some challenges.
  • Jason Noxon
    01/11/2023, 2:26 PM
    Hi all! Happy New Year! Question: are deployments in Prefect 2 the same as registering in Prefect 1?
    2 replies · 2 participants
  • John-Craig Borman
    01/11/2023, 2:30 PM
    Hi all, quick question on the prefect.deployments.run_deployment() function: if I deploy a flow without specifying a deployment name, how can I use run_deployment() to trigger a flow run? Would it just be run_deployment(name='flow_name')? Related docs: https://docs.prefect.io/concepts/deployments/#create-a-flow-run-in-a-python-script
    2 replies · 2 participants
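    For reference, run_deployment expects the full "<flow name>/<deployment name>" identifier (or a deployment id) - deployments always carry a name, so the flow name alone is not enough. A minimal sketch with hypothetical names:
    from prefect.deployments import run_deployment

    # triggers a run of the deployment named "prod" belonging to the flow "etl"
    flow_run = run_deployment(name="etl/prod", parameters={"day": "2023-01-11"})
    print(flow_run.id, flow_run.state)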
  • David Elliott
    01/11/2023, 2:43 PM
    Hey! Question about flow cancellation & Dask workers on k8s (Prefect 2.7.7). I’ve got a Kubernetes-deployed flow which uses dask_kubernetes and spawns Dask pods (scheduler + workers) to execute the tasks. When I cancel a running flow from the Cloud UI, it terminates the flow run and deletes the prefect-job pod, but it leaves the Dask pods just hanging there - still alive, but not actually doing anything. Is that intended, and if so, any suggestions on the best approach for auto-cleanup of these? At the end of a failed or successful flow run these are auto-terminated, but not when the flow is cancelled. Thanks!
    10 replies · 2 participants
Christopher Boyd

01/11/2023, 2:47 PM
Hi David, you would likely need a custom handler. The Dask scheduler should be responsible for cleaning up the Dask workers, but if that’s not the case, you would need a handler to do it manually - https://distributed.dask.org/en/stable/worker.html
I can check with the team whether that’s something the integration should include natively.
David Elliott

01/11/2023, 2:50 PM
Hmm, but the Dask scheduler is also left hanging there as well - as in, when the flow run is cancelled, I think the prefect-job pod just gets deleted without having a chance to send a signal to the dask scheduler telling it to terminate. Whereas when a flow runs to completion, the prefect-job sends the dask scheduler (and its workers) the termination signal..?
Christopher Boyd

01/11/2023, 3:01 PM
Let me check - when you request the cancellation, that goes to the agent directly, which submits the cancellation to the infrastructure. There should be a ~30 second grace period before it’s forcefully killed, but I’ll check with the team.
Would you be willing to file this as an issue here? https://github.com/PrefectHQ/prefect-dask
David Elliott

01/11/2023, 4:05 PM
Sure - I’ve tried to summarise it here: https://github.com/PrefectHQ/prefect-dask/issues/68 Let me know if it’s unclear?
👀 1
🙌 1
Like you say, it feels like the agent should allow the flow to take a cancellation action (e.g. send a termination signal to the Dask scheduler), but at present the flow pod just gets immediately deleted.
Christopher Boyd

01/11/2023, 4:07 PM
That issue is fantastic and detailed
thank you
I’ll raise this with the team
🙌 1
:gratitude-thank-you: 1