prefect-community
  • i

    iKeepo w

    10/24/2022, 3:49 AM
    Hello, is there any way to specify the parameter info in deployment.yaml through the CLI? It seems it can only be modified after running prefect deployment build.
    2 replies · 2 participants
  • s

    Stephen Herron

    10/24/2022, 3:56 AM
    Does .prefectignore support negated patterns? This did not seem to work:
    dbt_projects/*
    !dbt_projects/.gitkeep
    2 replies · 3 participants
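Under gitignore-style rules the last pattern that matches a path decides whether it is excluded, so !dbt_projects/.gitkeep placed after dbt_projects/* should re-include that file. Git also cannot re-include a file whose parent directory is itself excluded, which is why dbt_projects/* (the contents) is the usual pattern rather than dbt_projects/ (the directory). I have not verified how Prefect 2.6's own .prefectignore handling treats negations, so this is only a simplified sketch of the last-match-wins rule, using fnmatch and a hypothetical is_ignored helper; it deliberately ignores anchoring, ** and directory-only patterns.

```python
from fnmatch import fnmatch


def is_ignored(path: str, patterns: list[str]) -> bool:
    """Return True if `path` is excluded by gitignore-style `patterns`.

    Simplified sketch: each pattern is matched against the whole relative
    path with fnmatch (where '*' also matches '/'), and the last matching
    pattern wins. A leading '!' re-includes a path.
    """
    ignored = False
    for raw in patterns:
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        negate = line.startswith("!")
        pattern = line[1:] if negate else line
        if fnmatch(path, pattern):
            ignored = not negate  # a later match overrides any earlier one
    return ignored


patterns = ["dbt_projects/*", "!dbt_projects/.gitkeep"]
for candidate in ["dbt_projects/model.sql", "dbt_projects/.gitkeep", "flows/hello.py"]:
    print(candidate, "->", "ignored" if is_ignored(candidate, patterns) else "kept")
```

If Prefect's matcher follows the same rule, the .gitkeep file should be kept while everything else under dbt_projects/ is skipped.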
  • t

    Tim Enders

    10/24/2022, 2:08 PM
    I'm trying to check whether a task has failed and I'm doing it all wrong. How should I go about it? Right now I have isinstance(blah, Failed), but Failed isn't a type. Here is a code snippet:
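    from google.cloud.bigquery import LoadJob
    from prefect.blocks.notifications import SlackWebhook
    from prefect.states import Completed, State

    # Failed is a helper that builds a failed State rather than a class, so it
    # cannot be used with isinstance(); check the state object itself instead.
    if isinstance(bq_result, LoadJob) and bq_result.state == "DONE":
        return Completed(message="Load Finished!")
    elif isinstance(bq_result, State) and bq_result.is_failed():
        slack_webhook_block = SlackWebhook.load("data-pipeline-notifications")
        slack_webhook_block.notify("Hello from Prefect 2.0!")
        return bq_result
    else:
        ...  # the rest of the snippet was cut off in the original post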
    6 replies · 3 participants
  • i

    iñigo

    10/24/2022, 4:03 PM
    Hello, is there any guide on how to prepare Azure Blob Storage for use as a storage block? Thanks in advance.
    5 replies · 2 participants
  • How to migrate a Prefect v1 flow-level state handler to Prefect v2? Does Orion have a concept of flow state handlers? How to take action on flow run cancellation via code?

    David Elliott

    10/24/2022, 4:08 PM
    Hey, I'm struggling in 2.0 without a flow state handler and wondering what the best approach might be to replicate my 1.0 logic for a 2.0 flow. In 1.0, I had a flow state handler which did the following (pseudo-logic):
    • if the flow just started:
        ◦ send a Slack message
    • if the flow was cancelled:
        ◦ make a POST request to an endpoint which does thing A
    • if the flow succeeded and some other params are true:
        ◦ make a POST request to an endpoint which does thing B
    • once the flow finished (failed or successful):
        ◦ post to Slack with a custom message including task state counts for failed/success etc. (queried from the backend)
    I know there are Notifications in 2.0 for state changes, but they can't do the custom logic I need, so I'm wondering if there are any suggestions. FWIW I really, really miss state change handlers; they were incredibly powerful customisation tools 🙂 I have a feeling you'll tell me I need to create a subflow and put all this logic in the parent flow, but I'm hoping to avoid that as it seems overkill… hoping someone has some ideas! 🙏
    11 replies · 4 participants
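As the question notes, 2.0 has no flow state handlers, so one workaround is to wrap the flow's body in plain Python that fires callbacks on start, success, failure and finish. The sketch below is framework-agnostic and every name in it (with_run_hooks, the on_* callbacks, my_flow_body) is hypothetical; in practice the callbacks would post to Slack or call your endpoints. Placing @flow above the hooks decorator is an untested assumption. A wrapper like this cannot observe a cancellation triggered from outside the process, so the "cancelled" branch of the v1 logic would still need another mechanism.

```python
import functools
from typing import Any, Callable, List, Optional


def with_run_hooks(
    on_start: Optional[Callable[[], None]] = None,
    on_success: Optional[Callable[[Any], None]] = None,
    on_failure: Optional[Callable[[Exception], None]] = None,
    on_finish: Optional[Callable[[bool], None]] = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Hypothetical decorator that fires v1-style handler callbacks around a function."""

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        # wraps() copies name/docstring and sets __wrapped__, so
        # inspect.signature() still reports the original parameters.
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            if on_start:
                on_start()
            try:
                result = fn(*args, **kwargs)
            except Exception as exc:
                if on_failure:
                    on_failure(exc)
                if on_finish:
                    on_finish(False)
                raise  # re-raise so the run is still recorded as failed
            if on_success:
                on_success(result)
            if on_finish:
                on_finish(True)
            return result

        return wrapper

    return decorator


events: List[str] = []


@with_run_hooks(
    on_start=lambda: events.append("started"),
    on_success=lambda result: events.append(f"succeeded: {result}"),
    on_failure=lambda exc: events.append(f"failed: {exc}"),
    on_finish=lambda ok: events.append(f"finished ok={ok}"),
)
def my_flow_body(x: int) -> int:
    """Hypothetical flow logic standing in for the real flow body."""
    if x < 0:
        raise ValueError("negative input")
    return x * 2


my_flow_body(21)
print(events)
```

The failure path re-raises after the callbacks run, so the orchestrator still sees the exception rather than a silently swallowed error.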
  • a

    Alejandro

    10/24/2022, 4:49 PM
    Is it possible to load a Block inside the class in which another Block is defined, using the Python API?
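    from prefect.blocks.core import Block
    from prefect.blocks.system import JSON

    class Foo(Block):

        target_name: str
        _block_type_slug = "foo-block"

        def block_initialization(self) -> None:
            bar_block = JSON.load("bar-block").value
            # target_name is a field on this block, so it must be read via self
            another_block = JSON.load(self.target_name).value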
    I have tried with the above code but I get the following warning, which results in a coroutine being returned instead of a block:
    RuntimeWarning: coroutine 'Block.load' was never awaited
    2 replies · 2 participants
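As I understand it, Block.load in Prefect 2.x is wrapped with Prefect's sync_compatible helper (in prefect.utilities.asyncutils): called from synchronous code it runs to completion and returns the block, but called while an event loop is running in the current thread it returns a coroutine that has to be awaited. That would explain the RuntimeWarning if block_initialization ends up running inside an async context. The sketch below is a simplified re-creation of that dispatch pattern, not Prefect's actual implementation, and fake_load is a hypothetical stand-in for Block.load.

```python
import asyncio
import functools
from typing import Any, Callable, Coroutine


def sync_compatible(async_fn: Callable[..., Coroutine[Any, Any, Any]]) -> Callable[..., Any]:
    """Simplified sketch: block when no loop is running, else return the coroutine."""

    @functools.wraps(async_fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop in this thread: we are in sync code, so run and return the value.
            return asyncio.run(async_fn(*args, **kwargs))
        # A loop is running: handing back the coroutine is the only non-blocking option.
        return async_fn(*args, **kwargs)

    return wrapper


@sync_compatible
async def fake_load(name: str) -> dict:
    """Hypothetical stand-in for an async block loader."""
    await asyncio.sleep(0)
    return {"name": name}


print(fake_load("bar-block"))  # a sync caller receives the value directly


async def async_caller() -> tuple:
    maybe_coro = fake_load("bar-block")
    got_coroutine = asyncio.iscoroutine(maybe_coro)
    # Forgetting this await is what produces "coroutine ... was never awaited".
    value = await maybe_coro
    return got_coroutine, value


print(asyncio.run(async_caller()))
```

The same call site therefore behaves differently depending on whether it runs under an event loop, which is why awaiting the result (or moving the load out of the async path) is usually the fix.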
  • r

    redsquare

    10/24/2022, 5:22 PM
    Hi all, I just upgraded to an Org account and am reconfiguring K8s to use the new service accounts. I can't get my k8s agent to start: it appears to connect fine, since it writes out the API URL, but then it gets a 404 when requesting {url}/work_queues.
    Agent started! Looking for work from queue(s): default-work-queue...
    Mon, Oct 24 2022 6:17:45 pm  17:17:45.536 | ERROR | prefect.agent - Failed to create work queue 'default-work-queue'.
    Mon, Oct 24 2022 6:17:45 pm  prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url 'https://api.prefect.cloud/api/accounts/9396b5ff-23c0-42b4-91d9-5892450xxxx/workspaces/de803906-c6f7-45%0A77-a3aa-d300b56fxxx/work_queues/'
    12 replies · 3 participants
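One detail in the 404 above: the workspace ID in the URL contains %0A, which is a URL-encoded line feed, so it looks as if the configured API URL (for example the PREFECT_API_URL value in the Kubernetes manifest) picked up a line break. A quick stdlib check like the hypothetical validate_api_url below could catch that before the agent starts. The expected URL shape is taken from the log line above; the check itself is an assumption of mine, not a Prefect feature.

```python
import os
import re
import uuid

# Expected shape, based on the URL in the log above (assumption: Prefect Cloud 2.x).
CLOUD_URL_RE = re.compile(
    r"^https://api\.prefect\.cloud/api/accounts/([^/]+)/workspaces/([^/]+)/?$"
)


def validate_api_url(url: str) -> list[str]:
    """Return human-readable problems found in a Prefect Cloud API URL."""
    problems = []
    if any(ch.isspace() for ch in url):
        problems.append("URL contains whitespace or a line break")
    match = CLOUD_URL_RE.match(url.strip())
    if match is None:
        problems.append("URL does not match .../accounts/<id>/workspaces/<id>")
        return problems
    for label, value in zip(("account", "workspace"), match.groups()):
        try:
            uuid.UUID(value)  # rejects IDs with an embedded newline (wrong length)
        except ValueError:
            problems.append(f"{label} id {value!r} is not a valid UUID")
    return problems


if __name__ == "__main__":
    found = validate_api_url(os.environ.get("PREFECT_API_URL", ""))
    print("\n".join(found) if found else "PREFECT_API_URL looks well-formed")
```

Running something like this inside the agent container (or as an init step) would show whether the value Kubernetes injects still contains the stray line break.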
  • t

    Tim Enders

    10/24/2022, 6:52 PM
    This feels smelly... can anyone suggest a better way to catch random exceptions in a Prefect 2.0 flow?
    from prefect.blocks.notifications import SlackWebhook

    if __name__ == "__main__":
        try:
            flow_result = main()  # main() is the flow, defined elsewhere in the module
        except Exception as e:
            slack_webhook_block = SlackWebhook.load("data-pipeline-notifications")
            slack_webhook_block.notify(f"{e.__class__}: {str(e)}")
    1 reply · 2 participants
  • t

    Tim Enders

    10/24/2022, 9:13 PM
    Hmmm... how would I get the flow name while inside a flow in Prefect 2.0? Is that still something in a context somewhere?
    5 replies · 2 participants
  • l

    link89

    10/25/2022, 1:19 AM
    I just ran into a strange problem. I used to run rm -rf ~/.prefect/storage/* to clear the cache so that the task would run again next time, but this time it didn't work and I had to clean the whole ~/.prefect directory instead of just storage. Is this a bug, or am I missing something? Does Prefect provide a command-line interface to clear the cache?
    8 replies · 2 participants
  • s

    Saurabh Indoria

    10/25/2022, 3:04 AM
    Hi team, we came across this error twice in a month, and it makes us worry about the reliability of Prefect. Since Prefect is the backbone of our infrastructure, everything stops working when this happens: retries and everything else simply fail, and all our flows keep failing for as long as the issue persists. Our deployment: Prefect Cloud (1.0) + GCP Kubernetes + LocalDaskExecutor. What can be done to safeguard against this? Is there a reliable way to retry rather than fail? Task run for reference. CC: @Christina Lopez @Yash Joshi
    [20 Oct 2022 12:22pm]: Error during execution of task: SSLError(MaxRetryError("HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by SSLError(SSLEOFError(8, 'EOF occurred in violation of protocol (_ssl.c:1131)')))"))
    14 replies · 6 participants
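For transient network errors like the SSLEOFError above, a common mitigation is to retry the failing call with capped exponential backoff and jitter instead of failing on the first error. Prefect 1 tasks already expose max_retries and retry_delay; for calls you control inside a task, the general pattern looks roughly like this. retry_with_backoff and flaky are hypothetical names, and which exceptions count as transient is an assumption to tune: with requests, for example, SSLError is a subclass of requests.exceptions.ConnectionError, not of the builtin ConnectionError used as the default here.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    *,
    attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying `retry_on` errors with capped exponential backoff and full jitter."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error unchanged
            cap = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, cap))  # jitter spreads out retries from many workers
    raise AssertionError("unreachable: the loop always returns or raises")


# Usage with a hypothetical call that fails twice before succeeding.
calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated SSL EOF")
    return "ok"


print(retry_with_backoff(flaky, base_delay=0.01))
```

The sleep parameter is injectable so the backoff can be tested without real delays.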
  • k

    Kishan

    10/25/2022, 3:33 AM
    I'm trying to install the Vertex agent for Prefect with pip install prefect[gcp] as per the documentation, but I'm getting WARNING: prefect 2.6.4 does not provide the extra 'gcp'. Maybe the docs are not up to date? Does anyone know how to install the Vertex agent now?
    6 replies · 3 participants
  • j

    Jake Loo

    10/25/2022, 3:54 AM
    Has anyone used Prefect with a real-time data pipeline? I'm curious what your setup looks like. Also, has anyone used Prefect to run Spark jobs?
    1 reply · 2 participants
  • j

    Jon Ruhnke

    10/25/2022, 4:11 AM
    My Prefect 2.0 cloud workspace shows up in my list, but is inaccessible with a 404 error. What do I do?
    2 replies · 2 participants
  • k

    Khyaati Jindal

    10/25/2022, 5:23 AM
    Hi guys, I have been using Prefect 2.0 for a while across multiple projects, and I often come across this error:
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
    prefect.exceptions.PrefectHTTPStatusError: Client error '403 Forbidden' for url 'https://api.prefect.cloud/api/accounts/xxxxxx
        return asynclib.run(func, *args, **backend_options)
    Re-running the agent usually fixes it, but given the sensitive nature of my projects I would like to avoid this in the future, so I am trying to understand why it is happening.
    3 replies · 2 participants
  • v

    vholmer

    10/25/2022, 8:32 AM
    Hi, I'm running a Prefect 2.5.0 agent on a VM in Azure. Occasionally, when no jobs are running (in the middle of the night, for example), the agent throws the following exception when trying to ping the Prefect API:
    PrefectHTTPStatusError: Client error '403 Forbidden' for url 'https://api.prefect.cloud/api/accounts/<ACCOUNTGUID>/workspaces/<WORKSPACEGUID>/work_queues/<WORKQUEUEGUID>/get_runs'
    At /usr/local/lib/python3.8/dist-packages/prefect/client/base.py:125. Any flows submitted after this exception show up in the Prefect Cloud web interface as "Scheduled" or "Late"; basically the agent completely dies. Is this a known issue, or could it be due to something else I'm doing incorrectly?
    10 replies · 4 participants
  • EKS Prefect v1

    Lukasz Pakula

    10/25/2022, 9:32 AM
    Hi, I'm running Prefect 1.2.2. Everything was running smoothly until I upgraded the Kubernetes version (EKS) from 1.21 to the latest, 1.23. Now I randomly get the following error:
    INFO - Retiring workers [154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185]
    INFO - Adaptive stop
    INFO - Adaptive stop
    ERROR - prefect.CloudFlowRunner | Unexpected error: KilledWorker('<name>', <WorkerState 'tcp://<ip>', name: 47, status: closed, memory: 0, processing: <number>, 3)
    Restarting the flow resolves the issue. Is there any sensible explanation of why upgrading the Kubernetes cluster could cause this, or am I missing something elsewhere?
    14 replies · 3 participants
  • s

    Stanislav Kotsiievskyi

    10/25/2022, 12:45 PM
    Hi, I'm running Prefect 2.6.3. I have an agent in a Docker container running as an ECS service, and a GitHub filesystem block pointing at a private repo to store my flows. The flows are in a 'flows' subfolder of the repo. I'm getting this traceback when trying to run the flow:
    Flow could not be retrieved from deployment.
    Traceback (most recent call last):
      File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 247, in retrieve_flow_then_begin_flow_run
        flow = await load_flow_from_flow_run(flow_run, client=client)
      File "/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
        return await fn(*args, **kwargs)
      File "/usr/local/lib/python3.10/site-packages/prefect/deployments.py", line 159, in load_flow_from_flow_run
        await storage_block.get_directory(from_path=deployment.path, local_path=".")
      File "/usr/local/lib/python3.10/site-packages/prefect/filesystems.py", line 146, in get_directory
        shutil.copytree(from_path, local_path, dirs_exist_ok=True)
      File "/usr/local/lib/python3.10/shutil.py", line 556, in copytree
        with os.scandir(src) as itr:
    FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmp2ywchethprefect/flows/hello_world'
    Have you got experience with such issues?
    20 replies · 5 participants
  • j

    Jessica Smith

    10/25/2022, 1:41 PM
    V1 question - I'm registering a flow that has a schedule with multiple clocks. Even if I make no changes to the flow structure/metadata, registering always increases the version number. It is something to do with the schedule + clocks, but I have set the start and end dates to fixed values and the default parameters and cron values are not changing. Am I doing something wrong? I just want to be able to register the flow and not have it create a new version if nothing has changed
    4 replies · 2 participants
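When a v1 flow gets a new version on every registration, one way to find the culprit is to serialize the flow twice (Prefect 1 flows have flow.serialize()) and diff the two payloads to see which field changes between runs. Prefect 1's register() also accepts an idempotency_key, commonly flow.serialized_hash(), which only avoids a version bump if that serialized form is stable. The recursive diff below is a hypothetical stdlib helper, and the sample dictionaries are made up for illustration (they are not real Prefect serialization output); the reordered clocks in them are just an example of the kind of change it reports, not a diagnosis of this flow.

```python
import json
from typing import Any, Iterator


def diff_paths(a: Any, b: Any, path: str = "$") -> Iterator[str]:
    """Yield JSON-path-like locations where two nested structures differ."""
    if isinstance(a, dict) and isinstance(b, dict):
        for key in sorted(set(a) | set(b), key=str):
            child = f"{path}.{key}"
            if key not in a or key not in b:
                yield f"{child} (only in {'second' if key not in a else 'first'})"
            else:
                yield from diff_paths(a[key], b[key], child)
    elif isinstance(a, list) and isinstance(b, list):
        if len(a) != len(b):
            yield f"{path} (length {len(a)} != {len(b)})"
        for index, (x, y) in enumerate(zip(a, b)):
            yield from diff_paths(x, y, f"{path}[{index}]")
    elif a != b:
        yield f"{path}: {json.dumps(a, default=str)} != {json.dumps(b, default=str)}"


# Made-up payloads standing in for two serializations of the same flow.
first = {"name": "etl", "schedule": {"clocks": [{"cron": "0 6 * * *"}, {"cron": "0 18 * * *"}]}}
second = {"name": "etl", "schedule": {"clocks": [{"cron": "0 18 * * *"}, {"cron": "0 6 * * *"}]}}

for difference in diff_paths(first, second):
    print(difference)
```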
  • n

    Nic

    10/25/2022, 1:52 PM
    I'm running flows with Docker infrastructure, but I need to access files from a shared network drive inside the container. Since I can't edit the docker run command (the container is started with the flow run), is there any way I can mount the network drive so that my flows can access it inside the Docker container?
    4 replies · 2 participants
  • n

    Nic

    10/25/2022, 2:23 PM
    I'm starting a new thread since my last one was marked with a tick even though it isn't resolved. To rephrase: in the Docker block I need to use the volumes argument, whose description currently says
    A list of volume mount strings in the format of "local_path:container_path".
    Would the following bind my local G:/ drive to G:/ in the Docker container, or should I write it differently?
    8 replies · 2 participants
  • t

    Tim Enders

    10/25/2022, 2:53 PM
    🤔 Using the default setup for result caching with Prefect 2.6.4 and I always get this exception at the end of a Completed Flow run...
    Traceback (most recent call last):
      File "/home/tenders/Documents/code/prefect-orion/platform_prefect/accounts/accounts_subscriptions.py", line 359, in <module>
        flow_result = main()
      File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/flows.py", line 439, in __call__
        return enter_flow_run_engine_from_flow_call(
      File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 150, in enter_flow_run_engine_from_flow_call
        return anyio.run(begin_run)
      File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
        return asynclib.run(func, *args, **backend_options)
      File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
        return native_run(wrapper(), debug=debug)
      File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
        return loop.run_until_complete(main)
      File "/usr/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
        return future.result()
      File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
        return await func(*args)
      File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
        return await fn(*args, **kwargs)
      File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 229, in create_then_begin_flow_run
        return await state.result(fetch=True)
      File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/states.py", line 86, in _get_state_result
        raise MissingResult(
    prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
    It seems like Prefect is still expecting the State data to exist, even though it doesn't by default? (I may be misunderstanding things too)
    10 replies · 2 participants
  • m

    Mary Clair Thompson

    10/25/2022, 3:41 PM
    Hey folks! I'd like to use Python's dataclasses module and the yamldataclassconfig package to read job configurations from a YAML file. The code works fine outside of a task, but when I wrap it in a task I run into what looks like a fairly obscure error (in the thread). The implementation is straightforward. I have a dataclass (partial version for illustration):
    from dataclasses import dataclass
    from typing import List
    from yamldataclassconfig.config import YamlDataClassConfig

    @dataclass
    class Config(YamlDataClassConfig):
        EMAIL_FROM: str = None
        EMAIL_TO: List[str] = None
    as well as a task that instantiates the class and loads data into it:
    from prefect import task

    @task(name='load-configs')
    def load_configs(config_path: str) -> Config:
        configs = Config()
        configs.load(config_path)
        return configs
    The task seems to instantiate the config object, but breaks when we try to actually load configs from the provided path. Again, this works perfectly fine outside of the context of a task, so presumably there is some internal prefect logic I’m futzing with here. I’d appreciate any thoughts!
    8 replies · 3 participants
  • d

    David Elliott

    10/25/2022, 3:47 PM
    Apologies if I’ve missed this somewhere obvious (quite likely!) but is it not possible to cancel a flow run from the 2.0 UI?
    6 replies · 3 participants
  • c

    Carlo

    10/25/2022, 4:55 PM
    If you have a flow (C) that calls two different flows (A and B), will Prefect use the deployments of the subflows or of the parent? I ask because with ECS, the deployment is what references the ECS block that contains the CPU and memory settings, and there are times when we would want each subflow to run with different resources.
    3 replies · 3 participants
  • c

    Claire Herdeman

    10/25/2022, 8:27 PM
    Hi! We're starting to look into a transition to Prefect 2, so I have a few likely basic questions. I'm testing out getting a very simple flow running with S3 storage and ECS infrastructure. I've gotten to the point where the task successfully launches, but I'm running into a couple of issues I'll detail in the comments.
    11 replies · 3 participants
  • d

    David Stern

    10/26/2022, 4:40 AM
    Hi all! I'm wondering if Prefect supports dynamic task definition and dependency assignment, like in the linked Airflow code? https://github.com/konosp/dbt-on-airflow/blob/8de43f2fef38b8e3f34c86aba77ed3210379c527/airflow/dags/schedule_dbt.py#L73
    1 reply · 2 participants
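The linked Airflow DAG builds one task per dbt model from the manifest's dependency information. In Prefect 2 the same idea is plain Python: walk the models in dependency order and create task runs in a loop, passing upstream futures along (as far as I know, Task.submit accepts a wait_for argument for this). The stdlib sketch below covers only the ordering part, using graphlib.TopologicalSorter (Python 3.9+); the model names are hypothetical and run_model stands in for whatever task you would submit.

```python
from graphlib import TopologicalSorter
from typing import Callable, Dict, List


def run_in_dependency_order(
    dependencies: Dict[str, List[str]],
    run_model: Callable[[str, Dict[str, object]], object],
) -> Dict[str, object]:
    """Run each node after its upstream nodes, passing their results along.

    `dependencies` maps a node to the nodes it depends on (like dbt's
    depends_on). In a Prefect flow, run_model could submit a task and the
    collected values would be futures handed to downstream submissions.
    """
    results: Dict[str, object] = {}
    for node in TopologicalSorter(dependencies).static_order():
        # static_order() guarantees every upstream node was handled already.
        upstream = {dep: results[dep] for dep in dependencies.get(node, [])}
        results[node] = run_model(node, upstream)
    return results


# Hypothetical dbt-style models and their parents.
models = {
    "stg_orders": [],
    "stg_customers": [],
    "orders_enriched": ["stg_orders", "stg_customers"],
    "daily_revenue": ["orders_enriched"],
}

execution_log: List[str] = []


def run_model(name: str, upstream: Dict[str, object]) -> str:
    execution_log.append(name)
    return f"{name} built from {sorted(upstream)}"


run_in_dependency_order(models, run_model)
print(execution_log)
```

Because the graph is plain data, it can be built from dbt's manifest.json at runtime, which is what makes the task set "dynamic".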
  • d

    David Huang

    10/26/2022, 7:09 AM
    Hi team, I'm running a Prefect flow that sends HTTP requests to my server. When a request lasts 10 to 20 minutes, Prefect never receives the response, even though my server log shows that a 200 was already returned. Does anyone have any idea why?
    1 reply · 2 participants
  • l

    link89

    10/26/2022, 7:27 AM
    Prefect always starts a new thread to execute a task. Is there a way to run a task in a process instead of a thread? Some libraries handle signals themselves via signal.signal, and because of this limitation they fail when executed as a Prefect task:
    When threads are enabled, this function can only be called from the main thread of the main interpreter; attempting to call it from other threads will cause a ValueError exception to be raised.
    https://docs.python.org/3/library/signal.html#signal.signal
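The restriction is easy to reproduce outside Prefect: signal.signal works in a process's main thread and raises ValueError from any other thread, which is where a threaded task runner executes sync tasks according to the post above. One workaround, sketched here under the assumption that the signal-handling library can run as its own Python process, is to launch that code with subprocess so it executes on the child interpreter's main thread. The helper names are hypothetical, and this is not a Prefect task-runner setting.

```python
import signal
import subprocess
import sys
import threading


def install_handler() -> str:
    """Probe: install (then restore) a SIGINT handler in the current thread."""
    previous = signal.signal(signal.SIGINT, signal.default_int_handler)
    if previous is not None:
        signal.signal(signal.SIGINT, previous)  # put the original handler back
    return "ok"


def try_in_worker_thread() -> str:
    """Run the probe in a non-main thread, roughly where a threaded task would run."""
    outcome: dict = {}

    def target() -> None:
        try:
            outcome["result"] = install_handler()
        except ValueError as exc:  # raised because this is not the main thread
            outcome["result"] = f"ValueError: {exc}"

    worker = threading.Thread(target=target)
    worker.start()
    worker.join()
    return outcome["result"]


def try_in_subprocess() -> str:
    """Run the same call in a fresh interpreter, where it executes on the main thread."""
    code = (
        "import signal; "
        "signal.signal(signal.SIGINT, signal.default_int_handler); "
        "print('ok')"
    )
    completed = subprocess.run(
        [sys.executable, "-c", code], capture_output=True, text=True, check=True
    )
    return completed.stdout.strip()


print("main thread:", install_handler())
print("worker thread:", try_in_worker_thread())
print("subprocess:", try_in_subprocess())
```

Inside a task, the subprocess route means passing inputs and outputs across the process boundary (arguments, files or stdout), which is the main cost of this approach.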
  • k

    Khyaati Jindal

    10/26/2022, 8:11 AM
    I am observing this behaviour with my Prefect agent. I am running an agent on an EC2 cloud instance (Ubuntu). After I leave the screen session, the agent completes a few flow runs successfully, but after that it just seems to pause. In the Prefect UI I can see the flows go into a 'Late' state, and when I reconnect to my EC2 instance and open the terminal screen running my Prefect agent, the agent immediately starts running everything, causing a spike in memory usage as it works through all the late flows. I have observed this behaviour of the agent with multiple projects running on EC2.
    2 replies · 2 participants

Rob Freedy

10/26/2022, 12:54 PM
How are you running your agent? Is this for 1.0 or 2.0? I have found this discourse post to be very helpful for deploying an agent on an EC2 instance: https://discourse.prefect.io/t/how-to-deploy-a-prefect-2-0-agent-to-an-ec2-instance-as-your-execution-layer/551

Khyaati Jindal

10/31/2022, 6:58 AM
I am using Prefect 2.0. Thanks, I will check it out.