Powered by Linen
prefect-community
  • e

    Evan Curtin

    10/19/2022, 2:57 PM
    Does anybody have a best practice for storing large data as a
    Result
    in 2.6+ ? I am thinking of passing data between tasks, persisted into remote storage as parquet, for example (e.g. using spark)
    ✅ 1
    j
    m
    +1
    23 replies · 4 participants
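A minimal sketch of the pattern this question describes, assuming the tasks hand each other a storage location rather than the data itself. It uses only the standard library: a temporary directory stands in for remote storage and CSV stands in for parquet. `write_dataset` and `read_dataset` are hypothetical helper names, not Prefect APIs.

```python
import csv
import tempfile
from pathlib import Path


def write_dataset(rows, storage_dir):
    """Persist rows to storage and return only their location (hypothetical helper)."""
    path = Path(storage_dir) / "dataset.csv"
    with path.open("w", newline="") as f:
        csv.writer(f).writerows(rows)
    return str(path)


def read_dataset(location):
    """Load the rows back from the location returned by write_dataset."""
    with open(location, newline="") as f:
        return [row for row in csv.reader(f)]


# The upstream step returns a small string; the downstream step loads the data itself.
storage_dir = tempfile.mkdtemp()  # stand-in for a remote bucket
location = write_dataset([["id", "value"], ["1", "10"], ["2", "20"]], storage_dir)
rows = read_dataset(location)
```

With this shape, whatever is stored as the task's result is a short path string, and the large payload only moves between storage and the task that needs it.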
  • r

    redsquare

    10/19/2022, 3:13 PM
    Has anyone got an example of python deployments with KubernetesJob using infra_overrides - when running the job it keeps telling me "instance of JsonPatch expected", not sure what to pass in, prefect 2.6.2
    ✅ 1
    j
    b
    5 replies · 3 participants
  • k

    Kelvin DeCosta

    10/19/2022, 4:16 PM
    Hey guys! I'm programmatically creating all my deployments and am wondering what's a nice way to create them idempotently. Meaning, I run the
    create_deployments
    script multiple times and overwrite the deployments (similar to blocks). I'm currently using the
    .build_from_flow
    and
    .apply
    methods but I keep running into warnings
    ✅ 1
    j
    2 replies · 2 participants
  • r

    Rahul Kadam

    10/19/2022, 4:33 PM
    Hi Team, is there good documentation on installing Prefect V2 on AWS ECS?
    ✅ 1
    k
    2 replies · 2 participants
  • c

    Claire Herdeman

    10/19/2022, 4:42 PM
    Hi, probably straightforward question on Prefect 1.x, how do you force a flow to fail if a task fails?
    ✅ 1
    m
    5 replies · 2 participants
  • d

    David Beck

    10/19/2022, 5:10 PM
    Hi team! I'm building out a deployment flow for our CI/CD purposes for k8s and was looking to use the
    KubernetesJob.job_from_file
    function to read a yaml file with our full k8s manifest. The function works, however I noticed that we have fields that get overwritten with default values in this call for
    _shortcut_customizations
    in
    build_job
    , specifically the
    namespace
    and
    image
    properties. I know that I can use an
    infrastructure_override
    when setting up the deployment, however there should be an option to simply read a yaml file with those fields.
    m
    c
    26 replies · 3 participants
  • e

    Erik Amundson

    10/19/2022, 5:11 PM
    Hi, I'm having some issues with the async python Orion client and what looks to be a cloudpickle error. Code/screenshots in thread.
    ✅ 1
    j
    5 replies · 2 participants
  • j

    Jehan Abduljabbar

    10/19/2022, 5:22 PM
    (prefect 2.0, deployment on cloud) I am running into a situation where all tasks are completed successfully but the flow is still running, what could be causing this?
    k
    7 replies · 2 participants
  • m

    Matt Denno

    10/19/2022, 6:53 PM
    Hey All - In the process of upgrading to prefect 2, so far really loving it. Having a few issues though. I am trying to build a custom image to run our flows. Starting with
    prefecthq/prefect:2.6.1-python3.10-conda
    as a base image. Couple of questions: • When installing a python environment in the container does it matter what it is called? I see the image has a
    base
    env and a
    prefect
    env. Should packages be installed in
    prefect
    env or can I create a new one? • If I create a new env should the Dockerfile include a line to activate the correct env? • Is there a recipe for creating a custom image from a conda base image? I looked around and didn't find one, but maybe I missed it?
    ✅ 1
    m
    10 replies · 2 participants
  • m

    Matt Denno

    10/19/2022, 7:05 PM
    One more question: we are trying to run flows in a docker container locally for testing. I see in the docs that: "You must configure remote Storage. Local storage is not supported for Docker." Does this mean that we need to set up remote storage to test running the flows in the container? I currently have something like the following:
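    from datetime import datetime, timedelta

    import pytz
    from prefect.deployments import Deployment
    from prefect.infrastructure import DockerContainer
    from prefect.orion.schemas.schedules import IntervalSchedule

    # import_1 is the flow being deployed; it is defined elsewhere in the project
    deploy_import_1 = Deployment.build_from_flow(
        flow=import_1,
        name="import1",
        work_queue_name="imports",
        schedule=IntervalSchedule(
            interval=timedelta(hours=1),
            anchor_date=datetime(2021, 1, 1, 2, 30, tzinfo=pytz.UTC)
        ),
        infrastructure=DockerContainer(
            image="path_to_image/import-prefect:python3.10",
            image_pull_policy="NEVER",
            auto_remove=False
        )
    )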
    Do I need to include remote storage to test? I am a bit unclear on all the pieces of the puzzle still.
    k
    4 replies · 2 participants
  • k

    Kalise Richmond

    10/19/2022, 7:06 PM
    Wanted to highlight the #show-us-what-you-got channel with our revamped Prefect Recipes by @Serina and a KV/Store built by @Michael Adkins 🎉
    🎉 3
    :party-parrot: 3
    🙌 6
    :blob-attention-gif: 2
    :catjam: 1
    ✅ 1
  • m

    Michał Augoff

    10/19/2022, 7:43 PM
    Hi all, what is the recommended way to deploy flows that come pre-built in 3rd party packages? Think e.g.
    scikit-learn
    providing a flow to train a machine learning model and you can deploy it with your own model class like
    Deployment.build_from_flow(flow=sklearn.flows.training, …, parameters={"model": SomePythonClass})
    Is that even feasible? I guess another way to look at it is “prefect collection but with flows, not tasks”
    ✅ 1
    a
    8 replies · 2 participants
  • t

    Trevor Kramer

    10/19/2022, 7:51 PM
    Can anyone explain why this never completes? I've also tried it with a dask runner and it also hangs. This is with Prefect 2
    from prefect import flow, task
    
    
    @task
    def task1(x: int) -> int:
        return x + 10
    
    
    @task
    def task2(x: int) -> int:
        return -x
    
    
    @flow()
    def run_my_flow(n: int):
        task2.map(task1.map(range(n)))
    
    
    if __name__ == "__main__":
        n = 500
        print(run_my_flow(n))
    👀 1
    ✅ 1
    j
    a
    5 replies · 3 participants
  • d

    David Beck

    10/19/2022, 8:13 PM
    Hi again, I had a quick question regarding saving blocks with
    is_anonymous
    set to
    True
    . Is it important to also set the flag for
    overwrite=True
    if the anonymous block is one that is saved numerous times, à la a CI/CD process?
    j
    2 replies · 2 participants
  • a

    Atul Vig

    10/19/2022, 8:44 PM
    new here, is there a link to where i can read about SSO connections? I am trying to connect Prefect with our Okta instance.
    ✅ 1
    b
    6 replies · 2 participants
  • d

    David Beck

    10/19/2022, 9:39 PM
    Hi once again, I wanted to see if there is a more pythonic way to configure the execution environment in our CI/CD process for submitting to Prefect Cloud than what is described here: https://docs.prefect.io/ui/cloud-getting-started/#configure-execution-environment I know I can use os/subprocess calls to run the commands described there, but I'd prefer to use conventions that might already exist. I've been searching around and haven't found anything yet, so it'd be great to get pointed in the right direction.
    ✅ 1
    r
    7 replies · 2 participants
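One way to avoid shelling out for configuration, as a hedged sketch: Prefect 2 reads its settings from environment variables, so a CI script can build the environment in Python. The `PREFECT_API_KEY` and `PREFECT_API_URL` names and the Cloud URL layout are assumptions to check against the linked docs page; `prefect_cloud_env` is a hypothetical helper, and the key and IDs are placeholders.

```python
import os


def prefect_cloud_env(api_key, account_id, workspace_id, base_env=None):
    """Return a copy of the environment with Prefect Cloud settings added."""
    env = dict(os.environ if base_env is None else base_env)
    env["PREFECT_API_KEY"] = api_key
    # Assumed Cloud URL layout; verify it against your workspace settings.
    env["PREFECT_API_URL"] = (
        f"https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}"
    )
    return env


# Placeholder values; in CI these would come from the pipeline's secret store.
env = prefect_cloud_env("example-key", "ACCOUNT-ID", "WORKSPACE-ID", base_env={})
```

The resulting dict can be passed as `env=` to `subprocess.run` for any `prefect` CLI call, or applied with `os.environ.update(env)` before Prefect is imported in the same process.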
  • m

    Michał

    10/19/2022, 10:40 PM
    Hello everyone 🙂 I'm a happy user of 1.0 and wanted to start migrating to 2.0. I deployed an Orion server locally and started an agent inside a Docker container. When I run a flow from inside the container, it succeeds and the flow runs normally. But when I want to schedule it or just run it from the UI, the flow run is late and nothing shows up in the container logs. Additional info: I successfully built and applied the deployment:
    prefect deployment apply my_favorite_function-deployment.yaml
    Successfully loaded 'log-simple'
    Deployment 'my-favorite-function/log-simple' successfully created with id 'f44fb717-a757-4603-aca8-df122b5fbf54'.
    View Deployment in UI: http://192.168.1.24:4200/deployment/f44fb717-a757-4603-aca8-df122b5fbf54
    To execute flow runs from this deployment, start an agent that pulls work from the 'test' work queue:
    $ prefect agent start -q 'test'
    Starting v2.6.3 agent connected to http://192.168.1.24:4200/api...
    (PREFECT AGENT ASCII-art banner)
    Agent started! Looking for work from queue(s): test...
    m
    2 replies · 2 participants
  • j

    John Ramey

    10/20/2022, 1:19 AM
    I’m migrating a project from Prefect 1 to Prefect 2. I’m getting
    TypeError: task() got an unexpected keyword argument 'nout'
    — is the
    nout
    concept no longer required?
    ✅ 1
    m
    2 replies · 2 participants
  • Migrating from the DockerRun in Prefect 1 to DockerContainer in Prefect 2
    t

    Thomas Pedersen

    10/20/2022, 7:27 AM
    Trying to migrate to Prefect 2 using the flows defined in Docker infrastructure images (https://www.prefect.io/guide/blog/prefect-2-3-0-adds-support-for-flows-defined-in-docker-images-and-github/). In Prefect 1, we could tie the flows in each release to a specific Docker image version by setting the version in DockerRun(image="{image}:{version}"). That way we were certain that the flow was always running in the intended image. In Prefect 2:
    1. Not setting the version in the infrastructure block makes each flow use the 'latest' version. As far as I understand, this means that a flow could potentially run on an older version if the images haven't been updated on the agent. In our old setup this would have caused a flow failure, which would be far better.
    2. Setting the image version explicitly in the infrastructure block would mean that I have to create a new infrastructure block for every release we do.
    Am I missing any smarter ways to control versioning of flows and infrastructure?
    ✅ 1
    a
    7 replies · 2 participants
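A small sketch of the second option above, assuming the release process generates the image reference in code so that nothing falls back to `latest`. `pinned_image`, the repository name, and the version are hypothetical; the thread does not say which approach was recommended.

```python
def pinned_image(repository, version):
    """Build an image reference that always carries an explicit version tag."""
    if not version or version == "latest":
        # Fail at release time instead of silently running a stale image.
        raise ValueError("refusing to build an unpinned image reference")
    return f"{repository}:{version}"


# Hypothetical repository and release version.
image = pinned_image("registry.example.com/flows", "1.4.2")
```

The returned string could then be supplied as the `image` value wherever the infrastructure for a release is defined, which keeps the version in one place per release.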
  • v

    Vadym Dytyniak

    10/20/2022, 8:16 AM
    Hi. I see that the issue I raised some time ago was fixed, but I still see the same behaviour in the Cloud. Is it fixed only for the scenario where you are running your own backend?
    ✅ 1
    j
    4 replies · 2 participants
  • w

    wonsun

    10/20/2022, 9:41 AM
    Hello experts~~ I'm using Prefect 1.0! Before asking my question, thank you so much for always answering me quickly. 💙 The flow ran well on small-scale data, but when it tried to receive a large amount of data from our web server as a parameter, it hit the following client error. I downloaded the flow's log from the Prefect Cloud UI and attached it (logs.json), and I've written it here too.
    Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'State payload is too large.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
    Traceback (most recent call last):
      File "/home/da/enviorments/bdi/lib/python3.10/site-packages/prefect/engine/cloud/task_runner.py", line 91, in call_runner_target_handlers
        state = self.client.set_task_run_state(
      File "/home/da/enviorments/bdi/lib/python3.10/site-packages/prefect/client/client.py", line 1604, in set_task_run_state
        result = self.graphql(
      File "/home/da/enviorments/bdi/lib/python3.10/site-packages/prefect/client/client.py", line 464, in graphql
        raise ClientError(result["errors"])
    prefect.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State payload is too large.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    Actually, this wasn't the first time the task failed. Before that, it ran for about 3 minutes and then reported that no heartbeat was detected. (
    No heartbeat detected from the remote task; marking the run as failed.
    ) So, the solution I found was to configure heartbeats to use threads instead of processes, and I wrote that flow run config in the .py file. After that, the task receiving the parameter ran longer than the first time (1st try: 3 minutes running -> 2nd try: 12 minutes running), but it still failed. 😞 How can I solve this problem? What's wrong with my engineering? The flow may have been written the wrong way, so I've also included the flow code below.
    # other imports elided in the original ("import...")
    from prefect import Flow, Parameter, task
    from prefect.run_configs import UniversalRun

    def custom_function():
        '''some works'''
        return output  # placeholder from the original post

    @task
    def parsing_waveforms(download):
        processing_target = download
        '''some works by using above custom_function'''

    with Flow('flow_waveforms') as flow:
        heir = Parameter('download')
        task1 = parsing_waveforms(download=heir)

    # env must be a dict mapping the variable name to its value
    flow.run_config = UniversalRun(env={'PREFECT__CLOUD__HEARTBEAT_MODE': 'thread'})
    flow.register(project_name='data_factory')
    logs.json
    ✅ 1
    m
    4 replies · 2 participants
  • s

    Sudharshan B

    10/20/2022, 11:23 AM
    I'm trying to post the flow's logs as a message in Slack after the flow reaches success, but the message shows None for some reason. I used the same function for failure and it works fine there. Is there a function for getting that data into Slack for successfully completed Prefect flows?
    b
    2 replies · 2 participants
  • k

    Klemen Strojan

    10/20/2022, 12:02 PM
    Hey all! I am using the latest Prefect version (2.6.3). The agent is deployed on AKS in the namespace called
    dev-prefect-2
    . Which makes no sense - why would I need privileges in the
    default
    namespace? I can run this with Prefect 2.3 without issues.
    c
    4 replies · 2 participants
  • l

    Lennert Van de Velde

    10/20/2022, 12:06 PM
    Hi all, when doing a flow run with the Concurrent flow runner, my run always ends after 204 tasks with a timeout error. I have tried setting PREFECT_ORION_DATABASE_TIMEOUT to 10, but it results in the same error. The database being used is SQLite.
    c
    5 replies · 2 participants
  • t

    Trevor Kramer

    10/20/2022, 1:42 PM
    When defining an ECSTask, if the provided task definition ARN has a container named prefect (as Prefect expects), the system will override the image in the task definition with the base Prefect 2.0 one unless an image parameter is also supplied to the ECSTask. This is counterintuitive.
    a
    2 replies · 2 participants
  • j

    Jessica Smith

    10/20/2022, 3:14 PM
    In Prefect v1, is there a way to set a label on a schedule through the UI? I see that the documentation mentions labels on clocks, the clock schemas in the serialization modules have label fields, but the UI and the GraphQL endpoint do not seem to recognize that clocks have labels. Am I missing something?
    n
    2 replies · 2 participants
  • a

    Ashoka Sangapallar

    10/20/2022, 4:28 PM
    Hi all, can you please let me know where I can get the API URL for a Prefect 2 installation, so I can run a test job remotely from my machine using curl?
    ✅ 1
  • a

    Ashoka Sangapallar

    10/20/2022, 4:28 PM
    in Prefect 2
  • a

    Ashoka Sangapallar

    10/20/2022, 4:34 PM
    never mind...i got it
    :marvin: 1
    :thank-you: 1
    ✅ 1
    💯 1
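The thread does not record what the answer turned out to be. As a hedged sketch, assuming a self-hosted server at the default local address and assuming the REST route for starting a run from a deployment is `POST /deployments/{id}/create_flow_run`, the request that curl would send can be built like this. `build_flow_run_request` is a hypothetical helper and `DEPLOYMENT-ID` is a placeholder.

```python
import json
import urllib.request


def build_flow_run_request(api_url, deployment_id, parameters=None):
    """Build (but do not send) a request that asks the API to create a flow run."""
    url = f"{api_url.rstrip('/')}/deployments/{deployment_id}/create_flow_run"
    body = json.dumps({"parameters": parameters or {}}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Assumed default address of a local server; replace with your own API URL.
request = build_flow_run_request("http://127.0.0.1:4200/api", "DEPLOYMENT-ID")
# urllib.request.urlopen(request) would send it; omitted so the sketch runs offline.
```

The same URL, method, and JSON body translate directly to a curl call with `-X POST`, a `Content-Type: application/json` header, and `-d`.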
  • Do I need a custom ECR image for my agent or can my ECS task running agent be based on the base Prefect image?
    k

    Kelvin DeCosta

    10/20/2022, 4:42 PM
    Hey guys! I have a Prefect Agent running as an ECS Service. It uses a lightweight container and will be invoking deployments via
    ECSTask
    . My question: Do I need to install dependencies like
    prefect-aws
    ,
    s3fs
    etc. and register the blocks (eg:
    prefect block register -m prefect_aws.ecs
    ) in the container for the agent? Or can I just use the latest
    prefecthq/prefect
    image?
    👍 1
    ✅ 1
    a
    6 replies · 2 participants
I'm currently using this Dockerfile:
FROM prefecthq/prefect:2-python3.10

# Install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt --no-cache-dir

# Register AWS block types
RUN prefect block register -m prefect_aws.ecs
But I think this might be unused / redundant
a

Anna Geller

10/20/2022, 4:43 PM
Your intuition is right, you can use the latest
prefecthq/prefect
image for your agent
✅ 1
The ECSTask block has all the attributes needed, so the agent can be super lightweight and only needs prefect as a dependency. We have this repo template with a blog post and video tutorial linked in the README: https://github.com/anna-geller/dataflow-ops
k

Kelvin DeCosta

10/20/2022, 4:47 PM
Thank you! And yes, this repo has been instrumental to our approach! I just ran my first prefect deployment as an ECS Task yesterday! Now, I'm setting up some workflows to keep things updated
:thank-you: 2
👍 3
a

Anna Geller

10/20/2022, 4:52 PM
awesome, so great to hear and keep us posted how it goes! 🙌
k

Kelvin DeCosta

10/25/2022, 5:26 PM
Hey, just wanted to update that you do in fact need to install
prefect-aws
if you plan to use
ECSTask
. I ran into a
KeyError: ecs-task
could not be dispatched. Also, I recently switched from
S3
to private
GitHub
and I had to install
git
on the
prefect
container's image in my
ECSTask
definition
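Given the update above, a quick preflight check run inside the image can catch both gaps before a deployment fails. This is a sketch using only the standard library; `missing_requirements` is a hypothetical helper, and `prefect_aws` is assumed to be the import name of the `prefect-aws` package.

```python
import importlib.util
import shutil


def missing_requirements(modules, executables):
    """Return the names of Python modules and executables that cannot be found."""
    missing = [m for m in modules if importlib.util.find_spec(m) is None]
    missing += [e for e in executables if shutil.which(e) is None]
    return missing


# Check for the two things the thread found were needed.
missing = missing_requirements(["prefect_aws"], ["git"])
if missing:
    print("Missing from this image:", ", ".join(missing))
```

Running it as a build step or at container start makes a missing package show up as a clear message rather than a dispatch error at flow run time.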