Powered by Linen
prefect-community

    Seth Coussens

    10/10/2022, 9:47 PM
    Has anyone seen Prefect throw an error when attempting to run ASYNC tasks that crashes and says 'got Future <Future pending> attached to a different loop'?
    ✅ 1
    👀 1
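The thread's answer isn't preserved in this export. For context, this exact message is raised by asyncio itself when a task awaits a future that was created on a different event loop. That can happen when loop-bound objects (futures, or async clients holding them) are created under one loop and later awaited under another. Below is a minimal, Prefect-free sketch that reproduces the error and shows the usual fix: create the future on the loop that actually awaits it. The function names are illustrative only.

```python
import asyncio


async def await_future(fut: asyncio.Future) -> object:
    # Awaiting a future that belongs to a *different* loop makes the running
    # task raise RuntimeError("... got Future ... attached to a different loop").
    return await fut


def reproduce_foreign_loop_error() -> str:
    """Create a future on one loop, then await it from asyncio.run()'s loop."""
    other_loop = asyncio.new_event_loop()
    try:
        fut = other_loop.create_future()  # bound to other_loop, not asyncio.run's loop
        try:
            asyncio.run(await_future(fut))
        except RuntimeError as exc:
            return str(exc)
        return "no error"
    finally:
        other_loop.close()


async def same_loop_future() -> int:
    """The fix: create (and resolve) the future on the loop that is running."""
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    loop.call_soon(fut.set_result, 42)  # resolved on the same loop
    return await fut


if __name__ == "__main__":
    print(reproduce_foreign_loop_error())  # message ends with "attached to a different loop"
    print(asyncio.run(same_loop_future()))  # 42
```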

    Deepanshu Aggarwal

    10/10/2022, 10:29 PM
    has anyone found any sort of python client to run a flow deployed on prefect cloud (2.0) ?
    ✅ 1
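No answer is preserved here. As a hedged sketch: in Prefect 2.x the async client returned by `prefect.client.get_client()` has a `create_flow_run_from_deployment` method, and the same operation is exposed over the REST API. The route used below (`POST {api_url}/deployments/{id}/create_flow_run` with a bearer API key) is an assumption to verify against the API reference for your version; for Prefect Cloud, `api_url` would be the workspace-scoped URL you use as `PREFECT_API_URL`. The hypothetical helper only prepares the request, so it can be inspected without network access.

```python
import json
import urllib.request
from typing import Any, Dict, Optional


def build_create_flow_run_request(
    api_url: str,
    api_key: str,
    deployment_id: str,
    parameters: Optional[Dict[str, Any]] = None,
) -> urllib.request.Request:
    """Prepare (but do not send) a request that creates a flow run for a deployment.

    ASSUMPTION: the Prefect 2 REST route is POST /deployments/{id}/create_flow_run.
    """
    url = f"{api_url.rstrip('/')}/deployments/{deployment_id}/create_flow_run"
    body = json.dumps({"parameters": parameters or {}}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
    )


# Sending it (needs network access, a real API key and deployment id):
# req = build_create_flow_run_request(api_url, api_key, deployment_id, {"x": 1})
# with urllib.request.urlopen(req) as resp:
#     flow_run = json.load(resp)
```

The created flow run is then picked up by whichever agent polls the deployment's work queue, so the caller does not need Prefect installed.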

    Homesh Wathsalya

    10/10/2022, 11:28 PM
    Hi. Very noob question, but I can't get my head around it. If we do a Docker deployment, does that mean we also need the orchestration layer running on some other server? Can we make that Docker deployment self-running after initiation? The use case: we have a data analytics product made up of several Docker containers, and we start them all with Docker Compose (we install this on each customer's premises). We need to use Prefect as our transformation layer (we currently use crontabs), and it would be good if I could dedicate a single Docker container to Prefect so that it runs the periodic transformations by itself. Is this achievable with Prefect? Any tutorial would be much appreciated. Thanks! P.S.: we evaluated Airflow for this but decided that it introduces too many Docker containers for a simple set of tasks.
    ✅ 1

    Thar Htet San

    10/11/2022, 4:49 AM
    Hello, I am creating Prefect work queue pipelines for training and sample prediction. I finished the training pipeline queues, but I don't know how to trigger the sample prediction pipeline. Could you please point me to resources or docs for that? Or should I add the sample prediction method to the training flow? I use Prefect 2.0. Thank you

    Zac Hooper

    10/11/2022, 5:11 AM
    Is there any way to add some formatting to a flow's description? I've tried adding line breaks, but they appear to be removed in the UI. I'm using a self-hosted version of Orion. Example description:
    description = """
        This is an example description
        \n\n\n
        Schedule: Every 3 minutes
    """
    How it looks in the UI:
    ✅ 1
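The thread's resolution isn't preserved. One thing worth checking: a triple-quoted string keeps its source indentation, and if the UI renders descriptions as Markdown (an assumption here), indented lines can be treated as code while single newlines collapse. Normalizing the text with the standard library's `inspect.cleandoc` and separating paragraphs with one blank line is a low-risk first step.

```python
import inspect

# inspect.cleandoc strips the common leading indentation plus the blank
# first/last lines that a triple-quoted string picks up from the source.
description = inspect.cleandoc(
    """
    This is an example description

    Schedule: Every 3 minutes
    """
)

# Hypothetical usage: pass the cleaned text to the flow decorator, e.g.
# @flow(description=description)
```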

    Vadym Dytyniak

    10/11/2022, 7:49 AM
    Hi. DaskTaskRunner is logging dashboard url after cluster is created. Is it possible to stream this log to the Cloud logs? Thanks.

    Andreas Nigg

    10/11/2022, 8:45 AM
    Hey dear community, I have a funny little use case:
    1. My flow does some data deduplication.
    2. Input data are stored in a cached database and marked as read once they have been successfully processed.
    3. In the final step, the flow checks for already existing data in an external system, and I update some external systems with the deduplicated data.
    4. The flow uses a task for each step, and the last step runs with the concurrent task runner, meaning that it spawns up to 50 tasks that execute the upsert operations concurrently.
    5. The flow should run once every 10 minutes. (I'm aware that this is a non-ideal setup, but let's say... a lot of legacy SW is involved... 😅)
    So now the problem: everything works fine as long as only one flow run executes at a time. As soon as the flow runs twice at the same time, all hell breaks loose 🔥 🔥 🔥 (or at least I potentially end up with duplicates). Two overlapping runs can happen when one run takes longer than 10 minutes and a new scheduled run gets started.
    Is there some option to limit flow runs to one at a time and keep new runs late until all previous runs are done (completed/failed/crashed)? Alternatively, what are some recommendations for implementing such behavior on the flow side? (At the start of the flow, I could, for example, check for an existing active run and then simply end the flow before doing any harm, but this feels a little hacky...) I can't really use task concurrency limits here, because in the final step (where I check for existing data and then upsert) I really need the concurrent tasks; otherwise the operation is too slow.
    ✅ 1
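A minimal sketch of the early-exit guard described in the question, written as a pure function so the decision logic is easy to test. Fetching the runs is deliberately left out (in Prefect 2 it could come from `client.read_flow_runs` filtered to the same deployment), and `RunSummary`, `ACTIVE_STATE_TYPES` and `another_run_is_active` are hypothetical names. Note the inherent race: two runs that start at almost the same moment can both pass the check, so this reduces overlap rather than guaranteeing mutual exclusion.

```python
from dataclasses import dataclass
from typing import Iterable

# Hypothetical choice of which state types count as "still active".
ACTIVE_STATE_TYPES = {"PENDING", "RUNNING"}


@dataclass(frozen=True)
class RunSummary:
    """Hypothetical minimal view of a flow run: its id and state type name."""
    run_id: str
    state_type: str  # e.g. "RUNNING", "COMPLETED", "CRASHED"


def another_run_is_active(current_run_id: str, runs: Iterable[RunSummary]) -> bool:
    """Return True if any run other than the current one is still active."""
    return any(
        run.run_id != current_run_id and run.state_type in ACTIVE_STATE_TYPES
        for run in runs
    )


# Inside the flow (sketch): exit before any upserts if another run is active.
# if another_run_is_active(this_run_id, fetched_runs):
#     return "skipped: previous run still active"
```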

    max

    10/11/2022, 9:27 AM
    Hey! I moved from Prefect 2.0b11 to Prefect 2.5.0 and got an error with flow deployment. In my app architecture, I want to deploy flows to the Prefect server from a different service using Python, and when I do that, this error occurs. Can you help?
    error.txt
    ✅ 1

    Hieu Tran

    10/11/2022, 9:53 AM
    I'm trying to get all the flow runs in a Running state with client.read_flow_runs(), but I get an error when initializing the FlowRunFilterState object. I couldn't find any example of this on Discourse; does anyone know what I did wrong?
    import asyncio
    from prefect.client import get_client
    from prefect.orion.schemas.states import StateType
    from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType
    
    async def main():
        client = get_client()
        state_type_filter = FlowRunFilterStateType(any_=[StateType.RUNNING])
        state_filter = FlowRunFilterState(state_type_filter)
        flow_filter = FlowRunFilter(state_filter)
        flow_runs = await client.read_flow_runs(flow_filter)
        print(flow_runs)
    
    if __name__ == '__main__':
        asyncio.run(main())
    
    Traceback (most recent call last):
      File "get_client_context.py", line 21, in <module>
        asyncio.run(main())
      File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
        return loop.run_until_complete(main)
      File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
        return future.result()
      File "get_client_context.py", line 9, in main
        state_filter = FlowRunFilterState(state_type_filter)
      File "pydantic/main.py", line 333, in pydantic.main.BaseModel.__init__
    TypeError: __init__() takes exactly 1 positional argument (2 given)
    ✅ 1
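The TypeError comes from pydantic: `BaseModel.__init__` accepts field values only as keyword arguments, so the filters need to be built as `FlowRunFilterState(type=state_type_filter)` and `FlowRunFilter(state=state_filter)`, and in Prefect 2.x the filter should likewise be passed to `read_flow_runs` by keyword (`flow_run_filter=flow_filter`). For reference, a hedged sketch of the same query over the REST API: the filter serializes to nested JSON under a `flow_runs` key and is POSTed to `{api_url}/flow_runs/filter`. Both the route and the body shape are assumptions to check against the API reference for your version; the helpers are hypothetical.

```python
import json
import urllib.request
from typing import Dict


def running_flow_runs_filter() -> Dict[str, object]:
    """JSON body mirroring FlowRunFilter(state=FlowRunFilterState(type=FlowRunFilterStateType(any_=[RUNNING])))."""
    return {"flow_runs": {"state": {"type": {"any_": ["RUNNING"]}}}}


def build_filter_request(api_url: str, body: Dict[str, object]) -> urllib.request.Request:
    """Prepare (not send) the POST. ASSUMPTION: the route is /flow_runs/filter."""
    return urllib.request.Request(
        f"{api_url.rstrip('/')}/flow_runs/filter",
        data=json.dumps(body).encode("utf-8"),
        method="POST",
        headers={"Content-Type": "application/json"},
    )
```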

    Stephen Lloyd

    10/11/2022, 11:02 AM
    Hi. I have a Prefect 1.0 flow using ECSRun Task. It was originally registered with a memory value of 2GB, and the task definition was created successfully in ECS. Later, we changed the memory to another value and re-registered, but the task definition did not change. Is there anything obvious that we might be missing?
    ✅ 1

    Adam Eury

    10/11/2022, 1:45 PM
    Does anyone know if it's possible for subflow runs to inherit tags from their parent flow run? Our use case is that we're starting a flow run programmatically from a web application, and we're currently tagging that flow run with application-specific metadata to make it easier to navigate and find flows that are specific to a particular account, for example. The application is multi-tenant, so we will eventually have several flow runs executing at the same time, and not being able to easily navigate the flow runs page would be annoying.
    ✅ 1

    Daniel Tashman

    10/11/2022, 2:23 PM
    Hi, how can I fix this problem "Sorry, modifying RRule schedules via the UI is currently unsupported; select a different schedule type above or modify your schedule in code."
    ✅ 1

    John Mizerany

    10/11/2022, 2:30 PM
    I have had a few flows (using Prefect Cloud 1) that have gotten stuck in the middle of a run. When we cancel them, they also get stuck in a "cancelling" state, and a few have now been cancelling for 20 minutes. We are running our agent on EC2; could this be a memory issue with the agent?
    ✅ 1

    Kelvin DeCosta

    10/11/2022, 3:54 PM
    Hey everyone! I managed to write the necessary Infrastructure as Code (via Pulumi) for a lightweight Prefect agent that runs as an ECS Service. I was hoping to get a nice welcome message in the logs. Unfortunately, I get the prefect CLI help message over and over again. The ECS console shows a list of STOPPED tasks, which I'm assuming are repeated attempts to start the service and keep it running.
    ✅ 2

    Nathaniel Russell

    10/11/2022, 4:18 PM
    I have a flow running in lambda that keeps giving me this warning:
    /usr/local/lib/python3.9/site-packages/prefect/logging/handlers.py:76: UserWarning: Failed to create the Prefect home directory at /home/sbx_user1051/.prefect
    It then runs the flow code correctly, but after the flow is done it crashes, and gives this error:
    [in thread]
    All of my flows perform their intended code but all end with this error and say crashed. How do I fix this?
    ✅ 1
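The thread's full answer isn't preserved. For the warning itself: AWS Lambda only allows writes under /tmp, and Prefect 2 reads its home directory from the `PREFECT_HOME` setting (default `~/.prefect`). A hedged sketch that points it at a writable directory before `prefect` is imported; it addresses only the home-directory warning, not necessarily the crash at the end of the run. `configure_prefect_home` is a hypothetical helper.

```python
import os
import tempfile
from pathlib import Path
from typing import Optional


def configure_prefect_home(base_dir: Optional[str] = None) -> str:
    """Point PREFECT_HOME at a writable directory (on Lambda, /tmp) unless already set."""
    base = Path(base_dir) if base_dir else Path(tempfile.gettempdir())
    home = base / ".prefect"
    home.mkdir(parents=True, exist_ok=True)
    os.environ.setdefault("PREFECT_HOME", str(home))
    return os.environ["PREFECT_HOME"]


# At the top of the Lambda handler module, *before* `import prefect`:
# configure_prefect_home()
# import prefect
```

Setting `PREFECT_HOME=/tmp/.prefect` as a Lambda environment variable should have the same effect without code.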

    Jason Bertman

    10/11/2022, 4:40 PM
    I have a k8s cluster executing some pretty large flows on Orion via RayTaskRunner. I have a remote Ray cluster deployed with the KubeRay operator and an autoscaler running on the head node. It scales in and out properly, but this morning, after about 23K task runs, I'm seeing this:
    Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::begin_task_run() (pid=77, ip=10.80.9.219)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1191, in orchestrate_task_run
        state = await propose_state(
      File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1496, in propose_state
        raise prefect.exceptions.Abort(response.details.reason)
    prefect.exceptions.Abort: This run cannot transition to the RUNNING state from the RUNNING state.
    
    During handling of the above exception, another exception occurred:
    
    ray::begin_task_run() (pid=77, ip=10.80.9.219)
      File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 212, in wrapper
        return run_async_in_new_loop(async_fn, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 141, in run_async_in_new_loop
        return anyio.run(partial(__fn, *args, **kwargs))
      File "/usr/local/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
        return asynclib.run(func, *args, **backend_options)
      File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
        return native_run(wrapper(), debug=debug)
      File "/usr/local/lib/python3.8/asyncio/runners.py", line 44, in run
        return loop.run_until_complete(main)
      File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
        return future.result()
      File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
        return await func(*args)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1121, in begin_task_run
        task_run.state.data._cache_data(await _retrieve_result(task_run.state))
    AttributeError: 'NoneType' object has no attribute '_cache_data'
    It seems like the engine is mistaking a task run for not running yet?
    ✅ 1
    👀 2

    Walter Cavinaw

    10/11/2022, 5:14 PM
    just a quick q: does prefect support Julia now? It's mentioned on the site but can't find any examples
    :julia: 3
    ✅ 1

    Imran Qureshi

    10/11/2022, 7:26 PM
    question: Prefect seems to stop monitoring a pipeline after four hours. Any way to change this?
    ✅ 1

    Carlo

    10/11/2022, 8:28 PM
    We were considering upgrading from Prefect 1.1 to 2.5. Given that we rely on AWS ECS + Fargate, I'm concerned it might not be fully supported yet. It looks like 2.4 introduced ECSTask with a caveat that the API is still in flux. Any additional color would be helpful. We were only looking to upgrade to stay current with community support; our 1.1 install has been stable.
    ✅ 1

    David Cupp

    10/11/2022, 9:44 PM
    Does anyone know how to schedule something in Prefect based on an rruleset? According to the docs [1] and source code [2], the RRuleSchedule only takes an "rrule string". It seems easy to convert a single rrule to a string, but as far as I can tell there is no standard implementation to convert an rruleset into an rrule string [3]. Any ideas?
    [1] https://docs.prefect.io/api-ref/orion/schemas/schedules/#prefect.orion.schemas.schedules.RRuleSchedule
    [2] https://github.com/PrefectHQ/prefect/blob/main/src/prefect/orion/schemas/schedules.py#L322
    [3] https://github.com/dateutil/dateutil/issues/856
    ✅ 1
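One hedged option: dateutil's `rrulestr` parses RFC 5545-style multi-line text (a `DTSTART` line, several `RRULE` lines, `EXDATE` lines, and so on) into an `rruleset` when called with `forceset=True`, so an rruleset can be expressed as such text. Whether `RRuleSchedule` in a given Prefect version accepts a multi-line string is an assumption to verify. The sketch below only serializes the pieces; `rruleset_to_text` is a hypothetical helper, and the datetimes are naive (timezone handling is left out).

```python
from datetime import datetime
from typing import Iterable


def _fmt(dt: datetime) -> str:
    # RFC 5545 "floating" local date-time: YYYYMMDDTHHMMSS
    return dt.strftime("%Y%m%dT%H%M%S")


def rruleset_to_text(
    dtstart: datetime,
    rrules: Iterable[str],
    exdates: Iterable[datetime] = (),
) -> str:
    """Serialize a start time, RRULE bodies and excluded dates into multi-line text."""
    lines = [f"DTSTART:{_fmt(dtstart)}"]
    lines += [f"RRULE:{rule}" for rule in rrules]
    lines += [f"EXDATE:{_fmt(d)}" for d in exdates]
    return "\n".join(lines)


text = rruleset_to_text(
    datetime(2022, 10, 11, 9, 0),
    ["FREQ=DAILY;BYHOUR=9;BYMINUTE=0", "FREQ=WEEKLY;BYDAY=SA;BYHOUR=12;BYMINUTE=0"],
    exdates=[datetime(2022, 10, 12, 9, 0)],
)

# With python-dateutil installed, the text can be parsed back into a set:
# from dateutil.rrule import rrulestr
# ruleset = rrulestr(text, forceset=True)
```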

    Nace Plesko

    10/11/2022, 10:47 PM
    Hi, I'm running Prefect V1 and I'm having problems passing in a parameter. I'm trying to pass a Parameter to the flow and use it to build the command for a ShellTask. But the problem is that I'm not using the Parameter directly, so it's not showing up in the UI when I register the flow. Is there any way around this? Thank you in advance!

    Nico Neumann

    10/11/2022, 11:29 PM
    I use prefect_aws to upload/list/download files to S3 and also for some shared AWS Secrets. For some functionality I rely on boto3, e.g. boto3.client("s3", ...).generate_presigned_url(…). Prefect 2.5.0 is running on EKS, and the flows are deployed to S3, which requires s3fs:
    To use it in a deployment: prefect deployment […] -sb s3/dev
    You need to install s3fs to use this block.
    (https://docs.prefect.io/concepts/filesystems/)
    My problem is that prefect_aws and s3fs have dependency conflicts. I am using pip-tools to pin my requirements and get the following error:
    # simplified requirements.in (package versions removed to make matches easier to find)
    prefect
    prefect_aws
    s3fs
    $ pip-compile requirements.in
    Could not find a version that matches botocore<1.27.60,<1.28.0,>=1.27.53,>=1.27.59,>=1.27.89 (from prefect_aws==0.1.4->-r requirements.in (line 2))
    Tried: 0.4.1, 0.4.2, 0.5.0, 0.5.1, 0.5.2, 0.5.3, 0.5.4, 0.6.0, 0.7.0, 0.8.0, 0.8.1, 0.8.2, 0.8.3, 0.9.0 ... [lists all versions here] 
    1.27.87, 1.27.88, 1.27.88, 1.27.89, 1.27.89
    Skipped pre-versions: 1.0.0a1, 1.0.0a2, 1.0.0a3, 1.0.0b1, 1.0.0b2, 1.0.0b3, 1.0.0rc1, 1.0.0rc1
    There are incompatible versions in the resolved dependencies:
      botocore<1.28.0,>=1.27.89 (from boto3==1.24.89->prefect_aws==0.1.4->-r requirements.in (line 2))
      botocore>=1.27.53 (from prefect_aws==0.1.4->-r requirements.in (line 2))
      botocore<1.27.60,>=1.27.59 (from aiobotocore==2.4.0->s3fs==2022.8.2->-r requirements.in (line 3))
    I have found this issue: https://github.com/fsspec/s3fs/issues/615#issuecomment-1094791081 but no real fix. How can I use prefect_aws and also deploy flows to S3? Does anyone else have the same problem and found a solution?
    ✅ 1

    Adam Green

    10/12/2022, 12:50 AM
    Is it possible to statically type the code used to deploy Prefect flows without converting it all to async? We have a script we are running to deploy flows
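    from prefect.deployments import Deployment
    from prefect.filesystems import S3
    from prefect.infrastructure import Process
    
    # aws_key, aws_secret, context and the healthcheck flow are assumed to be defined elsewhere in the script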
    s3_block = S3(
        aws_access_key_id=aws_key,
        aws_secret_access_key=aws_secret,
        bucket_path=context["prefect_flows_bucket"],
    )
    s3_block.save("s3", overwrite=True)
    
    Deployment.build_from_flow(
        name="alpha",
        work_queue_name="alpha",
        flow=healthcheck,
        storage=S3.load("s3"),
        infrastructure=Process(),
        apply=True,
    )
    When we run mypy on this code, it complains about things not being typed as async. Is it possible to type this code without converting to async?
    ✅ 1

    Nace Plesko

    10/12/2022, 1:14 AM
    I'm trying to kick off a flow from within a flow in Prefect v1, and I can't figure out why it's not working. At first I had the flows in separate files and thought that was the issue; then I combined them in the same file, trying StartFlowRun and create_flow_run as in the docs, and for some reason I'm running into a bunch of errors when executing the flow, but none when registering it. Right now I'm getting
    Failed to load and execute flow run: ValueError('No flows found in file.')
    and previously I was getting a bunch of
    Failed to load and execute flow run: KeyError("'__name__' not in globals")
    I feel like I am missing something extremely obvious about executing a flow from within a flow that is not even documented?

    Nace Plesko

    10/12/2022, 2:35 AM
    Is there a way to put a tag on a Flow in Prefect V1?
    ✅ 1

    Steph Clacksman

    10/12/2022, 7:21 AM
    I am having trouble passing a dataframe from a flow to a subflow - it throws a type error. I have made a minimal reproducible example:
    import pandas as pd
    from prefect import flow
    
    
    @flow
    def test_flow() -> None:
        df = pd.DataFrame({"ID": ["123456789", "223456789"]})
        test_subflow(df)
    
    
    @flow(validate_parameters=False)
    def test_subflow(df: pd.DataFrame) -> None:
        print(df)
    
    
    if __name__ == "__main__":
        test_flow()
    ✅ 1

    Nic

    10/12/2022, 8:12 AM
    I've followed the Helm chart setup and can't access the Orion server from the host machine. However, it shows the following error (in the reply thread), and I can't create any blocks.
    ✅ 1

    Chern Hong Poh

    10/12/2022, 9:55 AM
    Hello guys, I am currently using Prefect version 0.13.17, and my working environment is Amazon Linux. I'm facing a problem with a ShellTask that returns
    Command failed with exit code 2
    when I register and quick-run the flow. I registered the flow using this command:
    prefect register flow --file testing.py --project staging
    I'd appreciate it if someone can help. This has been bugging me since morning.
    ## print2.py
    
    print("hello")
    ## testing.py
    
    import os
    import datetime
    from datetime import timedelta
    import pendulum
    
    import prefect
    from prefect import case
    from prefect import Flow
    from prefect import Parameter
    from prefect import task
    from prefect.environments.storage import S3
    from prefect.schedules import filters
    from prefect.schedules.clocks import IntervalClock
    from prefect.schedules.schedules import Schedule
    from prefect.tasks.control_flow import merge
    from prefect.tasks.dbt import DbtShellTask
    from prefect.tasks.shell import ShellTask
    
    import subprocess
    
    @task(name="Logging")
    def logging_result(stuff):
        logger = prefect.context.get("logger")
        return logger.info(stuff)
    
    @task(name="Run Python Script", log_stdout=True)
    def run_script():
        return ShellTask(command=f"python3 print2.py").run()
    
    with Flow(name="DBT Python daily run") as flow:
        python_run = run_script()
        final = logging_result(python_run)
    
    #flow_state = flow.run()
    #shell_output = flow_state.result[python_run].result
    #print(shell_output)
    ✅ 1
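The resolution isn't preserved here, but exit code 2 is what the Python interpreter returns when it cannot open the script it was given. Because `python3 print2.py` is resolved against the working directory of the process running the flow (which, under an agent, may not be the directory containing testing.py; an assumption about this setup), a relative path is a likely culprit. A stdlib sketch that shows the exit code and the usual fix of passing an absolute path; `run_python_script` is a hypothetical helper.

```python
import subprocess
import sys
from pathlib import Path
from typing import Tuple


def run_python_script(script: Path) -> Tuple[int, str]:
    """Run `script` with the current interpreter and return (exit code, stdout)."""
    result = subprocess.run(
        [sys.executable, str(script)], capture_output=True, text=True
    )
    return result.returncode, result.stdout


# Resolving the script next to the flow file avoids depending on the working
# directory, e.g. inside testing.py:
# SCRIPT = Path(__file__).resolve().parent / "print2.py"
# ShellTask(command=f"python3 {SCRIPT}")
```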

    Robert Hales

    10/12/2022, 11:25 AM
    Hi there, I've seen some undesirable behaviour around 500 handling in flows. A request to create a flow run during a subflow run failed due to a 500 from the (self-hosted) Prefect server. The flow in question is still marked as running, even though it no longer is. Interestingly, this subflow's parent is also a subflow that is still marked as running, whereas the "root" flow is marked as crashed.
    ✅ 1

    Todd de Quincey

    10/12/2022, 11:31 AM
    Hi all, I’m curious if the Prefect team have any plans to introduce concepts similar to Dagster’s Software-defined-assets or Airflow’s Data-aware scheduling? I haven’t used either of the above, but conceptually, they are very attractive solutions (especially Dagster’s implementation).
    ✅ 1
    ➕ 1
    👀 1

Bianca Hoch

10/12/2022, 3:45 PM
Hi Todd! We are working on a much more explicit (yet still flexible) way to track such metadata related to your workflows using Resources and Events, which will additionally allow you to track SLAs related to your data. For instance, you'll be able to define that a given Snowflake table should always be updated by 9 AM, and if that SLA is not met, you can take automated actions and track related failures in a more robust way, allowing for faster detection and resolution.
💯 1

Todd de Quincey

10/12/2022, 3:47 PM
Any rough ETAs?

Bianca Hoch

10/12/2022, 5:59 PM
No firm ETAs yet, but these are high priority features that are actively being worked on for release in the near future. Definitely monitor our posts in the #announcements channel, where we share release notes for each new version of Prefect.

Todd de Quincey

10/14/2022, 7:03 AM
Thanks, @Bianca Hoch. Is there a project or any key tickets on GitHub that I can follow?

Anna Geller

10/14/2022, 10:03 AM
The #announcements channel is the easiest to follow, but if you would like to open a GitHub issue (without mentioning other tools, just focusing on what problems you want to solve and what specific use cases you want to cover), that would be great.