Powered by Linen
prefect-community
  • m

    max

    09/21/2022, 6:58 AM
    Hey! I want to schedule my flows with IntervalSchedule. How can I make the first run start right after the flow is deployed?
    ✅ 1
    s
    • 2
    • 2
  • v

    Vikas

    09/21/2022, 7:08 AM
    Hello everyone, I have started using Prefect for the first time, and now I am in a dilemma about whether to continue using it. I am developing a high-performance REST API with ETL functionality, so I thought Prefect would be the best way to orchestrate the whole workflow. The problem is that, with the same code, the response time of the API varies drastically depending on whether I am using Prefect or not. (I understand that the first run of the workflow will be very slow, but my problem is that subsequent invocations are also very slow.) I am attaching images of both scenarios here. My question is whether I am doing something wrong, or whether this is something I will have to live with if I decide to use Prefect.
    ✅ 1
    m
    • 2
    • 6
  • n

    Nic

    09/21/2022, 8:55 AM
    Is there a way to use a specific profile when you use cloud login? I've created the profile locally, but when I use a git action to log in with the same key and workspace, I get errors because it's waiting for input to create a profile.
    ✅ 1
    m
    • 2
    • 1
  • s

    Stephen Herron

    09/21/2022, 10:53 AM
    I’m experimenting with the new ECSTask in 2.4.0, as we run Prefect 1. One option in Prefect 1 was the ability to pass in `networkConfiguration`, and thus `awsvpcConfiguration`, to allow the flow to run under specific subnets/security groups, but this doesn’t seem possible in Prefect 2 using this task at the moment?
    ✅ 1
    a
    • 2
    • 5
  • m

    Meet Patel

    09/21/2022, 11:22 AM
    Hello, I have run into an issue. I use LocalDaskExecutor as the executor, and one of my tasks already uses the concurrent.futures (multiprocessing) module to run multiple processes. With LocalDaskExecutor in the Prefect flow, I get AssertionError: daemonic processes are not allowed to have children. Has anyone faced this issue, and how can I resolve it?
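For context on the error above: Python's multiprocessing refuses to let a daemonic process start child processes, and `multiprocessing.Pool` workers are daemonic, so a task that is itself running inside such a worker cannot open its own process pool. Below is a minimal sketch of one possible workaround, assuming the inner work is I/O-bound or otherwise tolerates threads. `process_item` and `run_batch` are hypothetical names, not part of Prefect.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Iterable, List


def process_item(item: int) -> int:
    # Hypothetical stand-in for the real per-item work.
    return item * item


def run_batch(items: Iterable[int], max_workers: int = 4) -> List[int]:
    # Threads, unlike child processes, can be started from a daemonic
    # worker process, so this avoids the AssertionError.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() returns results in the same order as the inputs.
        return list(pool.map(process_item, items))
```

The other direction is to keep the process pool inside the task and run the outer executor on threads instead; which side to change depends on where the CPU-bound work lives.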
    m
    • 2
    • 1
  • e

    Ebenezer Ankrah

    09/21/2022, 11:41 AM
    Hello, have a quick question. What would be the best way to set an environment variable within a flow in prefect 2.0?
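One common pattern, sketched here under the assumption that the flow body runs as an ordinary Python function in a single process: set the variable through `os.environ`, optionally restoring the old value afterwards. `temporary_env`, `my_flow_body` and `EXAMPLE_SETTING` are hypothetical names, and the Prefect `@flow` decorator is omitted so the snippet runs without Prefect installed.

```python
import os
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def temporary_env(name: str, value: str) -> Iterator[None]:
    # Remember the previous value so it can be restored on exit.
    previous = os.environ.get(name)
    os.environ[name] = value
    try:
        yield
    finally:
        if previous is None:
            os.environ.pop(name, None)
        else:
            os.environ[name] = previous


def my_flow_body() -> str:
    # Hypothetical flow body: the variable is set only inside the block.
    with temporary_env("EXAMPLE_SETTING", "enabled"):
        return os.environ["EXAMPLE_SETTING"]
```

A value set this way is visible only to the current process and to children it starts afterwards, so tasks that run on remote workers would need the variable configured on their side.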
    ✅ 1
    j
    • 2
    • 1
  • o

    Olivér Atanaszov

    09/21/2022, 11:50 AM
    Hi! I'm using Prefect on K8s clusters and I need to track costs per project and namespace. To achieve this, my plan was to add labels to the jobs. Before initializing `KubernetesRun` objects, I tried to modify the `metadata.labels` field, but apparently it gets overwritten by Prefect Cloud. I've also tried to modify `annotations`, but those also didn't appear when checking `kubectl describe pod`. Is there a workaround or a different approach that comes to mind?
    ✅ 1
    b
    • 2
    • 4
  • p

    Parwez Noori

    09/21/2022, 12:48 PM
    Hi, I am trying to solve a problem with different repos using Prefect 2. I have a repo for Prefect itself and then repos for the DS projects. Each has a virtual environment set up. I have simple flows set up that: 1. Create a YAML file for each deployment and apply a schedule for each deployment, visible in Prefect Cloud. 2. Run the scheduled tasks as k8s jobs. However, when I push deployments from the DS repo, they are registered but never actually run/triggered. I have also created another storage block for the DS project, but this has not solved the problem.
    k
    • 2
    • 18
  • v

    Vladislav

    09/21/2022, 12:55 PM
    Hi! New to Prefect, trying to create a deployment locally with the Orion UI and flow source code from my PyCharm project with a conda environment. But every time I face the same exception: ‘flow could not be retrieved from deployment’. I tried all the tips from the chat, e.g. an absolute path, but every time I get this error plus ‘script encountered an exception’. When I run the flow as a Python file, everything works fine. I think there is a problem somewhere with the deployment.
    ✅ 2
    j
    • 2
    • 11
  • m

    Meet Patel

    09/21/2022, 2:02 PM
    Reposting: Hello, I have run into an issue. I use LocalDaskExecutor as the executor, and one of my tasks already uses the concurrent.futures (multiprocessing) module to run multiple processes. With LocalDaskExecutor in the Prefect flow, I get AssertionError: daemonic processes are not allowed to have children. Has anyone faced this issue, and how can I resolve it?
    n
    • 2
    • 1
  • b

    Blake Stefansen

    09/21/2022, 3:54 PM
    Hey y'all, I have flow runs created from a deployment in the cloud. Our agent is running on Kubernetes. I create a flow run from my deployment, and the agent seems to be picking it up (the flow run doesn't show in the work queue). However, the flow runs are stuck on "pending". I'm not exactly sure what would cause this. Some suggestions in this channel said to nuke the queue and recreate it. No luck there. Any ideas?
    t
    n
    • 3
    • 6
  • n

    Nathaniel Russell

    09/21/2022, 4:08 PM
    I have been struggling to get past this error
    File "C:\work\testing_service\sand02-cumulus-pipeline\service.py", line 7, in <module>
        from prefect.client import get_client
      File "C:\work\testing_service\sand02-cumulus-pipeline\prefect\__init__.py", line 24, in <module>
        from prefect.orion.schemas.states import State
      File "C:\work\testing_service\sand02-cumulus-pipeline\prefect\orion\schemas\__init__.py", line 1, in <module>
        from . import states, schedules, data, core, sorting, filters, responses, actions
      File "C:\work\testing_service\sand02-cumulus-pipeline\prefect\orion\schemas\states.py", line 14, in <module>
        from prefect.orion.schemas.data import DataDocument
      File "C:\work\testing_service\sand02-cumulus-pipeline\prefect\orion\schemas\data.py", line 8, in <module>
        from prefect.orion.utilities.schemas import PrefectBaseModel
      File "C:\work\testing_service\sand02-cumulus-pipeline\prefect\orion\utilities\schemas.py", line 12, in <module>
        import orjson
      File "C:\work\testing_service\sand02-cumulus-pipeline\orjson\__init__.py", line 1, in <module>
        from .orjson import *
    ModuleNotFoundError: No module named 'orjson.orjson'
    The extra weird thing is if I start python in anaconda prompt and simply try to import get_client or orjson it doesn't error, but running a python file it does.
    2️⃣ 1
    j
    • 2
    • 3
  • m

    Mohamed Alaa

    09/21/2022, 4:20 PM
    Hello everyone, I'm trying to define a custom block as a PoC; however, I am not able to access it using the snippet given in the Prefect UI after registering the block. I tried loading it using
    from prefect.blocks.core import Block 
    cube = Block.load('Cube/cube-test')
    where Cube is the name of the registered block and cube-test is an instance of it. I always get
    ValueError: Unable to find block document named cube-test for block type Cube
    I ran this code to find all the registered blocks
    from prefect.utilities.dispatch import lookup_type, get_registry_for_type
    from prefect.blocks.core import Block
    registry = get_registry_for_type(Block)
    print(registry)
    It prints all the default Prefect blocks but none of my custom blocks. Am I missing something? Thanks in advance.
    ✅ 1
    j
    • 2
    • 4
  • m

    Max

    09/21/2022, 6:00 PM
    Hi, I'm wondering if there's a simple solution for updating flows in Prefect. For example, I have a flow script, I create the manifest of the flow, and then apply the deployment to the Prefect server. If for some reason I have to update that specific flow's script, how do I go about letting Prefect know that there's an update for the flow I previously deployed? Can Prefect pull updates from GitHub or any other third-party service? Do I have to remove the flow, redo the manifest, and apply a new deployment? I'm just looking to understand whether this type of functionality currently exists, or if someone can point me to the right documentation for it.
    ✅ 1
    k
    a
    • 3
    • 7
  • s

    Sam Garvis

    09/21/2022, 6:56 PM
    Is there a best method for running a flow as frequently as possible? I tried putting the flow in while true, like I read in a prefect article. But every single time, after a few hours I get some internal 500 error and it shuts down the flow. I would prefer to not run an entirely new flow frequently because it takes more time to set up the flow, but this might be the best option.
    a
    c
    f
    • 4
    • 8
  • a

    Adam Green

    09/22/2022, 1:40 AM
    Hello everyone - does anyone know if it is possible to use LocalStack as a Remote Storage block?
    ✅ 1
    a
    • 2
    • 4
  • s

    Seth Coussens

    09/22/2022, 2:05 AM
    Anyone else having issues with the Prefect 2.0 Cloud UI at the moment? It seems to be having trouble: it keeps freezing in the browser for me, but no other websites are having issues.
    ✅ 1
    • 1
    • 1
  • d

    Deepanshu Aggarwal

    09/22/2022, 5:36 AM
    Hi! I'm new to Prefect. I was confused about 2 things: 1. Can we trigger Prefect flows from another service, like a Lambda? 2. I was trying to run some CLI commands (like prefect auth, register, agent) in GitHub Actions and am facing issues where it doesn't recognize those commands. Is there any suggested method to do so?
    ✅ 1
    j
    • 2
    • 5
  • a

    ale

    09/22/2022, 7:10 AM
    Shouldn’t the GitHub badge refer to the Prefect 2 version instead of Prefect 1? The screenshot is taken from the Prefect Collection Catalog page.
    ✅ 1
    j
    • 2
    • 1
  • e

    Erik

    09/22/2022, 7:48 AM
    I wanted to post this bug here before I post it on GitHub; maybe I’m missing something easy to fix. I created an email block in the UI. Let’s call it `Email/test`. I have tried two different methods to access the block in my code. Method 1:
    from prefect.blocks.notifications import NotificationBlock
    
    email = NotificationBlock().load('Email/test')
    and I get the following error
    File PYTHON/site-packages/prefect/blocks/core.py:836, in Block.__new__(cls, **kwargs)
        834     return m
        835 else:
    --> 836     m = super().__new__(cls)
        837     object.__setattr__(m, "__dict__", kwargs)
        838     object.__setattr__(m, "__fields_set__", set(kwargs.keys()))
    
    TypeError: Can't instantiate abstract class NotificationBlock with abstract method notify
    It turns out this error comes from any attempt to instantiate `NotificationBlock()`. Can anyone else confirm this error when instantiating? Method 2:
    from prefect.blocks.core import Block
    
    email = Block().load('Email/test')
    and I get the following error
    File PYTHON/site-packages/prefect/blocks/core.py:646, in Block.load(cls, name)
        642         block_document = await client.read_block_document_by_name(
        643             name=block_document_name, block_type_slug=block_type_slug
        644         )
        645     except prefect.exceptions.ObjectNotFound as e:
    --> 646         raise ValueError(
        647             f"Unable to find block document named {block_document_name} for block type {block_type_slug}"
        648         ) from e
        649 return cls._from_block_document(block_document)
    
    ValueError: Unable to find block document named test for block type Email
    even though the block has definitely been created in the UI. Have I tried the right things? Is there some other way to access my Email block? Or is this a real bug? PS: I see Mohamed Alaa wrote about a similar bug in this thread a few hours ago.
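The Method 1 error can be reproduced without Prefect. The traceback above shows `load` defined as `Block.load(cls, name)`, i.e. a classmethod, so it can be called on the class itself. Writing `NotificationBlock()` first tries to build an instance of an abstract class, which Python rejects. Below is a minimal sketch with hypothetical stand-in classes (`BaseNotifier` and `EmailNotifier` are not Prefect's classes).

```python
from abc import ABC, abstractmethod


class BaseNotifier(ABC):
    """Hypothetical stand-in for an abstract block class."""

    @abstractmethod
    def notify(self, body: str) -> str:
        ...

    @classmethod
    def load(cls, name: str) -> str:
        # A classmethod receives the class, so no instance is needed.
        return f"{cls.__name__} loaded {name!r}"


class EmailNotifier(BaseNotifier):
    def notify(self, body: str) -> str:
        return f"sent: {body}"


def try_instantiate_abstract() -> str:
    # Instantiating the abstract class raises TypeError before load() is reached.
    try:
        BaseNotifier()
    except TypeError as exc:
        return type(exc).__name__
    return "no error"
```

This only explains the `TypeError` in Method 1. The `ValueError` in Method 2 is a separate lookup failure that the sketch does not address.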
    ✅ 1
    m
    j
    • 3
    • 13
  • s

    Steve Smith

    09/22/2022, 1:34 PM
    I'm having trouble running a KubernetesJob deployment with Docker storage when I apply the deployment on Windows. The deployment's entrypoint is getting set with a Windows-style path (e.g. `entrypoint="app\\flows\\my_flow.py:my_flow"`), so when I try to run it, the job cannot find the flow source code. My fix is to manually replace "\\" with "/" in the entrypoint to make it a Linux-style path before applying the deployment. Is this a known issue?
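The manual fix described above can be scripted. Below is a minimal sketch, assuming the entrypoint has the form `<path>:<flow function name>` as in the example. `to_posix_entrypoint` is a hypothetical helper, not a Prefect API.

```python
from pathlib import PureWindowsPath


def to_posix_entrypoint(entrypoint: str) -> str:
    # Split on the last colon so a drive letter such as "C:" stays in the path.
    path, sep, flow_name = entrypoint.rpartition(":")
    if not sep:
        # No colon at all: treat the whole string as a path.
        return PureWindowsPath(entrypoint).as_posix()
    # PureWindowsPath accepts both separators and as_posix() emits "/".
    return f"{PureWindowsPath(path).as_posix()}:{flow_name}"
```

Because `PureWindowsPath` is a pure path class, the conversion gives the same result on Windows and Linux, and an entrypoint that already uses forward slashes passes through unchanged.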
    r
    b
    +2
    • 5
    • 9
  • j

    Jan Domanski

    09/22/2022, 2:18 PM
    Hi there, I'm having a little trouble logging into Prefect Cloud with an API key in a non-interactive fashion. What I did was this:
    prefect cloud login --workspace my-account/my-workspace --key ....
    Creating a profile for this Prefect Cloud login. Please specify a profile name: default
    Profile 'default' already exists.
    I didn't expect to be prompted about anything, and it’s impossible for me to respond to the prompt. Is there another way to provide the key? An env var? Is there a way to “purge” the Prefect config so I can log in without the prompt complaining?
    c
    c
    • 3
    • 8
  • c

    Ching

    09/22/2022, 3:44 PM
    What's the recommended way to pass pandas DFs between tasks? With big DFs (1GB+) serialization is taking a lot of time.
    👀 1
    ✅ 1
    a
    j
    j
    • 4
    • 8
  • d

    David Cupp

    09/22/2022, 3:45 PM
    Hello, is it possible to make the Prefect CLI (I have version `2.4.0`) output text without the fancy borders and column headers? I would like to pass the output through awk, e.g.
    prefect deployment ls | awk '(extract deployment id)' | xargs -n 1 (some command)
    ✅ 1
    m
    j
    • 3
    • 6
  • n

    Nathaniel Russell

    09/22/2022, 4:33 PM
    prefect.client.get_client imports orjson and I am having a hard time putting that in a lambda. This is the file structure I am looking at, this will eventually be all zipped up and put in an AWS lambda.
    root
    ├── orjson
    |   ├── __pycache__
    |   ├── __init__.py
    |   ├── __init__.pyi
    |   ├── orjson.cp39-win_amd64.pyd
    |   └── py.typed
    └── service.py
    In service.py I am trying to import prefect.client, which then imports orjson:
    import orjson
    This is what __init__.py looks like:
    from .orjson import *
    
    __doc__ = orjson.__doc__
    if hasattr(orjson, "__all__"):
        __all__ = orjson.__all__
    however importing orjson throws this error:
    File "C:\work\testing_service\sand02-cumulus-pipeline\service.py", line 7, in <module>
        from prefect.client import get_client
      File "C:\work\testing_service\sand02-cumulus-pipeline\prefect\__init__.py", line 24, in <module>
        from prefect.orion.schemas.states import State
      File "C:\work\testing_service\sand02-cumulus-pipeline\prefect\orion\schemas\__init__.py", line 1, in <module>
        from . import states, schedules, data, core, sorting, filters, responses, actions
      File "C:\work\testing_service\sand02-cumulus-pipeline\prefect\orion\schemas\states.py", line 14, in <module>
        from prefect.orion.schemas.data import DataDocument
      File "C:\work\testing_service\sand02-cumulus-pipeline\prefect\orion\schemas\data.py", line 8, in <module>
        from prefect.orion.utilities.schemas import PrefectBaseModel
      File "C:\work\testing_service\sand02-cumulus-pipeline\prefect\orion\utilities\schemas.py", line 12, in <module>
        import orjson
      File "C:\work\testing_service\sand02-cumulus-pipeline\orjson\__init__.py", line 1, in <module>
        from .orjson import *
    ModuleNotFoundError: No module named 'orjson.orjson'
    I have no clue why this import doesn't work, because if I simply have orjson in an Anaconda environment and import it, it works fine. Here is __init__.pyi, if that is of any use:
    import json
    from typing import Any, Callable, Optional, Union
    
    __version__: str
    
    def dumps(
        __obj: Any,
        default: Optional[Callable[[Any], Any]] = ...,
        option: Optional[int] = ...,
    ) -> bytes: ...
    def loads(__obj: Union[bytes, bytearray, memoryview, str]) -> Any: ...
    
    class JSONDecodeError(json.JSONDecodeError): ...
    class JSONEncodeError(TypeError): ...
    
    OPT_APPEND_NEWLINE: int
    OPT_INDENT_2: int
    OPT_NAIVE_UTC: int
    OPT_NON_STR_KEYS: int
    OPT_OMIT_MICROSECONDS: int
    OPT_PASSTHROUGH_DATACLASS: int
    OPT_PASSTHROUGH_DATETIME: int
    OPT_PASSTHROUGH_SUBCLASS: int
    OPT_SERIALIZE_DATACLASS: int
    OPT_SERIALIZE_NUMPY: int
    OPT_SERIALIZE_UUID: int
    OPT_SORT_KEYS: int
    OPT_STRICT_INTEGER: int
    OPT_UTC_Z: int
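One thing worth checking, offered as a guess rather than a confirmed diagnosis: `from .orjson import *` can only succeed if the compiled extension in the folder matches the interpreter that runs the script. The tree above contains `orjson.cp39-win_amd64.pyd`, a build for CPython 3.9 on 64-bit Windows, whereas AWS Lambda runs on Linux. Below is a minimal sketch of that check. `extension_is_loadable` is a hypothetical helper.

```python
from importlib.machinery import EXTENSION_SUFFIXES
from typing import Sequence


def extension_is_loadable(
    filename: str,
    module_name: str,
    suffixes: Sequence[str] = tuple(EXTENSION_SUFFIXES),
) -> bool:
    # An extension module is importable only when its file name is
    # "<module name><suffix>" for a suffix the interpreter recognizes.
    return any(filename == module_name + suffix for suffix in suffixes)
```

By default the helper compares against the suffixes of the interpreter that runs it, so calling `extension_is_loadable("orjson.cp39-win_amd64.pyd", "orjson")` from the same Python that runs service.py shows whether that interpreter could load the file. The same mismatch could occur locally if the script is run with a different Python version than the one the file was built for.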
    b
    • 2
    • 3
  • c

    Clovis

    09/22/2022, 4:33 PM
    Hi guys! I have some questions regarding naming conventions and best practices: how do you name your deployments? By this I mean: my flow names usually indicate the flow's main job, and I was naively thinking of naming my deployments by the stage of the flow (PROD, DEV, etc.). However, looking at the UI (in the `Flow Runs` dashboard's deployment filter, for instance), I see that deployments are apparently grouped not by similar name but by associated flow. So I think I may have misunderstood the Prefect deployment philosophy, and wanted to ask you all if there is any literature or documentation about naming conventions in Prefect :prefect:?
    ✅ 1
    m
    • 2
    • 1
  • x

    Xavier Babu

    09/22/2022, 5:04 PM
    Dear Prefect Community, I am getting the following error in agent.log (the log for the dedicated agent) and I can't run the scheduled job. I already have a local storage system and used the code to load it inside my flow.
    prepared_infrastructure = infrastructure_block.prepare_for_flow_run(flow_run)
    AttributeError: 'LocalFileSystem' object has no attribute 'prepare_for_flow_run'
    An exception occurred.
    m
    • 2
    • 6
  • b

    Ben Ayers-Glassey

    09/22/2022, 5:17 PM
    Hello all! Here are two task runs I want to see: https://cloud.prefect.io/zesty-ai/task-run/fe3de56d-505c-481a-b16e-e067189b6801 https://cloud.prefect.io/zesty-ai/task-run/30ec6159-25c4-4ed1-8215-7e3827014c0a ...but in both cases, I see a mostly-blank page, and some JS errors (see screenshot). Any ideas? Is this a known issue?
    m
    • 2
    • 5
  • a

    Aaron Goebel

    09/22/2022, 5:19 PM
    Has anyone used the latest DockerContainer blocks with ECR? I am currently using an ECRRegistry class that inherits `BaseDockerLogin` and defines login as grabbing a token and calling login with the parsed-out username, password, etc. I pass the registry to the `DockerContainer` infrastructure and save the blocks. When I create the blocks (registry and container), my Prefect agents are initially able to execute deployments and pull the latest containers from ECR. But after 12 hours or so (I assume token expiration), all deployments to the agent fail with:
    docker.errors.APIError: 500 Server Error for <http+docker://localhost/v1.41/auth>: Internal Server Error ("login attempt to https://{My_ECR_Registry}/v2/ failed with status: 403 Forbidden
    I'm kind of confused about why it works right after creating a block and deployment, but it seems like reauthentication does not.
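Below is a sketch of the refresh logic this suggests, assuming (as the message does) that the failures come from an expired token. ECR authorization tokens are base64-encoded `username:password` pairs and are valid for 12 hours, so credentials captured once when a block is saved would stop working after that. `decode_ecr_token` and `needs_refresh` are hypothetical helpers, and fetching the token itself is left out.

```python
import base64
from datetime import datetime, timedelta
from typing import Tuple


def decode_ecr_token(token_b64: str) -> Tuple[str, str]:
    # The token decodes to "<username>:<password>"; split on the first colon only.
    decoded = base64.b64decode(token_b64).decode("utf-8")
    username, _, password = decoded.partition(":")
    return username, password


def needs_refresh(
    expires_at: datetime,
    now: datetime,
    margin: timedelta = timedelta(minutes=10),
) -> bool:
    # Refresh slightly early so a login never starts with a token about to expire.
    return now >= expires_at - margin
```

Under that assumption, the login step would fetch and decode a fresh token whenever `needs_refresh` returns true, instead of reusing the username and password stored at block creation time.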
    :upvote: 1
  • q

    Q

    09/22/2022, 5:32 PM
    👋 I'm trying to use the RemoteFileSystem block with WebHDFS. I'm running into 2 problems:
    1. When I call
    RemoteFileSystem(basepath="webhdfs://basepath", settings=params).write_path(path="subpath/filename", content=content)
    I get an exception like this:
    null for uri: https://hdfs.example.com:port/webhdfs/v1webhdfs%3A//basepath/subpath?op=MKDIRS
    Which is unsurprising, since the URL above is clearly not valid. The problem here is that `fsspec.WebHDFS`'s makedirs/mkdir methods expect a slash-prefixed path without a scheme (in this case `/basepath/subpath`), while `prefect.RemoteFileSystem` just passes `scheme://basepath/subpathdir` and expects fsspec to do the rest.
    2. A lot of fsspec.AbstractFileSystem methods rely on fsspec.infer_storage_options, most importantly `open`, which is used by `prefect.RemoteFileSystem.write_path` and `prefect.RemoteFileSystem.read_path`. `infer_storage_options` calls urlsplit and returns `urlsplit(path).path`, i.e. strips away scheme+netloc. Here the problem is that netloc is whatever comes after the scheme and before the first slash, e.g. for `webhdfs://home/user/project`, `home` would get stripped away erroneously. This means one would have to prepend an extra segment to basepath to be sacrificed to fsspec's implementation (e.g. `webhdfs://thisgetslost/home/user/project`).
    I can subclass `RemoteFileSystem` and override `write_path` to solve both problems. But then I would, it seems, need to make this class definition available and import it before calling `Block.load`. What I don't understand is how I can make an agent use this to fetch code. Right now it just fails with
    KeyError: "No class found for dispatch key 'subclassname' in registry for type 'Block'."
    Any ideas? Maybe there is a better way of doing this that doesn't involve subclassing?
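The netloc behaviour described in point 2 is easy to reproduce with the standard library alone. Below is a minimal sketch that calls `urllib.parse.urlsplit` directly (not fsspec itself). `path_seen_by_urlsplit` is a hypothetical helper.

```python
from urllib.parse import urlsplit


def path_seen_by_urlsplit(url: str) -> str:
    # urlsplit treats everything between "//" and the next "/" as the netloc,
    # so only what follows it is returned as the path.
    return urlsplit(url).path


# The first path segment is parsed as the netloc and dropped from the path.
print(urlsplit("webhdfs://home/user/project").netloc)                     # home
print(path_seen_by_urlsplit("webhdfs://home/user/project"))               # /user/project
# With a sacrificial first segment, the full path survives.
print(path_seen_by_urlsplit("webhdfs://thisgetslost/home/user/project"))  # /home/user/project
```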
    ✅ 1
    m
    • 2
    • 4
m

Michael Adkins

09/22/2022, 5:39 PM
Hi! I’d welcome an issue / pull request to address the parsing issues with the remote file system. It seems a bit tricky though. You need the block to be imported to be usable. One way is to register your module as a plugin e.g. https://github.com/PrefectHQ/prefect-aws/blob/main/setup.py#L30-L31
A little background on that is available at https://github.com/PrefectHQ/prefect/blob/main/src/prefect/plugins.py
q

Q

09/22/2022, 5:54 PM
Alright, I'll try it out and get back to you, thanks! Will create an issue when I'm done, hopefully will have a better understanding.
Adding an entrypoint worked. Created an issue: https://github.com/PrefectHQ/prefect/issues/6957