prefect-community
  • k

    Kalise Richmond

    10/20/2022, 4:47 PM
    Marvin at DBT Coalesce this morning in #prefect-dbt :prefect-duck: https://prefect-community.slack.com/archives/C0470L5UV2M/p1666274979508959
    :prefect-duck: 5
    :marvin-duck: 7
    🚀 4
    :party-parrot: 2
    ✅ 3
    :dbt: 7
  • a

    Alix Cook

    10/20/2022, 4:52 PM
    Am I missing something in the prefect REST API documentation? How do I authenticate with the API? https://docs.prefect.io/api-ref/rest-api/
    ✅ 1
    a
    • 2
    • 2
  • s

    Stefan Rasmussen

    10/20/2022, 5:00 PM
    In all my task runs, the orion ui shows Task Run ID to be identical to Flow Run ID. Am I missing something here or is that a bug? Prefect 2.6.3, local installation.
    👀 1
    ✅ 1
    j
    • 2
    • 2
  • j

    John Ramey

    10/20/2022, 5:05 PM
    must `get_run_logger` be called in each task/flow? I have a flow with ~20 tasks, and after upgrading to Prefect 2 I have 20 different calls to `get_run_logger`. Is there a better way to adhere to DRY?
    ✅ 1
    r
    a
    +3
    • 6
    • 12
  • i

    Iuliia Volkova

    10/20/2022, 7:41 PM
    Question about Deployments & UI: everything worked up to version 2.2.*, but since 2.3.* deployments are not shown in the UI. At the same time, if I check the network requests, the API answers with the list of deployments (see the screenshot in the thread).
    ✅ 1
    j
    j
    a
    • 4
    • 7
  • b

    Ben Muller

    10/20/2022, 10:30 PM
    G'day - prefect 1.0 question. If I have a "master flow" and I trigger a "subflow" with `create_flow_run`, and that subflow then triggers more "subsubflows", will my master flow know about it if one of them fails? Is there a way to propagate any of these signals?
    ✅ 1
    m
    • 2
    • 3
  • t

    Tomás Emilio Silva Ebensperger

    10/21/2022, 12:14 AM
    Has anyone seen this error with Prefect Cloud? I have no issues running the flows locally, but when I trigger them in the cloud I get
    result = self.result.write(value, **formatting_kwargs)
    TypeError: LocalResult.write() got multiple values for argument 'self'
    ✅ 1
    a
    • 2
    • 4
  • d

    Deepanshu Aggarwal

    10/21/2022, 5:17 AM
    After a concurrent task run completes, I want to run another task that uses the output of the first task from that concurrent task run. Is it possible?
    ✅ 1
    a
    • 2
    • 11
  • n

    Nic

    10/21/2022, 8:37 AM
    When deploying through the CLI with the --cron option, is there a way to add the timezone argument, as there is when creating a deployment via Python?
    ✅ 1
    :thank-you: 1
    👍 1
    a
    • 2
    • 12
  • r

    redsquare

    10/21/2022, 9:19 AM
    Hi - Is there any way I can easily upgrade my Prefect 2 account to an Org and carry over the workspaces/users?
    ✅ 1
    a
    • 2
    • 2
  • r

    Robin Weiß

    10/21/2022, 9:33 AM
    Hey there! I have a really weird issue with Kubernetes Jobs that I can’t wrap my head around: I try to start around 150 flows from one orchestrator flow. I do this because I need to parallelize across K8s pods instead of threads. This works, but only around 50 of my Flow Runs actually transition into `Running` state and start a K8s job & pod; the other 100 Flow Runs just idle around in “Pending”. I have searched everywhere to find hints about why they won’t start. There are definitely enough K8s computing resources for them to be scheduled, and there are no concurrency limits set via tags or on the work queue directly. The K8s work queue just shows them as “Pending” in the UI. Does anyone have any idea where else to look? Cheers 🙂
    ✅ 1
    r
    c
    • 3
    • 34
  • a

    Alejandro

    10/21/2022, 1:23 PM
    Hi all! I am trying to generate a manifest YAML file using the `prefect deployment build` CLI. As for the block options, I have defined the following custom `LocalFileSystem` block (shown in the image below). However, when building the manifest file with the command `prefect deployment build src/flows/example.py:healthcheck --name test --storage-block local-file-system/mount`, I get the following error:
    Found flow 'healthcheck'
    Traceback (most recent call last):
      File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
        return fn(*args, **kwargs)
      File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 201, in coroutine_wrapper
        return run_async_in_new_loop(async_fn, *args, **kwargs)
      File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 152, in run_async_in_new_loop
        return anyio.run(partial(__fn, *args, **kwargs))
      File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
        return asynclib.run(func, *args, **backend_options)
      File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
        return native_run(wrapper(), debug=debug)
      File "/home/user/Miniconda3/envs/prefect/lib/python3.9/asyncio/runners.py", line 44, in run
        return loop.run_until_complete(main)
      File "/home/user/Miniconda3/envs/prefect/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
        return future.result()
      File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
        return await func(*args)
      File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/prefect/cli/deployment.py", line 853, in build
        deployment = await Deployment.build_from_flow(
      File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/prefect/deployments.py", line 720, in build_from_flow
        await deployment.upload_to_storage()
      File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/prefect/deployments.py", line 572, in upload_to_storage
        file_count = await self.storage.put_directory(
      File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/prefect/filesystems.py", line 187, in put_directory
        shutil.copytree(
      File "/home/user/Miniconda3/envs/prefect/lib/python3.9/shutil.py", line 568, in copytree
        return _copytree(entries=entries, src=src, dst=dst, symlinks=symlinks,
      File "/home/user/Miniconda3/envs/prefect/lib/python3.9/shutil.py", line 467, in _copytree
        os.makedirs(dst, exist_ok=dirs_exist_ok)
      File "/home/user/Miniconda3/envs/prefect/lib/python3.9/os.py", line 225, in makedirs
        mkdir(name, mode)
    FileNotFoundError: [Errno 2] No such file or directory: ''
    An exception occurred.
    Any idea why this is happening? I'm currently using `python=3.9.13` and `prefect=2.6.4`. You can find the `healthcheck` flow code in the following link: https://discourse.prefect.io/t/how-to-deploy-prefect-2-0-flows-to-run-as-a-local-process-docker-container-or-a-kubernetes-job/1246
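A minimal sketch of what the last frames of the traceback above suggest, under the assumption (not confirmed anywhere in this thread) that the destination path handed to `shutil.copytree` ended up as an empty string. `try_makedirs` is a hypothetical helper, not part of Prefect; it only shows that `os.makedirs` with an empty path raises the same kind of `FileNotFoundError`, while a non-empty path is created without complaint.

```python
import os
import tempfile


def try_makedirs(dst: str) -> str:
    """Hypothetical helper: call os.makedirs like the final traceback frame and report the outcome."""
    try:
        os.makedirs(dst, exist_ok=True)
        return "ok"
    except FileNotFoundError as exc:
        return f"FileNotFoundError: {exc}"


# An empty destination (assumed here to mirror the failing call) cannot be created.
print(try_makedirs(""))

# A concrete destination under a temporary directory is created as expected.
with tempfile.TemporaryDirectory() as tmp:
    print(try_makedirs(os.path.join(tmp, "mount", "flows")))
```

If this assumption holds, checking what path the storage block resolves to before running `prefect deployment build` might be a reasonable first debugging step.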
    j
    • 2
    • 2
  • b

    Brandon T. Kowalski

    10/21/2022, 1:42 PM
    In the Prefect 2.0 Cloud UI, where can I change my email address? When I go to the profile editing page, it is grayed out.
    ✅ 1
    b
    • 2
    • 1
  • b

    Brandon T. Kowalski

    10/21/2022, 2:15 PM
    Follow up question. Now that self-service organizations are available in Prefect Cloud 2.0, is it possible to convert an existing workspace into an org workspace?
    ✅ 1
    a
    • 2
    • 1
  • a

    Alejandro

    10/21/2022, 3:10 PM
    Hello! Is there any way in Prefect to use a mount point ("/home/user/mount/central-storage") as a remote storage block? I have tried to set it as the 'basepath' of LocalFileSystem, but this way I cannot run deployments on remote agents. Is there a "hacky" way to use RemoteFileSystem for this purpose?
    ✅ 1
    m
    t
    • 3
    • 17
  • k

    Krishnan Chandra

    10/21/2022, 3:39 PM
    Hey everyone - I’m trying to figure out what would be a good architecture on AWS for running a large number of flows in a burst (ex: X00,000 flows run once a week, broken into smaller batches). Ideally I’d want the backing infrastructure that these flows run on to be ephemeral, so it seems like I could use any of the following to do this:
    • Spinning up more agents temporarily (current plan)
    • Kubernetes jobs
    • ECS (?)
    • Dask + Fargate (?)
      ◦ I know Dask parallelism operates at task level rather than flow level
    I’m wondering if anyone here has similar use cases. If so, what works well for you?
    ✅ 1
    a
    j
    • 3
    • 7
  • s

    Seth Goodman

    10/21/2022, 3:50 PM
    Hi All - hopefully an easy question: What is the proper way to redefine an existing task? E.g., In interactive mode I create my_task, then edit my_task, and I get an error of "OSError: could not get source code". Using 2.6.4
    ✅ 1
    j
    r
    • 3
    • 11
  • a

    Amy McClurg

    10/21/2022, 4:02 PM
    Hi, I was creating a new deployment earlier and tried to use the '--skip-upload' option in building my deployment, but got an error that it was not an available option. It's listed as an option to use in the documentation but doesn't show up when checking through 'prefect deployment build --help' in terminal. Has this feature been removed?
    ✅ 1
    c
    • 2
    • 9
  • t

    Trevor Kramer

    10/21/2022, 6:07 PM
    What is the recommended way to test flows that reference external blocks? For instance this test fails because result_storage is set to a block name.
    from prefect import flow, get_run_logger, task
    from prefect.testing.utilities import prefect_test_harness
    
    
    def test_flow():
        @task
        def _add(x: int, y: int) -> int:
            get_run_logger().info(f"Adding {x} + {y} in task")
            return x + y
    
        @flow(persist_result=True, result_storage="s3/mldc-result-storage", result_serializer="json")
        def add(x: int, y: int) -> int:
            get_run_logger().info(f"Adding {x} + {y}")
            result = _add(x, y)
            return result
    
        with prefect_test_harness():
            assert add(5, 6) == 11
    ✅ 1
    r
    • 2
    • 1
  • i

    Ilya Galperin

    10/21/2022, 6:22 PM
    Hey folks. Is there a best practice for conditionally triggering a deployment for a flow, from another flow in Prefect 2 i.e. `flow a deployment x` triggering `flow b deployment y`?
    ✅ 1
    a
    • 2
    • 2
  • j

    Jai P

    10/21/2022, 8:12 PM
    👋 hullo! we're thinking through our infrastructure story with prefect right now, and are aiming to deploy to EKS. a question i have is, is there a standard model of # of agents to # of nodes in a cluster? Or should you be having a single agent per flow that you want to execute in the cluster? Also apologies in advance if there's already a recipe or something i should be looking at 🙂
    ✅ 1
    c
    • 2
    • 1
  • s

    Stephen Herron

    10/21/2022, 10:09 PM
    hi - I am hitting an error in prefect-aws, get_directory
    ✅ 1
    n
    • 2
    • 9
  • j

    Jarvis Stubblefield

    10/22/2022, 12:36 AM
    @Anthony Head @Ryan Peden @Michael Adkins @Andrew Brookins @Nate… Just wanted to say a big 🎉THANK YOU🎉 to each of you and others who have helped me. My client loves the idea of us replacing Celery with Prefect and was impressed with what we had working and how we can leverage Prefect in the future for more traditional and non-traditional workflow solutions!
    💙 5
    :thank-you: 6
    ✅ 2
    👍 6
    🔥 6
    :marvin-duck: 5
    🚀 9
  • n

    Noam Cohen

    10/22/2022, 8:55 AM
    [SOLVED] Hi, I am getting 404 when trying to enter my workspace in cloud (never happened to me before). I didn’t do anything different in changing settings. Can you help me? Update: According to this https://prefect-community.slack.com/archives/CL09KU1K7/p1666181421767899?thread_ts=1666168584.461719&cid=CL09KU1K7
    ✅ 1
    :gratitude-thank-you: 1
  • n

    Noam Cohen

    10/22/2022, 11:17 AM
    Another one - my deployed flow cannot retrieve environment variables from my local .env file. The file is in `.prefectignore` when I upload to my storage block and I am using a local agent. I am pretty sure that in the past this worked, but anyway - how can I set the environment variables of my execution from an `.env` file?
    ✅ 1
    a
    • 2
    • 6
  • m

    Manuel Garrido Peña

    10/22/2022, 3:13 PM
    hey guys, I am following this docker-compose setup for local development https://github.com/fraibacas/prefect-orion, but for some reason, even though `s3fs` is installed, I can't add any kind of block in the UI. Do you think the issue is related to docker?
    ✅ 1
    a
    • 2
    • 33
  • k

    Keith

    10/22/2022, 4:58 PM
    Hi everyone! Have been having a lot of success with migrating Prefect 1 to Prefect 2 on GKE. It has been a great experience!! I'm not sure this is the correct place for feedback but there are 2 situations that would be great to have a better view on:
    1. When the underlying Kubernetes pod runs out of/over resource requirements the Pod gets killed by the GKE cluster but the job in Prefect Cloud remains in the state of `running`. Nothing is ever messaged back to the logs and it never `fails`, some sort of response or indication that the pod is no longer active would be great!
    2. When you run a job and it never makes it to the actual Flow, the failure comes through in the Prefect Cloud UI but the kubernetes Pod remains active. I'm assuming this is due to the `finished_job_ttl` not getting applied yet, but it is set at the deployment level so not sure why it keeps the pod around on a failure like this.
    ✅ 1
    :gratitude-thank-you: 3
    ➕ 1
    a
    • 2
    • 2
  • m

    Mtrl_Scientist

    10/22/2022, 7:59 PM
    Hi, Is it correct that one can't run a flow (even locally) if the Prefect API is down??
    ✅ 1
    a
    • 2
    • 7
  • f

    Florian Kühnlenz

    10/23/2022, 8:27 AM
    Where can I find the docs for the CLI for 2.0? They seem to be missing from the API references.
    ✅ 1
    a
    q
    • 3
    • 8
  • j

    Jens Freund

    10/23/2022, 8:01 PM
    Hello all, I would like to ask if you could help me with a problem I'm having in creating a deployment with an SFTP server as remote storage? I'm using Prefect server version 2.6.4 on Windows 10. Specifically, I locally created an example project with the following structure:
    C:\USERS\TESTBENUTZER\PREFECT_TEST_PROJECT
    ├───.idea
    │       some files
    │           
    ├───prefect_flows
    │       prefect_flows.py
    │       
    └───prefect_tasks
            prefect_tasks.py
    In addition, I created a remote storage block via the UI named `sftp-server` and the base path `sftp://ACTUAL-SFTP-URL/prefect_flows/api_flow`. After that, I used the following command to create a deployment:
    PS C:\Users\Testbenutzer\prefect_test_project> prefect deployment build "C:\Users\Testbenutzer\prefect_test_project\prefect_flows\prefect_flows.py:api_flow" -n "ftp-test" -q "ftp-test" -sb "remote-file-system/sftp-server"
    However, only the folder `prefect_flows` with the file `prefect_flows.py` is uploaded to the `api_flow` directory of the SFTP server, not the folder `prefect_tasks`. Also, Prefect gives the following error:
    File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\fsspec\callbacks.py", line 65, in relative_update
      self.value += inc
    TypeError: unsupported operand type(s) for +=: 'int' and 'NoneType'
    An exception occurred.
    I would write the full traceback into the thread, as well as the code of the two Python files and a screenshot of the storage block definition (without the actual SFTP URL and login credentials). If anyone had any idea, what I could be doing wrong here, that would be a great help! Many thanks in advance!
    ✅ 1
    q
    • 2
    • 3
Traceback:
PS C:\Users\Testbenutzer\prefect_test_project> prefect deployment build "C:\Users\Testbenutzer\prefect_test_project\prefect_flows\prefect_flows.py:api_flow" -n "ftp-test" -q "ftp-test" -sb "remote-file-system/sftp-server"                   
Found flow 'api-flow'
Default '.prefectignore' file written to C:\Users\Testbenutzer\prefect_test_project\.prefectignore
Traceback (most recent call last):
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\cli\_utilities.py", line 41, in wrapper
    return fn(*args, **kwargs)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\utilities\asyncutils.py", line 201, in coroutine_wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\utilities\asyncutils.py", line 152, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\anyio\_core\_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\anyio\_backends\_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\asyncio\base_events.py", line 646, in run_until_complete
    return future.result()
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\anyio\_backends\_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\cli\deployment.py", line 853, in build
    deployment = await Deployment.build_from_flow(
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\deployments.py", line 720, in build_from_flow
    await deployment.upload_to_storage()
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\deployments.py", line 572, in upload_to_storage
    file_count = await self.storage.put_directory(
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\filesystems.py", line 359, in put_directory
    self.filesystem.put_file(f, fpath, overwrite=True)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\fsspec\spec.py", line 819, in put_file
    callback.relative_update(segment_len)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\fsspec\callbacks.py", line 65, in relative_update
    self.value += inc
TypeError: unsupported operand type(s) for +=: 'int' and 'NoneType'
An exception occurred.
Python code:
# prefect_flows.py

from prefect import flow
from prefect_tasks.prefect_tasks import call_api


@flow
def api_flow(url):
    fact_json = call_api(url)
    print(fact_json)
	
	

# prefect_tasks.py

import requests
from prefect import task


@task
def call_api(url):
    response = requests.get(url)
    print(response.status_code)
    return response.json()
Storage block definition:
q

Q

10/24/2022, 11:27 AM
TLDR: `pip install "fsspec>=2022.10.0"`
As you can see in the traceback, whenever a file gets written to the remote, the progress is tracked via a callback that adds the value returned by `AbstractFileSystem.open().write` to a counter. It is assumed that the value represents the number of bytes written to the buffer. However, in the case of `fsspec.implementations.sftp.SFTPFileSystem` the buffer (`paramiko.file.BufferedFile`) returns `None` when `write` is called, which leads to an exception when `fsspec.spec.AbstractBufferedFile` attempts to add `None` to an int counter. The offending segment changed in `fsspec==2022.10.0` (released 2022/10/19); bumping `fsspec`'s version seems to solve the problem.
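A minimal, standard-library-only sketch of the failure mode described above. None of the names below are fsspec or paramiko APIs: `CountingCallback`, `NoneReturningBuffer`, `put_chunk`, and `put_chunk_guarded` are hypothetical stand-ins that only mimic the described behavior (a progress counter that adds whatever `write` returns, and a buffer whose `write` returns `None`). The guarded variant is one possible workaround for illustration, not a claim about how fsspec 2022.10.0 changed the code.

```python
class CountingCallback:
    """Hypothetical progress callback that sums the values it is given."""

    def __init__(self) -> None:
        self.value = 0

    def relative_update(self, inc) -> None:
        # Raises TypeError when inc is None, like the traceback in this thread.
        self.value += inc


class NoneReturningBuffer:
    """Hypothetical buffer whose write() stores data but returns None."""

    def __init__(self) -> None:
        self.data = bytearray()

    def write(self, chunk: bytes) -> None:
        self.data.extend(chunk)


def put_chunk(buffer, callback, chunk: bytes) -> None:
    """Assumes write() returns the number of bytes written."""
    callback.relative_update(buffer.write(chunk))


def put_chunk_guarded(buffer, callback, chunk: bytes) -> None:
    """Falls back to len(chunk) when write() returns None (illustrative guard only)."""
    written = buffer.write(chunk)
    callback.relative_update(len(chunk) if written is None else written)


if __name__ == "__main__":
    cb = CountingCallback()
    try:
        put_chunk(NoneReturningBuffer(), cb, b"abc")
    except TypeError as exc:
        print(f"TypeError: {exc}")

    cb = CountingCallback()
    put_chunk_guarded(NoneReturningBuffer(), cb, b"abc")
    print(cb.value)
```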
:thank-you: 1
🙌 1
j

Jens Freund

10/24/2022, 11:57 AM
Thank you so much @Q, that solved the problem! And also thank you very much for the explanation!