prefect-community
  • a

    Aqib Fayyaz

    11/16/2021, 8:25 PM
    Can we deploy prefect agent using kustomize?
    k
    • 2
    • 2
  • m

    Max Kolasinski

    11/16/2021, 9:07 PM
    Is there any way to programmatically access a markdown artifact? We’re using the Great Expectations integration, and it’d be nice to be able to retrieve the resulting markdown and make it accessible outside of the Prefect UI
    m
    • 2
    • 3
  • v

    Vamsi Reddy

    11/16/2021, 9:26 PM
    Hi all, we are using the S3 storage to store our flows. Currently they are all under one S3 bucket. We would like to store them under the same bucket but under separate subdirectories like
    bucketname/dev
    and
    bucketname/prod
    and have our dev and prod flows stored accordingly. i am using
    storage=S3(bucket="bucketname")
    but would like to have something like
    storage=S3(bucket="bucketname/dev")
    . Any idea how to do this?
    k
    d
    • 3
    • 14
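One possible way to approach the S3 layout question above is to keep a single bucket and put the environment into the object key rather than the bucket name. The sketch below only builds such a key. It assumes that the Prefect 1.x `S3` storage class accepts a `key` argument next to `bucket` (worth checking against the installed version), and `flow_storage_key` is a hypothetical helper name, not part of Prefect.

```python
import posixpath


def flow_storage_key(environment: str, flow_name: str) -> str:
    """Build an S3 object key such as 'dev/my-flow' (hypothetical helper)."""
    # S3 keys use forward slashes on every OS, hence posixpath.
    slug = flow_name.strip().lower().replace(" ", "-")
    return posixpath.join(environment.strip("/"), slug)


dev_key = flow_storage_key("dev", "My Flow")
prod_key = flow_storage_key("prod", "My Flow")

# Assumed usage with Prefect 1.x (not executed here):
#   from prefect.storage import S3
#   flow.storage = S3(bucket="bucketname", key=dev_key)
```

Keeping the bucket name fixed and varying only the key means the same bucket policy can cover both environments.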
  • a

    Alexander Chen

    11/16/2021, 9:42 PM
    Hello! Coming from the Airflow world, we use a custom executor (agent) extensively due to the nature of our worker clouds. Is there similar functionality in Prefect where the agent is pluggable?
    k
    • 2
    • 5
  • c

    Carlo

    11/16/2021, 9:46 PM
    What is the appropriate way to create an aggregate flow, such that flow A runs when the aggregate flow kicks off, flow B runs at 9pm, and Flow C runs once A and B are complete?
    m
    k
    • 3
    • 10
  • g

    Gabriel Milan

    11/16/2021, 10:08 PM
    Hello there! I see an AWS ECS Agent and a GCP Vertex agent but, in my use case, a GCP Cloud Run agent would be the perfect fit. Is there such a thing, or do you have plans for it in the near future?
    k
    m
    +2
    • 5
    • 9
  • j

    Jason Motley

    11/16/2021, 11:21 PM
    Hi, I'm trying to connect to a MySQL database and am receiving an error that there is no MySQL module. The
    mysqlclient
    Anaconda package won't run on my version of Python 3.8.8, but when I created a second conda environment on 3.7 I was told that it had a compatibility error with Prefect. Are there alternative ways to connect to a MySQL db using secrets & SSL args? The specific error is:
    Error during execution of task: ModuleNotFoundError("No module named 'MySQLdb'")
    k
    • 2
    • 12
  • b

    Billy McMonagle

    11/17/2021, 12:09 AM
    Quick graphql API question. I have multiple projects, which may have flows with names in common. I'd like to be able to query for a flow by name, with the project name as a query parameter.
    k
    • 2
    • 3
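The query described above could be sketched roughly as follows. This is a hedged example that assumes the Prefect 1.x GraphQL API exposes Hasura-style `where` filters on `flow`, with a `project` relationship and an `archived` field. The variable names and the `build_request_body` helper are hypothetical, and the code only builds the request body without sending it.

```python
import json

# Assumed schema: flow(where: ...) with name, project.name and archived filters.
FLOW_BY_NAME_QUERY = """
query FlowByName($flow_name: String!, $project_name: String!) {
  flow(
    where: {
      name: {_eq: $flow_name}
      project: {name: {_eq: $project_name}}
      archived: {_eq: false}
    }
  ) {
    id
    name
    version
  }
}
"""


def build_request_body(flow_name: str, project_name: str) -> str:
    """Serialize the query and its variables as a GraphQL-over-HTTP JSON body."""
    return json.dumps(
        {
            "query": FLOW_BY_NAME_QUERY,
            "variables": {"flow_name": flow_name, "project_name": project_name},
        }
    )


body = json.loads(build_request_body("etl", "dev"))
```

Passing the names as variables instead of formatting them into the query string avoids quoting problems with unusual flow names.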
  • s

    Sridhar

    11/17/2021, 12:38 AM
    Hi, I'm using docker image pushed on the container registry as storage. I have a lot of custom dependency py files needed to run my flow. The directory looks something like this (image below) . And my Dockerfile content is
    # specify a base image
    FROM python:3.8-slim
    # copy all folder contents to the image
    COPY . .
    # install all dependencies
    RUN apt-get update && apt-get -y install libpq-dev gcc && pip install psycopg2
    RUN pip install -r requirements.txt
    My understanding is
    COPY . .
    should copy all the files required to run the flow into the image. But I'm getting an error saying no module found (image attached). Also Here's my STORAGE and RUN_CONFIG
    STORAGE = Docker(registry_url='aws_id.dkr.ecr.region.amazonaws.com/',
                     image_name='name',
                     image_tag='tag',
                     dockerfile='Dockerfile')
    RUN_CONFIG = ECSRun(run_task_kwargs={'cluster': 'cluster-name'},
                        execution_role_arn='arn:aws:iam::aws_id:role/role',
                        labels=['dev-modelling', 'flow-test'])
    Am I missing something?? Really appreciate the help. Thanks in advance!!
    k
    b
    • 3
    • 5
  • f

    Florian Boucault

    11/17/2021, 9:29 AM
    Hi everyone: in Orion would it be possible to call a task from within a task?
    a
    • 2
    • 2
  • a

    Asmita

    11/17/2021, 11:44 AM
    Hi everyone: We are having an issue on running a mocked flow. We have a flow and for testing the flow, we are mocking a few tasks on the flow and creating a new test flow. The test flow is mocked correctly but when running the test flow, it is using registration_settings of the original flow rather than the new test flow. Would you be able to help? @Milly gupta
    a
    k
    m
    • 4
    • 53
  • f

    Fina Silva-Santisteban

    11/17/2021, 2:17 PM
    Hi community, I have an imperative api question:
    flow.set_dependencies(
        upstream_tasks=[A],
        task=B,
        keyword_tasks=dict(item=A)
    )
    Do I need to explicitly set a Task as an
    upstream_task
    when I already have it as a
    keyword_task
    ? I’ve read that one is to set state dependencies and the other is to set data dependencies. Since a data dependency is implicitly a state dependency, having task
    A
    as keyword_task should be enough, right?
    a
    • 2
    • 3
  • a

    Andreas Tsangarides

    11/17/2021, 3:58 PM
    hey all, is there a way to run only specific tasks using tags from a flow? imagine our flow splits after a specific tag, treating two different datasets
    task_1 -> task_2 -> [task_a, task_b] # i.e. task_a and task_b are independent of each other but share the same dependencies
    if we define tasks as such:
    task_1: [a, b]
    task_2: [a, b]
    task_a: [a]
    task_b: [b]
    Is there anything like:
    prefect run --tags=a
    I am really asking for functionalities I was using with
    kedro
    here :p
    k
    • 2
    • 2
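To make the selection semantics in the question above concrete, here is a small plain-Python sketch of what "run only the tasks tagged `a`" would mean for the listed mapping. It does not use any Prefect feature, and `select_tasks` is a hypothetical helper that only illustrates the requested filtering.

```python
from typing import Dict, List, Set

# Tag assignments copied from the message above.
TASK_TAGS: Dict[str, Set[str]] = {
    "task_1": {"a", "b"},
    "task_2": {"a", "b"},
    "task_a": {"a"},
    "task_b": {"b"},
}


def select_tasks(task_tags: Dict[str, Set[str]], tag: str) -> List[str]:
    """Return the task names carrying the given tag, in definition order."""
    return [name for name, tags in task_tags.items() if tag in tags]


selected_a = select_tasks(TASK_TAGS, "a")
selected_b = select_tasks(TASK_TAGS, "b")
```

Because `task_1` and `task_2` carry both tags, either selection keeps the shared upstream tasks and drops only the other branch.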
  • v

    Vamsi Reddy

    11/17/2021, 4:44 PM
    Hello all, we are currently using Lambda to trigger our flows. We are also setting up CI for each of these flow projects. We use the flow id to trigger our flows through Lambda, but the problem is that every time we register a new version of a flow, the flow id changes. We noticed the flow group id remains the same. Is there a way to always trigger the latest version of our flow without having to worry about changing flow ids?
    a
    k
    n
    • 4
    • 13
  • l

    Leon Kozlowski

    11/17/2021, 4:44 PM
    Not sure if this is the appropriate place for this question, but - is it feasible to include a service account annotation in a k8s job template for a particular flow?
    k
    • 2
    • 4
  • r

    Ryan Brideau

    11/17/2021, 5:11 PM
    Hey all, I’m looking for a way to run multiple flows with different schedules. When I try to do this and execute each flow’s
    .run()
    method in a loop, though, only the first one runs and it seems to block before the others run. Is there a way around this?
    k
    • 2
    • 3
  • j

    Jacob Warwick

    11/17/2021, 5:35 PM
    Hey folks. Is there a way to have Prefect treat a file on disk that was created during a task’s execution as a Result, that can be persisted using S3Result (for example) without loading that file into memory / returning it from the task function? I am trying to see if my organization can use Prefect, but our core need is to run 3rd party programs that produce large output files that may not fit in memory. Thanks and I apologize if this is already in the docs.
    n
    k
    a
    • 4
    • 10
  • f

    Frank Oplinger

    11/17/2021, 7:02 PM
    Hello, I am currently trying to use a DaskExecutor in an ECSRun to parallelize a flow. I’m following the documentation to create a temporary cluster with a specified worker image. My flow currently looks something like this:
    def fargate_cluster(n_workers=4):
        """Start a fargate cluster using the same image as the flow run"""
        return FargateCluster(n_workers=n_workers, image=prefect.context.image)
    
    class LeoFlow(PrefectFlow):
    
        def generate_flow(self):
            with Flow(name=self.name, storage=S3(bucket="raptormaps-prefect-flows")) as flow:
                ...
            flow.executor = DaskExecutor(
                cluster_class=fargate_cluster,
                cluster_kwargs={"n_workers": 4}
            )
            return flow
    In the dockerfile for the image that I’m specifying in the ECSRun, I have included the following line to install dask-cloudprovider:
    RUN pip install dask-cloudprovider[aws]
    However, when I execute the flow, I am hitting the following error:
    Unexpected error: AttributeError("module 'aiobotocore' has no attribute 'get_session'",)
    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
        with self.check_for_cancellation(), executor.start():
      File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
        return next(self.gen)
      File "/usr/local/lib/python3.6/site-packages/prefect/executors/dask.py", line 238, in start
        with self.cluster_class(**self.cluster_kwargs) as cluster:
      File "/rprefect/leo_flow.py", line 58, in fargate_cluster
        return FargateCluster(n_workers=n_workers, image=prefect.context.image)
      File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 1361, in __init__
        super().__init__(fargate_scheduler=True, fargate_workers=True, **kwargs)
      File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 726, in __init__
        self.session = aiobotocore.get_session()
    AttributeError: module 'aiobotocore' has no attribute 'get_session'
    Is there a specific version of dask_cloudprovider that Prefect requires?
    k
    a
    • 3
    • 21
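The traceback above fails on `aiobotocore.get_session()`. One plausible cause, which is an assumption and not confirmed in this thread, is that the installed aiobotocore is a 2.x release that no longer exposes `get_session` at the top level, while the installed dask_cloudprovider still expects it. A minimal sketch for checking the installed version follows; the helper names are hypothetical.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a package, or None if it is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def major_version(version: str) -> int:
    """Extract the leading integer of a dotted version string."""
    return int(version.split(".")[0])


def has_top_level_get_session(version: str) -> bool:
    # Assumption: only releases before 2.0 expose aiobotocore.get_session.
    return major_version(version) < 2


aiobotocore_version = installed_version("aiobotocore")
if aiobotocore_version is not None:
    print(aiobotocore_version, has_top_level_get_session(aiobotocore_version))
```

If the check reports a 2.x release, pinning aiobotocore below 2 in the Dockerfile or upgrading dask-cloudprovider would be the two things to try.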
  • t

    Tom Tom

    11/17/2021, 7:15 PM
    Hey all, I've got a problem with the Prefect Local Agent. If I run my flow, it raises:
    Unexpected error: TypeError("write() got multiple values for argument 'self'")
    Traceback (most recent call last):
      File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\prefect\engine\runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\prefect\engine\task_runner.py", line 926, in get_task_run_state
        result = self.result.write(value, **formatting_kwargs)
    TypeError: write() got multiple values for argument 'self'
    It fails by executing this function:
    def load_data_task(self):
        logger = prefect.context.get("logger")
        logger.info(os.getcwd())
        ... do something ...
        return self.load_data()
    According to this issue (https://github.com/PrefectHQ/prefect/issues/3034), it fails because the
    write()
    function also receives a "self" argument. But I can't remove the "self" argument from the function
    load_data_task
    because it has to call the function
    load_data
    . If I run the flow on my local machine without a server and agent, it works.
    k
    • 2
    • 18
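The collision described above can be reproduced without Prefect. In the traceback, the task inputs are forwarded as keyword arguments in `self.result.write(value, **formatting_kwargs)`, so an input literally named `self` clashes with the bound method's own `self`. The sketch below uses a hypothetical stand-in class, `DemoResult`, to show the error and one way around it, assuming the input can be given a different name.

```python
class DemoResult:
    """Stand-in for a result class whose write() accepts formatting kwargs."""

    def write(self, value, **kwargs):
        return f"wrote {value!r} with {sorted(kwargs)}"


result = DemoResult()

# An input called `self` collides with the bound method's implicit `self`.
try:
    result.write("data", **{"self": object()})
    error_message = ""
except TypeError as exc:
    error_message = str(exc)

# Passing the same object under another name avoids the collision.
ok_message = result.write("data", **{"loader": object()})
```

In practice this suggests wrapping the method in a plain function whose parameter is named something other than `self`, for example `loader`, and calling `loader.load_data()` inside it.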
  • a

    Arfa

    11/17/2021, 9:12 PM
    Hi all, is there a quick way to dump Prefect logger output to a file for local storage at the very end of a flow run? Thanks!
    k
    k
    • 3
    • 6
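One way to approach the log file question above is to attach a standard `logging.FileHandler`. The sketch assumes that Prefect 1.x loggers are ordinary `logging` loggers living under the name `"prefect"`. The child logger `"prefect.demo"`, the format string, and the `attach_file_handler` helper are hypothetical and only stand in for a real flow run.

```python
import logging
import os
import tempfile


def attach_file_handler(logger_name: str, path: str) -> logging.FileHandler:
    """Attach a file handler so records reaching this logger are written to path."""
    handler = logging.FileHandler(path, encoding="utf-8")
    handler.setFormatter(
        logging.Formatter("%(asctime)s | %(levelname)s | %(name)s | %(message)s")
    )
    logging.getLogger(logger_name).addHandler(handler)
    return handler


log_path = os.path.join(tempfile.mkdtemp(), "flow_run.log")
handler = attach_file_handler("prefect", log_path)

# Stand-in for a task logger; records propagate up to the "prefect" logger.
logger = logging.getLogger("prefect.demo")
logger.setLevel(logging.INFO)
logger.info("flow finished")

# Detach and close the handler so the file is flushed to disk.
logging.getLogger("prefect").removeHandler(handler)
handler.close()

with open(log_path, encoding="utf-8") as log_file:
    log_text = log_file.read()
```

This writes records as they are emitted rather than dumping them at the very end, which also preserves the log if the run crashes midway.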
  • m

    Martim Lobao

    11/17/2021, 9:44 PM
    hi, i just encountered a bug where prefect seems to have been running the same flow run twice in parallel — not the same flow in two parallel runs, but the same flow twice in the same run. a few of this flow’s tasks spin up an emr job, and all 3 jobs got triggered twice in the same run. this is the first time i’ve encountered this issue, but i suspect it might be related to the flow having been triggered through a flow of flows. the flow had failed midway (during the GCP outage this week), and so i had to restart the child flow and the parent flow of flows. it seems that this caused the flow run to restart twice in parallel. i’m not sure sharing logs will be of any value, but i’d be happy to share the flow run ids to maybe help get some insight into what happened.
    k
    a
    • 3
    • 9
  • a

    Abhas P

    11/17/2021, 10:50 PM
    Hi Team, I want to onboard my Prefect project (currently using a local agent + Dask executor) to run on a K8s cluster, and I have the following questions. (I found this tutorial while looking through the old threads on GKE + Dask in this Slack channel.)
    1. Storage: In order to access my flows from the K8s cluster, is it recommended to use remote storage?
       a. Which storage is recommended: Docker or Bitbucket?
       b. To use Docker storage, do I need to install all the requirements of the flow in the "extra_dockerfile_commands" block?
    2. Dask cluster: Is using an ephemeral Dask executor recommended while using a Dask cluster?
    3. K8s in the cloud: The Prefect docs mention using the K8s run config, but I want to know how to get it working on a GKE cluster.
    k
    a
    • 3
    • 10
  • t

    Tom Shaffner

    11/17/2021, 11:23 PM
    Flow question (sorry if this is the wrong place, wasn't sure where else it fit). If I set up a flow to use git with additional storage, as per https://docs.prefect.io/orchestration/flow_config/storage.html#loading-additional-files-with-git-storage, am I able to import other python files from that folder?
    k
    • 2
    • 16
  • j

    Jason Motley

    11/18/2021, 12:22 AM
    Anyone know how to resolve this error(s) that prevents my flow from running?
    - cloudpickle: (flow built with '1.6.0', currently running with '2.0.0')\n  - prefect: (flow built with '0.15.6', currently running with '0.15.9')\n  - python: (flow built with '3.8.8', currently running with '3.7.11')")
    k
    • 2
    • 96
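The message above lists three version mismatches between the environment where the flow was built and the one where it runs. A small sketch for pulling those pairs out of such a message is shown below. The regular expression is written against the text quoted in this thread only, and `parse_mismatches` is a hypothetical helper.

```python
import re
from typing import Dict, Tuple

# Matches entries like: - prefect: (flow built with '0.15.6', currently running with '0.15.9')
MISMATCH_PATTERN = re.compile(
    r"- (?P<package>[\w-]+): \(flow built with '(?P<built>[^']+)', "
    r"currently running with '(?P<running>[^']+)'\)"
)


def parse_mismatches(message: str) -> Dict[str, Tuple[str, str]]:
    """Map each package name to its (built, running) version pair."""
    return {
        match["package"]: (match["built"], match["running"])
        for match in MISMATCH_PATTERN.finditer(message)
    }


message = (
    "- cloudpickle: (flow built with '1.6.0', currently running with '2.0.0')\n"
    "  - prefect: (flow built with '0.15.6', currently running with '0.15.9')\n"
    "  - python: (flow built with '3.8.8', currently running with '3.7.11')"
)
mismatches = parse_mismatches(message)
```

Here the registration environment used Python 3.8.8 while the execution environment runs 3.7.11, so aligning the two environments (Python, prefect, and cloudpickle) is the first thing to check.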
  • j

    Jacob Blanco

    11/18/2021, 9:13 AM
    Cloud supports Secrets in JSON format, but how do I set up a JSON secret locally in
    config.toml
    ?
    a
    • 2
    • 2
  • r

    Ramtin

    11/18/2021, 9:42 AM
    Hello! I was wondering how to get the flow result location inside a task? Is this possible to do?
    a
    • 2
    • 4
  • g

    Gabriel Milan

    11/18/2021, 10:22 AM
    Hi all! Is there a way of wiping (runs, logs..) history regularly? I wouldn't like to keep data older than 7 days
    a
    k
    • 3
    • 8
  • a

    agniva

    11/18/2021, 12:07 PM
    @here: hello guys! I am having some issues building and deploying Docker images using Prefect. I am using an M1 chip to build the Docker image, and it fails with the following error:
    failed to get destination image "xxx": image with reference xxx was found but does not match the specified platform: wanted linux/amd64, actual: linux/arm64/v8
    I have build args as: build_kwargs={"buildargs": {"PYTHON_VERSION": "3.8.12"}, "platform": ["linux/amd64"]}, can anybody please help me 🙏?
    a
    • 2
    • 11
  • n

    Nacho Rodriguez

    11/18/2021, 12:24 PM
    Hi, we are trying to talk to a sales representative for Prefect Cloud. We have filled out the contact form on the Prefect website, but never got a response. Can anybody help me?
    a
    • 2
    • 1
  • p

    Piyush Bassi

    11/18/2021, 1:06 PM
    #prefect-community Hello, can we run the prefect server on Windows containers? I am using Docker feature on Windows server 2019 not the docker desktop on windows.
    a
    • 2
    • 1
a

Anna Geller

11/18/2021, 1:12 PM
There is no official base image specifically for Windows, but given that installing Prefect on a local Windows laptop works fine, and that a local agent can run on any machine, I would assume that you could make it work. But it wouldn’t be a seamless out-of-the-box experience. If you're looking for a simpler and faster setup as a Windows user, I recommend starting with Prefect Cloud: you get 20,000 free task runs each month (no credit card required), and there is even an Azure agent you could spin up directly from Azure Marketplace.