prefect-community
  • d

    Daniel Sääf

    08/23/2022, 9:00 AM
    Hi! After playing around a bit with the Prefect 2 beta, I am now trying to get the public version up and running, but I'm running into issues with the new deployments. My scenario is that I would like to build the deployments locally and have a VM (GCE) running as my agent. The problem is that when I build the deployment file, my local path is hard-coded as the storage path, which results in an error when the agent tries to fetch the flow. Is there a way to solve this? Or is this a complete abuse of how it's supposed to work?
    ✅ 1
    o
    • 2
    • 9
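    For reference, a minimal sketch of one way around this (assuming Prefect 2.x with a pre-created GCS filesystem block; the block, module, and queue names are illustrative): pushing the flow code to remote storage at build time means the agent never depends on the local build path.

    from prefect.deployments import Deployment
    from prefect.filesystems import GCS  # S3/Azure filesystem blocks work the same way

    from my_flows import my_flow  # hypothetical module containing the flow

    storage = GCS.load("my-gcs-block")  # block created beforehand in the UI or via .save()

    deployment = Deployment.build_from_flow(
        flow=my_flow,
        name="gce-agent-deployment",
        storage=storage,        # flow code is uploaded here at build time
        work_queue_name="gce",  # the queue the GCE agent polls
    )

    if __name__ == "__main__":
        deployment.apply()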
  • t

    Tejal Singh

    08/23/2022, 9:51 AM
    Hi, I am wondering what options Prefect provides for executing Spark/PySpark jobs. In Kubeflow Pipelines, for example, we can use the spark-operator for this. In our use case we want to do everything on K8s clusters; how can we do that with Prefect? Using this example, we can only run Spark in local mode. We want to run Spark/PySpark jobs on our K8s cluster with proper scaling. cc @Kevin Kho @Anna Geller
    a
    • 2
    • 4
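    One angle, sketched below (this is plain Spark-on-Kubernetes configuration rather than anything Prefect-specific; the image name and executor count are illustrative): point the session at the cluster's API server instead of local mode, so executors scale on the cluster.

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .master("k8s://https://kubernetes.default.svc:443")  # in-cluster API endpoint
        .config("spark.kubernetes.container.image", "my-spark:latest")
        .config("spark.executor.instances", "4")  # real executor scaling, not local threads
        .getOrCreate()
    )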
  • o

    Owen Cook

    08/23/2022, 11:02 AM
    Hi, I was just wondering whether anyone knows if you can pause a deployment with Python or the CLI, as there is an option to pause a deployment in the UI. Cheers :)
    a
    • 2
    • 1
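    A sketch of one possibility (an assumption, not confirmed API: the UI toggle appears to drive schedule-activation routes on the deployment):

    import httpx

    API_URL = "http://127.0.0.1:4200/api"  # or your Cloud workspace API URL
    DEPLOYMENT_ID = "<DEPLOYMENT_ID>"      # placeholder; see `prefect deployment ls`

    # Pausing = deactivating the schedule (assumed route).
    httpx.post(f"{API_URL}/deployments/{DEPLOYMENT_ID}/set_schedule_inactive")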
  • s

    Saman

    08/23/2022, 11:32 AM
    I'm trying to build an Azure DevOps (AzDo) pipeline for our Prefect code. To log in to Prefect Cloud, the command
    prefect cloud login -k *** -w myaccount/test
    asks for a profile name. Is there a way to pass the profile name as a parameter?
    a
    • 2
    • 3
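    One non-interactive alternative sketch for CI (assumption: with PREFECT_API_URL and PREFECT_API_KEY set, no `prefect cloud login` and no profile prompt are needed):

    import os

    # Point the CLI/SDK at the Cloud workspace purely via settings.
    os.environ["PREFECT_API_URL"] = (
        "https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>"
    )  # placeholders; the real IDs come from the Cloud UI
    os.environ["PREFECT_API_KEY"] = os.environ["AZDO_SECRET_KEY"]  # injected by the pipeline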
  • k

    Karan

    08/23/2022, 12:08 PM
    Do I need to download the prefect.schedules module separately? If yes, I'm unable to install it. I'm getting the error below: ERROR: Could not find a version that satisfies the requirement prefect.schedules ERROR: No matching distribution found for prefect.schedules
    k
    • 2
    • 2
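    For reference, prefect.schedules is not a separate PyPI package; it ships inside Prefect 1.x itself. A quick check, assuming Prefect 1.x is installed:

    from datetime import timedelta

    # Importable directly from the prefect package; no extra install needed.
    from prefect.schedules import IntervalSchedule

    schedule = IntervalSchedule(interval=timedelta(hours=1))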
  • m

    Maria

    08/23/2022, 12:31 PM
    Hi, I tried to install the Orion server on EKS using Helm, following the steps here. For some reason it only creates the prefect-orion-api and postgresql pods, and the API pod keeps crashing and never manages to come up. For Prefect Server 1.0, if I remember correctly, 5-6 pods were created. Did I miss something here?
    k
    a
    • 3
    • 3
  • t

    Tom Klein

    08/23/2022, 2:11 PM
    Question about Prefect 1.0 memory usage: there seems to be some kind of memory leak, even though each task runs as an isolated shell task that completely exits once it's done. What could be the cause of this?
    • 1
    • 4
  • r

    Robert Esteves

    08/23/2022, 2:27 PM
    Does Prefect 2.x support `prefect agent docker start`?
    k
    • 2
    • 1
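    For context, 2.x has a single `prefect agent start` command and moves the Docker choice into an infrastructure block attached to the deployment. A sketch (the block name is illustrative):

    from prefect.infrastructure import DockerContainer

    # Saved once, then referenced by deployments; the agent uses it per flow run.
    docker_block = DockerContainer(image="prefecthq/prefect:2-python3.9")
    docker_block.save("my-docker-infra", overwrite=True)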
  • i

    Ilya Galperin

    08/23/2022, 2:40 PM
    Do we need to authenticate with Prefect Cloud via `prefect cloud login -k $PREFECT_API_KEY -w $PREFECT_WORKSPACE` in order to run the `block.save` Python method? If we try authenticating by simply setting our default profile and running `prefect config set PREFECT_API_KEY`/`PREFECT_API_URL` but not explicitly calling `prefect cloud login`, we get an error when trying to save a storage block (noted in replies). Is there a recommended pattern for CI/CD to save a block otherwise?
    k
    t
    • 3
    • 24
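    For reference, a sketch of the pattern that usually works in CI (assumption: both PREFECT_API_URL and PREFECT_API_KEY must be set in the active profile or environment, not just the key):

    from prefect.filesystems import S3  # any block type saves the same way

    block = S3(bucket_path="my-bucket/prefect-flows")  # illustrative bucket
    block.save("ci-storage", overwrite=True)  # persists the block to the workspace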
  • n

    Nikhil Jain

    08/23/2022, 5:22 PM
    Prefect 1.0: question about CronSchedule: is it possible to modify Parameter defaults for runs that are started via the schedule? The use case is that manual runs are assumed to be for testing, and we want to skip a certain task by default, but runs started via the scheduler should not skip that task.
    ✅ 1
    j
    • 2
    • 1
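    For reference, a sketch of the parameter_defaults mechanism on Prefect 1.x clocks (the cron string and parameter name are illustrative):

    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock

    # Scheduled runs get skip_task=False; manual runs keep the flow's default.
    schedule = Schedule(
        clocks=[CronClock("0 6 * * *", parameter_defaults={"skip_task": False})]
    )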
  • t

    Tejal Singh

    08/23/2022, 6:33 PM
    When I tried to run the below code I am getting this error:
    RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
    from prefect import task, flow
    import contextlib
    from pyspark import SparkConf
    from pyspark.sql import SparkSession
    
    @contextlib.contextmanager
    def SparkCluster(conf2: SparkConf = SparkConf()):
        ssobj = SparkSession.builder.config(conf=conf2).getOrCreate()
        try:
            yield ssobj
        finally:
            ssobj.stop()
    
    @task
    def get_data(spark: SparkSession):
        return spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])
    
    @task()
    def analyze(df):
        word_count = df.groupBy('word').count()
        word_count.show()
    
    
    @flow(name="spark_flow")
    def my_flow():
        conf = SparkConf().setMaster('local[*]')
        with SparkCluster(conf) as spark:
            df = get_data(spark)
            analyze(df)
    
    if __name__ == '__main__':
        my_flow()
    Basically I was trying to convert the below code from prefect 1.0 to 2.0:
    from prefect import task, Flow, resource_manager
    
    from pyspark import SparkConf
    from pyspark.sql import SparkSession
    
    @resource_manager
    class SparkCluster:
        def __init__(self, conf: SparkConf = SparkConf()):
            self.conf = conf
    
        def setup(self) -> SparkSession:
            return SparkSession.builder.config(conf=self.conf).getOrCreate()
    
        def cleanup(self, spark: SparkSession):
            spark.stop()
    
    @task
    def get_data(spark: SparkSession):
        return spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])
    
    @task(log_stdout=True)
    def analyze(df):
        word_count = df.groupBy('word').count()
        word_count.show()
    
    
    with Flow("spark_flow") as flow:
        conf = SparkConf().setMaster('local[*]')
        with SparkCluster(conf) as spark:
            df = get_data(spark)
            analyze(df)
    
    if __name__ == '__main__':
        flow.run()
    ✅ 1
    k
    c
    • 3
    • 4
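    One workaround sketch for the SPARK-5063 error above (an assumption, not the only fix): keep the Spark work inside a single task, so the DataFrame never crosses a task boundary and is never pickled by the task runner.

    from prefect import flow, task
    from pyspark import SparkConf
    from pyspark.sql import SparkSession

    @task
    def run_spark_job():
        # Session, transformation, and action all stay in one process.
        spark = SparkSession.builder.config(conf=SparkConf().setMaster("local[*]")).getOrCreate()
        try:
            df = spark.createDataFrame([("look",), ("spark",), ("tutorial",)], ["word"])
            df.groupBy("word").count().show()
        finally:
            spark.stop()

    @flow(name="spark_flow")
    def my_flow():
        run_spark_job()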
  • i

    Ian Andres Etnyre Mercader

    08/23/2022, 9:45 PM
    Hi guys, while migrating to Prefect Orion I started using `ContextModel`. I'm running into issues with the asynchronous section of my code: when using `task_runner=DaskTaskRunner`, the contexts are not defined in the tasks.
    @flow(name="Biorxiv", task_runner=DaskTaskRunner())
    def biorxiv_main_flow(block_name):
        with MyContextModel(block_name):
            prepare_destination = biorxiv.prepare_destination()
            items = get_docs()
            items_futures = []
            with tags("converters"):
                for item in items:
                    items_futures.append(biorxiv.convert_xml_to_json.submit(item))
                item_results = [i_futures.result() for i_futures in items_futures]
    On the line of code `MyContextModel.get().data_fs` I get this error:
    AttributeError: 'NoneType' object has no attribute 'data_fs'
    `get()` returns None. Is there something I need to configure for the context to be defined in the Dask tasks?
    m
    • 2
    • 7
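    A possible workaround sketch (assumption: Dask workers run in separate processes, so a context entered in the flow process is not propagated to them; re-entering it inside the task sidesteps that). `MyContext` below is a stand-in for the user's own `MyContextModel`:

    from contextvars import ContextVar

    from prefect import task
    from prefect.context import ContextModel

    class MyContext(ContextModel):           # stand-in for MyContextModel
        __var__ = ContextVar("my_context")   # ContextModel subclasses need a ContextVar
        block_name: str = ""

    @task
    def use_context(item, block_name):
        # Enter the context inside the task so it exists in the Dask worker process.
        with MyContext(block_name=block_name):
            return (item, MyContext.get().block_name)  # now resolves in this process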
  • p

    Prakash Rai

    08/24/2022, 1:57 AM
    Hi there, I'm having difficulty managing tasks and killing tasks that have already started. Here's some background:
    • I have one `flow` with two `task`s.
    ◦ The first `task` downloads a CSV.
    ◦ The second `task` downloads a PDF for every row in the CSV.
    ▪︎ It takes around 10s to download a PDF, and there are around 500 PDFs to be downloaded.
    ▪︎ Each task is named after the PDF it is downloading (I'm using `with_options` to assign task names on the fly).
    ▪︎ I've added a concurrency limit of 8 on this task.
    Now, when my flow completes, I still see some 6-7 tasks in `Running` state on the UI. However, the corresponding PDFs are downloaded and saved on my disk. I have three questions:
    • Why is this happening? The fact that the PDFs are downloaded means the tasks completed. Is Prefect somehow failing to detect that the job ended?
    • I'm using `prefect concurrency-limit inspect 'pdf-downloader'` to look for the running tasks. I can extract task IDs, but can't find a documented way of killing them. Is there a command that takes a task ID and kills it? If not, what is the preferred way of killing a task?
    • Is there a way to specify a maximum time limit for a task? Thanks in advance 🙂
    ✅ 1
    👀 1
    e
    • 2
    • 9
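    On the time-limit question, a sketch: flows accept timeout_seconds in 2.x; whether `@task` accepts the same argument depends on your version (treat the task-level form as an assumption).

    import time

    from prefect import flow, task

    @task
    def download_pdf(url: str):
        time.sleep(1)  # stand-in for the real download

    @flow(timeout_seconds=600)  # the flow run fails if it exceeds 10 minutes
    def download_all(urls: list):
        for url in urls:
            download_pdf.submit(url)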
  • a

    Ahmed Ezzat

    08/24/2022, 3:59 AM
    I can't find any references in the 2.0 docs mentioning `result_filesystem`, which is responsible for handling task result storage. I'm trying to store task results in an S3 bucket.
    ✅ 1
    c
    • 2
    • 3
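    For reference, a sketch under the assumption that a remote filesystem block is the intended hook here (whether task results are actually persisted to it depends on the 2.x version in use):

    from prefect.filesystems import S3

    s3 = S3(bucket_path="my-bucket/results")   # illustrative bucket path
    s3.save("result-storage", overwrite=True)  # reference the block when configuring storage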
  • m

    Malavika S Menon

    08/24/2022, 5:42 AM
    Is there a way to find how many times an agent executed a specific flow run?
    ✅ 2
    e
    • 2
    • 3
  • j

    José Duarte

    08/24/2022, 7:55 AM
    Is there a particular reason why the Prefect codebase uses the type pattern `value: NonNullableType = None`? I understand the motivation behind it, but in the end the type is wrong and mypy will complain. Is there a setting for mypy I am missing?
    ✅ 1
    j
    • 2
    • 1
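    For comparison, the mypy-clean spelling of that pattern:

    from typing import Optional

    def lookup(value: Optional[int] = None) -> int:
        # Optional[int] makes the None default type-correct.
        return 0 if value is None else value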
  • s

    Stephen Lloyd

    08/24/2022, 8:46 AM
    Is there a way to check usage via an api call?
    j
    • 2
    • 1
  • k

    Karan

    08/24/2022, 10:24 AM
    Would a job scheduled via the Prefect local agent run even when the machine is shut down?
    ✅ 1
    c
    • 2
    • 1
  • t

    Tejal Singh

    08/24/2022, 11:34 AM
    I want to convert the below code written in prefect 1 to prefect 2:
    from prefect import task, Flow, resource_manager
    
    from pyspark import SparkConf
    from pyspark.sql import SparkSession
    
    @resource_manager
    class SparkCluster:
        def __init__(self, conf: SparkConf = SparkConf()):
            self.conf = conf
    
        def setup(self) -> SparkSession:
            return SparkSession.builder.config(conf=self.conf).getOrCreate()
    
        def cleanup(self, spark: SparkSession):
            spark.stop()
    
    @task
    def get_data(spark: SparkSession):
        return spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])
    
    @task(log_stdout=True)
    def analyze(df):
        word_count = df.groupBy('word').count()
        word_count.show()
    
    
    with Flow("spark_flow") as flow:
        conf = SparkConf().setMaster('local[*]')
        with SparkCluster(conf) as spark:
            df = get_data(spark)
            analyze(df)
    
    if __name__ == '__main__':
        flow.run()
    I have converted a few obvious things, but I still don't know how to remove the resource_manager decorator and what context manager to use in its place. Now my code looks like this:
    from prefect import task, flow, resource_manager
    
    from pyspark import SparkConf
    from pyspark.sql import SparkSession
    
    @resource_manager
    class SparkCluster:
        def __init__(self, conf: SparkConf = SparkConf()):
            self.conf = conf
    
        def setup(self) -> SparkSession:
            return SparkSession.builder.config(conf=self.conf).getOrCreate()
    
        def cleanup(self, spark: SparkSession):
            spark.stop()
    
    @task
    def get_data(spark: SparkSession):
        return spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])
    
    @task(log_stdout=True)
    def analyze(df):
        word_count = df.groupBy('word').count()
        word_count.show()
    
    
    @flow(name="spark_flow")
    def my_flow():
        conf = SparkConf().setMaster('local[*]')
        with SparkCluster(conf) as spark:
            df = get_data(spark)
            analyze(df)
    
    if __name__ == '__main__':
        my_flow()
    Can anyone please help me with this? cc @Anna Geller
    ✅ 1
    a
    r
    j
    • 4
    • 4
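    For reference, a sketch of the usual replacement (Prefect 2 drops resource_manager; a plain contextlib.contextmanager covers the same setup/cleanup pairing):

    import contextlib

    from pyspark import SparkConf
    from pyspark.sql import SparkSession

    @contextlib.contextmanager
    def spark_cluster(conf: SparkConf = SparkConf()):
        spark = SparkSession.builder.config(conf=conf).getOrCreate()  # setup
        try:
            yield spark
        finally:
            spark.stop()  # cleanup runs even if the flow body raises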
  • b

    Barys Rutman

    08/24/2022, 11:45 AM
    Hi guys, is it still relevant? Can we connect to MSSQL in Prefect 2.0?
    ✅ 1
    j
    r
    • 3
    • 3
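    Nothing Prefect-specific should be required; a sketch using pyodbc inside a task (assumes the MSSQL ODBC driver is installed; connection details are illustrative):

    import pyodbc
    from prefect import flow, task

    @task
    def query_mssql():
        conn = pyodbc.connect(
            "DRIVER={ODBC Driver 17 for SQL Server};SERVER=myserver;DATABASE=mydb;UID=user;PWD=***"
        )
        return conn.cursor().execute("SELECT @@VERSION").fetchone()[0]

    @flow
    def mssql_flow():
        return query_mssql()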
  • v

    Vishy ganesh

    08/24/2022, 1:15 PM
    I'm just trying to understand the Prefect 2.0 architecture from a scaling perspective and picture it in my mind. I understand three of the components needed to self-host are: • the Orion server • the UI • the API server. All three components are part of Orion. So to scale, do we run multiple replicas, starting with API-server HA (`--no-scheduler --no-ui`) and UI-server HA (`--no-scheduler`)? Or am I thinking through this in the wrong fashion?
    c
    • 2
    • 4
  • d

    Daniel Sääf

    08/24/2022, 1:19 PM
    Hi! I am running into an issue where the agent is running out of disk space. It looks like the problem is that PREFECT_LOCAL_STORAGE_PATH fills up with persisted Results when running the flows. Some of my tasks return DataFrames holding a significant amount of data, which I assume creates the issue. So I am asking for advice on how to proceed. I have thought along the following lines:
    • Is there any way to limit (by stored bytes, or in time) how long the Results persist?
    • Otherwise, I guess the other option is to rewrite the flow so the full DataFrame is not returned (for example, dump the DataFrame to a temporary file that can be read by the next task).
    • I guess another option might be to create a clean-up task at the end of the flow that deletes the stored results; this seems to me to be the ugliest solution.
    What would you recommend? Or is there anything I am missing?
    j
    a
    j
    • 4
    • 6
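    A sketch of the second idea above (hand downstream tasks a file path instead of the DataFrame, so only the small path string is persisted as the task result):

    import os
    import tempfile

    import pandas as pd
    from prefect import flow, task

    @task
    def extract() -> str:
        df = pd.DataFrame({"x": range(1_000_000)})  # stand-in for the real data
        path = os.path.join(tempfile.mkdtemp(), "frame.parquet")
        df.to_parquet(path)  # needs pyarrow or fastparquet
        return path  # only this string becomes the persisted result

    @task
    def transform(path: str) -> int:
        return len(pd.read_parquet(path))

    @flow
    def etl():
        return transform(extract())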
  • t

    Tim-Oliver

    08/24/2022, 1:53 PM
    I get the following error for `prefect block ls`:
    ✅ 1
    j
    • 2
    • 2
  • j

    Jonathan Pou

    08/24/2022, 2:00 PM
    Hello! Using Prefect 1 on a local VM, I noticed that our .prefect/results folder is quite large. Would it cause any issues if I cleaned out the folder while no flows are running?
    ✅ 1
    b
    • 2
    • 2
  • c

    Chris L.

    08/24/2022, 2:06 PM
    UPDATE (RESOLVED): Turns out it's an authentication 401 error, not a 404 error as stated by `PrefectHTTPStatusError`. I found out that a team member recently changed a few of our k8s secrets and replaced our k8s secret PREFECT_API_KEY with an expired key. Changing the API key to an active key solved the issue. Going to open a GitHub issue, as I believe the 404 error sent me on a wild goose chase...
    Hello there, I deployed a Prefect Orion agent to Kubernetes connected to Prefect Cloud. Flow runs were working yesterday, but today the k8s pods (agent container) started failing with this mysterious error. I cannot find anything wrong with the manifest (it's exactly the same as what we had yesterday, except `image` is now `prefecthq/prefect:2.2.0-python3.9` instead of `prefecthq/prefect:2.1.1-python3.9`). Nevertheless, I also tested using image `prefecthq/prefect:2.1.1-python3.9` and got the same error (see full traceback in thread).
    prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url 'https://api.prefect.cloud/api/accounts/REDACTED/workspaces/REDACTED/block_types/'
    Response: {'detail': 'Not Found'}
    For more information check: https://httpstatuses.com/404
    An exception occurred.
    ✅ 1
    👍 1
    :thank-you: 1
    k
    • 2
    • 4
  • k

    Karan

    08/24/2022, 2:17 PM
    Questions!
    1. Prefect local agent vs. Cloud: can we use the local agent to schedule a job series?
    2. I work at a startup on the Data Engineering team; can we officially use the Starter version, since that is what we need right now, or do we have to buy Prefect Growth?
    3. Is Prefect GDPR-compliant? How can we be certain of this requirement? Assume I'm working with sensitive data that isn't saved anywhere.
    4. Given that I am solely responsible for the tool's implementation, do I need to send any official emails confirming Prefect usage or something similar?
    ✅ 1
    j
    • 2
    • 2
  • c

    Claire Herdeman

    08/24/2022, 2:42 PM
    Hey! I've got a Prefect Cloud account related question, who should I ask about that?
    ✅ 1
    b
    • 2
    • 27
  • a

    Aleksander

    08/24/2022, 3:19 PM
    Hi! I am trying to trigger a flow run via an API call. This guide proved very helpful: https://discourse.prefect.io/t/how-to-trigger-a-flow-run-from-a-deployment-via-api-call-using-python-requests-library-or-from-a-terminal-using-curl/1396. I wonder, is it possible to attach data (a list or dict) to the API request and make that data accessible to the triggered flow run?
    ✅ 1
    r
    • 2
    • 6
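    For reference, a sketch of attaching data (assumption based on the linked guide: the create_flow_run route accepts a `parameters` object, which becomes the flow run's parameters):

    import httpx

    API_URL = "https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>"
    DEPLOYMENT_ID = "<DEPLOYMENT_ID>"  # placeholders throughout

    response = httpx.post(
        f"{API_URL}/deployments/{DEPLOYMENT_ID}/create_flow_run",
        headers={"Authorization": "Bearer <API_KEY>"},
        json={"parameters": {"records": [1, 2, 3]}},  # visible to the flow as its parameters
    )
    print(response.json()["id"])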
  • l

    Lucas Brum

    08/24/2022, 3:41 PM
    Hello, I need some help. Are there any simpler alternatives to Selenium that I can use in Prefect? From what I'm seeing, Selenium is going to take a lot of work to implement.
    ✅ 1
    k
    • 2
    • 5
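    If the target pages don't need a real browser (no JavaScript rendering), a lighter sketch with requests + BeautifulSoup inside a task (URL is illustrative):

    import requests
    from bs4 import BeautifulSoup
    from prefect import flow, task

    @task
    def scrape_links(url: str) -> list:
        html = requests.get(url, timeout=30).text
        soup = BeautifulSoup(html, "html.parser")
        return [a.get("href") for a in soup.find_all("a")]

    @flow
    def scrape_flow():
        return scrape_links("https://example.com")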
  • s

    Sam Garvis

    08/24/2022, 4:46 PM
    Is there a replacement for `upstream_tasks=[result]` in 2.0? I used this in 1.0 with the Dask executor to force a task to wait on another task, but I believe it was removed in 2.0.
    ✅ 1
    n
    • 2
    • 2

Neil Natarajan

08/24/2022, 4:49 PM
it's now `wait_for`
✅ 1
subflows are blocking
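For reference, a minimal sketch of `wait_for` in 2.x (task names are illustrative):

from prefect import flow, task

@task
def first():
    return 1

@task
def second():
    return 2

@flow
def ordered():
    a = first.submit()
    b = second.submit(wait_for=[a])  # second starts only after first finishes
    return b.result()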