prefect-community

    Edison A

    10/29/2020, 10:20 PM
    Pickle error when trying to register a flow to a project:
    flow.register('project_name')
      File "venv/lib/python3.8/site-packages/prefect/core/flow.py", line 1623, in register
        registered_flow = client.register(
      File "venv/lib/python3.8/site-packages/prefect/client/client.py", line 734, in register
        serialized_flow = flow.serialize(build=build)  # type: Any
      File "venv/lib/python3.8/site-packages/prefect/core/flow.py", line 1450, in serialize
        self.storage.add_flow(self)
      File "venv/lib/python3.8/site-packages/prefect/environments/storage/local.py", line 144, in add_flow
        flow_location = flow.save(flow_location)
      File "venv/lib/python3.8/site-packages/prefect/core/flow.py", line 1519, in save
        cloudpickle.dump(self, f)
      File "venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 55, in dump
        CloudPickler(
      File "venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
        return Pickler.dump(self, obj)
    TypeError: cannot pickle 'weakref' object
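The traceback shows cloudpickle failing on a `weakref`, which the pickle protocol cannot serialize. That likely means something reachable from the flow (a client, connection, or similar object created outside a task and captured by it) holds one. The stdlib sketch below reproduces the error and shows one general workaround: leave the weakref out in `__getstate__` and rebuild it in `__setstate__`. `Resource`, `NaiveHolder` and `ResourceHolder` are hypothetical stand-ins, not Prefect classes:

```python
import pickle
import weakref


class Resource:
    """Stand-in for a client, connection, or other object a task captures."""


class NaiveHolder:
    def __init__(self, resource):
        # A weak reference cannot be pickled, so neither can this object.
        self.ref = weakref.ref(resource)


class ResourceHolder:
    def __init__(self, resource):
        self.resource = resource
        self.ref = weakref.ref(resource)

    def __getstate__(self):
        # Pickle everything except the weakref.
        state = self.__dict__.copy()
        del state["ref"]
        return state

    def __setstate__(self, state):
        # Restore the picklable attributes, then rebuild the weakref.
        self.__dict__.update(state)
        self.ref = weakref.ref(self.resource)


def can_pickle(obj):
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


res = Resource()
print(can_pickle(NaiveHolder(res)))     # False
print(can_pickle(ResourceHolder(res)))  # True
```

For a flow, the simpler fix is often to create such objects inside the task body instead of at module level, so they never need to be pickled.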

    Hui Zheng

    10/29/2020, 11:50 PM
    Hello, I am using a GKE cluster as the Prefect agent cluster to run flows. Is it possible to expose GKE pod information to the flow running in the container? Maybe using environment variables like this? https://kubernetes.io/docs/tasks/inject-data-application/environment-variable-expose-pod-information/
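The linked Kubernetes page injects pod fields into a container as environment variables (`fieldRef` entries in the pod spec). Assuming the job spec used for flow-run pods declares such variables, a task can read them like any other environment variable. The names below (`MY_POD_NAME`, `MY_POD_NAMESPACE`, `MY_NODE_NAME`, `MY_POD_IP`) follow the example on that page; they are only what the spec chooses to call them, so treat them as assumptions:

```python
import os

# Names are whatever the pod spec declares with `fieldRef`; these follow the
# Kubernetes "expose pod information" example and are assumptions.
POD_FIELDS = {
    "pod_name": "MY_POD_NAME",
    "namespace": "MY_POD_NAMESPACE",
    "node_name": "MY_NODE_NAME",
    "pod_ip": "MY_POD_IP",
}


def pod_info(env=None):
    """Collect pod metadata injected as environment variables.

    Variables that are not set (e.g. when running outside Kubernetes) map to None.
    """
    env = os.environ if env is None else env
    return {key: env.get(var) for key, var in POD_FIELDS.items()}


# Inside a task body, e.g.: prefect.context.get("logger").info(pod_info())
print(pod_info({"MY_POD_NAME": "flow-run-abc", "MY_POD_NAMESPACE": "default"}))
```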

    Pedro Machado

    10/30/2020, 2:36 AM
    Hi there. Is there a way to get notified by Prefect cloud if an agent goes offline?

    ale

    10/30/2020, 8:41 AM
    Hey folks, are you going to record the events scheduled for today? I’m based in Italy and it would be a little late for me to attend, but I would definitely love to watch the recording 🙂

    Sque

    10/30/2020, 11:04 AM
    Hey folks, I have just started diving into Prefect and I am trying to migrate some code from Luigi. One of the problems I am facing is that we used Luigi's dynamic dependency system, where a task can `yield` other tasks as dependencies. I was looking for similar functionality, and the closest I found is `LOOP`, but it permits only one direct dependency, while I need to create several. I also tried to represent the problem at the flow level, but unfortunately that does not work in my case: the task dependencies depend on parameter values, so they are not known until the flow is executed. Any ideas on the Prefect way to tackle this?
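A common way to express work that is only known at run time in Prefect 0.x is mapping: an upstream task returns a list, and `some_task.map(that_list)` creates one child task run per element, with downstream tasks able to depend on all of them. The stdlib sketch below mirrors that shape with `concurrent.futures` so it runs without Prefect; `discover_partitions`, `process_partition` and `summarize` are hypothetical stand-ins:

```python
from concurrent.futures import ThreadPoolExecutor


def discover_partitions(n_days):
    # The set of work items depends on a parameter, so it is only known at run time.
    return [f"day-{i}" for i in range(n_days)]


def process_partition(name):
    # Stand-in for the per-item work a mapped task would do.
    return f"processed {name}"


def summarize(results):
    # Downstream step that depends on every per-item result.
    return len(results)


def run(n_days):
    partitions = discover_partitions(n_days)
    with ThreadPoolExecutor(max_workers=4) as pool:
        # One "child" per partition, like task.map(partitions) would create.
        results = list(pool.map(process_partition, partitions))
    return summarize(results), results


count, results = run(3)
print(count)  # 3
```

In Prefect terms, `discover_partitions` would take a `Parameter` as input, `process_partition.map(partitions)` would replace `pool.map`, and `summarize` would receive the full list of mapped results.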

    Joël Luijmes

    10/30/2020, 2:55 PM
    Hey, I’m evaluating both Airflow and Prefect for building and monitoring data pipelines. As a proof of concept I want to sync PostgreSQL -> BigQuery. With Airflow this was pretty straightforward (see attached snippet), but I’m struggling to implement it with Prefect. Any suggestions? Specifically: how can I change the query based on the result of a previous task? Additional question: can I export all the results to JSONL format (or something similar) directly from the Postgres task, or should I do it manually in the code? This obviously fails:
    from prefect import task
    from prefect import Flow
    from prefect.tasks.postgres.postgres import PostgresExecute, PostgresFetch
    
    
    with Flow("PostgreSQL -> BigQuery Sync") as flow:
        max_product_id = PostgresFetch(
            'manager', 
            'postgres',
            'localhost',
            15432,
            # query='SELECT MAX(id) from products'
            query='SELECT 188000'
        )
    
        query_products = PostgresFetch(
            'manager',
            'postgres',
            'localhost',
            15432,
            fetch='all',
            # query=f'SELECT * FROM products WHERE id > {maximum_id} LIMIT 10'
        )
    
        products = query_products(query=f'SELECT * FROM products WHERE id > {max_product_id} LIMIT 10')
    
    
    
    state = flow.run()
    print(state.result[max_product_id].result)
    print(state.result[query_products].result)
    print(state.result[products].result)
    Attachment: psql-to-bq.py
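Inside `with Flow(...)`, the f-string on the `products` line is evaluated while the flow is being built, so it formats the `max_product_id` task object into the SQL instead of the fetched number. One way around it (a sketch, not tested against `PostgresFetch`) is to build the query in its own small task so it runs after the fetch has a result, and to write JSON Lines manually. The helpers below are plain Python and hypothetical; `PostgresFetch` with its default `fetch` is assumed to return a single row tuple such as `(188000,)`, and column names are assumed to be supplied separately because the rows are bare tuples:

```python
import json


def build_products_query(max_id_row, limit=10):
    """Build the follow-up query from the previous task's result.

    `max_id_row` is assumed to be one fetched row such as (188000,).
    Casting to int keeps arbitrary text out of the SQL string.
    """
    max_id = int(max_id_row[0])
    return f"SELECT * FROM products WHERE id > {max_id} LIMIT {int(limit)}"


def rows_to_jsonl(columns, rows):
    """Serialize fetched rows as JSON Lines, one object per row.

    default=str turns values json cannot encode (dates, Decimals) into strings.
    """
    return "\n".join(
        json.dumps(dict(zip(columns, row)), default=str) for row in rows
    )


# Prefect 0.x wiring (not executed here), with build_products_query wrapped in @task:
#     max_id = max_product_id()
#     products = query_products(query=build_products_query(max_id))
print(build_products_query((188000,)))
# SELECT * FROM products WHERE id > 188000 LIMIT 10
```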

    Edison A

    10/30/2020, 5:34 PM
    What is the best way of dealing with loops inside a flow? I wrote a program which has to loop through a list of file names, scrape their xml, process each, write the results to a database. This does not work. (All functions called are tasks)
    with Flow("epex_scraping", schedule=schedule) as flow:
        """Main definition of all Data pipeline steps"""
        report_names = scrape_for_file_names()
        for report_name in report_names:
            # extract
            report_xml = get_xml_files(report_name)
            report_json = get_xml_jsons(report_xml)
            # transform
            public_trades_collection = generate_public_trades(report_json)
            # load
            write_to_public_trades_db(public_trades_collection)
    
    flow.register('project_x')
    flow.run()
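Inside `with Flow(...)` the code only builds the task graph: `report_names` is a placeholder for a future result, not the list of file names, so a Python `for` loop over it cannot create one pipeline per report. The Prefect 0.x approach is mapping, where each step becomes `step.map(previous)`. The sketch below mirrors that stage-by-stage structure in plain Python so it runs on its own; the function bodies are hypothetical stand-ins for the tasks in the question, and the Prefect wiring appears only in comments:

```python
# Plain-Python mirror of a mapped Prefect 0.x pipeline; the bodies are
# hypothetical stand-ins for the tasks in the question.

def scrape_for_file_names():
    return ["report_a.xml", "report_b.xml"]


def get_xml_files(report_name):
    return f"<xml name='{report_name}'/>"


def get_xml_jsons(report_xml):
    return {"source": report_xml}


def generate_public_trades(report_json):
    return [report_json["source"]]


def write_to_public_trades_db(trades):
    return len(trades)  # pretend this is the number of rows written


def run_pipeline():
    report_names = scrape_for_file_names()
    # Prefect 0.x: report_xml = get_xml_files.map(report_names)
    report_xml = [get_xml_files(n) for n in report_names]
    # Prefect 0.x: report_json = get_xml_jsons.map(report_xml)
    report_json = [get_xml_jsons(x) for x in report_xml]
    # Prefect 0.x: trades = generate_public_trades.map(report_json)
    trades = [generate_public_trades(j) for j in report_json]
    # Prefect 0.x: write_to_public_trades_db.map(trades)
    return [write_to_public_trades_db(t) for t in trades]


print(run_pipeline())  # [1, 1]
```

With `.map`, element i of one mapped stage feeds element i of the next, so each report flows through its own extract/transform/load chain.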

    Joseph Haaga

    10/30/2020, 6:43 PM
    I have a list of `newspaper.Article` objects (from `newspaper3k`) that I would like to analyze w/ a spaCy model. Is `unmapped` an appropriate way to pass in the `spacy` model to the task without initializing it each time? e.g.
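    from typing import List, Set
    
    import spacy
    from newspaper import Article
    from prefect import Flow, task, unmapped
    
    @task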
    def get_articles() -> List[Article]:
       ...
       return articles
    
    @task
    def load_spacy():
       return spacy.load("en_core_web_md")  # this is a slow operation
    
    @task
    def extract_organizations(article: Article, nlp) -> Set:
       return nlp(article.text).ents
    
    with Flow("Extract Orgs from News Articles"):
       articles = get_articles()
       nlp = load_spacy()
       extract_organizations.map(articles, nlp=unmapped(nlp))
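In Prefect 0.x, `unmapped(nlp)` tells `map` to pass the same upstream result to every child instead of iterating over it, and `load_spacy` is a single task, so the model is loaded once per flow run (with a distributed executor the loaded model may still have to be shipped to other workers). The plain-Python analogy is binding a shared argument with `functools.partial`. In the sketch, `load_model` and `extract` are hypothetical stand-ins for the spaCy calls, and the counter only checks that the expensive loader runs once:

```python
from functools import partial

LOAD_CALLS = {"count": 0}


def load_model():
    # Stand-in for spacy.load("en_core_web_md"), the slow step.
    LOAD_CALLS["count"] += 1
    # Toy "model": treats capitalized words as entities.
    return lambda text: {w for w in text.split() if w.istitle()}


def extract(article_text, nlp):
    # Stand-in for extract_organizations: uses the shared model.
    return nlp(article_text)


articles = ["Acme hired Bob", "Globex sued Acme"]
nlp = load_model()  # loaded once, like the single load_spacy task
# partial(extract, nlp=nlp) plays the role of unmapped(nlp): fixed across calls.
entities = list(map(partial(extract, nlp=nlp), articles))
print(LOAD_CALLS["count"])  # 1
```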

    Chris White

    10/30/2020, 7:44 PM
    🎃🎃 Hey everyone!! If you’re available in 15 minutes, please join us and @Laura Lorenz as she demonstrates that migrating spooky legacy pipelines into Prefect doesn’t have to be scary; costumes are encouraged!!

    https://www.youtube.com/watch?v=kH3hPVwFfiA&feature=youtu.be

    👻 👻

    Brian Mesick

    10/30/2020, 8:05 PM
    Hi all, another random question. In upgrading my flow from Prefect 0.12.6 to 0.13.3, the container no longer has git installed, causing pip installs from git to fail. We use a shared library that gets pip installed from git, so this makes us sad. I’m not seeing a way to inject a build step to install git, or instructions on how to add one, so I’m a bit stuck at the moment. Has anyone else run into this?

    Joseph Haaga

    10/30/2020, 9:11 PM
    Getting this error when I try to use Docker storage:
    ValueError: Your docker image failed to build!  Your flow might have failed one of its deployment health checks - please ensure that all necessary files and dependencies have been included.
    Does `python_dependencies` take care of the `pip install` for me? Or do I have to provide a custom Dockerfile that does the `COPY requirements.txt` and `RUN pip install -r requirements.txt`?
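In Prefect 0.x, `Docker` storage generates a Dockerfile that pip-installs the packages listed in `python_dependencies`, so a custom Dockerfile is not needed just for that. If the list already lives in `requirements.txt`, one option is to read it into that parameter. A minimal sketch; `parse_requirements` and `read_requirements` are hypothetical helpers, and they skip pip option lines such as `-r` or `-e`, which would need separate handling:

```python
import re
from pathlib import Path

# pip treats "#" as a comment only at line start or after whitespace,
# so URLs such as "git+https://...#egg=name" are left intact.
COMMENT_RE = re.compile(r"(^|\s+)#.*$")


def parse_requirements(text):
    deps = []
    for raw in text.splitlines():
        line = COMMENT_RE.sub("", raw).strip()
        # Skip blanks and pip options (-r, -e, --index-url, ...),
        # which this sketch does not handle.
        if line and not line.startswith("-"):
            deps.append(line)
    return deps


def read_requirements(path="requirements.txt"):
    return parse_requirements(Path(path).read_text())


# Usage (Prefect 0.x, not executed here):
#     storage = Docker(python_dependencies=read_requirements())
```

Note that `git+https://...` entries also require `git` to be present in the image.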

    Jasono

    10/30/2020, 10:10 PM
    Is there an easy way to make two flows run one after another? (flow A becomes the dependency of flow B)

    Newskooler

    10/30/2020, 10:44 PM
    👋 Hi, I am sure that’s possible, but I just don’t know how: how can I schedule a flow to run once at a specific time via the UI? 🤔

    Cristian Bob

    10/30/2020, 11:51 PM
    Hello! I am new to Prefect and for the last few days I've been testing its features. I feel enthusiastic about it and I really think it can help us define robust, easy-to-understand workflows. I have one question regarding logging when running a flow with a `DaskExecutor`. In one of my tests I am running a flow locally using the `DaskExecutor`. The associated dask-scheduler and dask-worker are started on my local computer. The problem is that the tasks' log messages are printed in the dask-worker's terminal instead of the terminal where I started the flow. Is there a way to get them printed in the latter terminal? I looked in the `DaskExecutor` documentation for an argument like `return_stdout` but couldn't find anything relevant. Can you help me, please?

    bral

    10/31/2020, 4:25 AM
    Hi folks! According to the documentation there is a task concurrency limit. Does it only work on Cloud? Can I create a tag using the Server backend and be sure the concurrency limit is enforced?
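Conceptually, a tag-based concurrency limit caps how many task runs carrying a given tag may execute at once. The sketch below illustrates only that idea, in-process with `threading.BoundedSemaphore`; it is not Prefect's implementation and says nothing about whether the Server backend enforces limits. `TagLimiter` and the `"database"` tag are hypothetical:

```python
import threading
import time
from contextlib import contextmanager


class TagLimiter:
    """Cap concurrent runs per tag (in-process illustration only)."""

    def __init__(self, limits):
        self._sems = {tag: threading.BoundedSemaphore(n) for tag, n in limits.items()}

    @contextmanager
    def slot(self, tags):
        # Acquire in sorted order so tasks with overlapping tags cannot deadlock.
        acquired = []
        try:
            for tag in sorted(t for t in tags if t in self._sems):
                self._sems[tag].acquire()
                acquired.append(tag)
            yield
        finally:
            for tag in reversed(acquired):
                self._sems[tag].release()


limiter = TagLimiter({"database": 2})
active = {"now": 0, "peak": 0}
lock = threading.Lock()


def fake_task():
    with limiter.slot({"database"}):
        with lock:
            active["now"] += 1
            active["peak"] = max(active["peak"], active["now"])
        time.sleep(0.05)  # simulated work
        with lock:
            active["now"] -= 1


threads = [threading.Thread(target=fake_task) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(active["peak"])  # never more than 2
```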

    Joël Luijmes

    10/31/2020, 4:30 PM
    Hey, I want to persist my results on GCS for a specific task. This works when I execute the task from an agent, but fails when I run the flow locally: it just doesn’t write to the bucket, and doesn’t log any attempt to access GCS.
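A likely cause (an assumption, since the thread doesn't show the config): in Prefect 0.x, results are written only when checkpointing is enabled, and agents enable it for the runs they start, while a plain `flow.run()` leaves it off by default. Prefect 0.x reads `PREFECT__<SECTION>__<KEY>` environment variables into its config when `prefect` is first imported, so a minimal sketch sets the override before that import:

```python
import os

# Corresponds to config.flows.checkpointing = true in Prefect 0.x.
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"

# Import Prefect only after the override is in place (not executed in this sketch):
#     import prefect
#     from prefect import Flow
#     ...
#     flow.run()
print(os.environ["PREFECT__FLOWS__CHECKPOINTING"])  # true
```

The same override can be set in the shell with `export PREFECT__FLOWS__CHECKPOINTING=true` before running the script.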

    Radek Tomsej

    10/31/2020, 7:47 PM
    Hi everyone, I am new to Prefect, so maybe the answer is trivial, but I was not able to find it anywhere. Let's say I have a couple of loading flows that ingest data into a DWH. After all the loading flows end, I will execute another flow: transformation. Inside this flow I have a couple of tasks, but before each of these tasks is executed I need to check that all of its dependencies (a certain list of tasks in the loading flows, not whole flows) passed. I was thinking about using GraphQL requests for this, but you may have a better idea. Is there any way to pass results/states between tasks that are in different flows?

    Avi A

    11/01/2020, 2:02 PM
    Hey people, any thoughts on how to deploy “staging” flows? I mean: suppose you have a Prefect flow registered (on either Server or Cloud), and you’re making changes to it. You tested your changes locally, but now you’d like to test them with remote execution without erasing your production flow. What is the best practice here?

    Mariusz Olszewski

    11/01/2020, 3:11 PM
    Hi all, where should I put my custom functions so that they are visible to Prefect?

    Mariusz Olszewski

    11/01/2020, 3:11 PM

    Mariusz Olszewski

    11/01/2020, 3:12 PM
    I tried to use my Functions.py module in a task, with no success.

    Priyan Chandrapala

    11/01/2020, 4:48 PM
    Hi all, I’m trying to create a flow with Prefect that will only be triggered on demand; it should not be triggered on a schedule. I’m new to Prefect and this is my first project. I could not find this in the documentation. If anyone has done this before, please help me out. Thanks in advance.

    Jasono

    11/02/2020, 3:17 AM
    Hi, I’m trying to define tasks imperatively based on a list. The challenge is that the list should be based on a Prefect parameter. Below is the Stack Overflow link to the code I tried, but apparently it doesn’t work because `task_dependency_pairs` is a task, not a list. How do I make it work without breaking the dependency between the Parameter task and the other dynamically generated tasks?

    Jasono

    11/02/2020, 3:17 AM
    https://stackoverflow.com/questions/64637501/prefect-how-to-imperatively-create-tasks-based-on-a-task-list-derived-from-a-pa

    Klemen Strojan

    11/02/2020, 10:04 AM
    Hey all - is there a way to change the email of a user?

    Kilian

    11/02/2020, 12:14 PM
    Hey, been using Prefect for a while and am a big fan. Is it possible to get the error of an upstream task? I have a task that triggers on `any_failed`, and it receives
    prefect.engine.signals.TRIGGERFAIL('Trigger was "all_successful" but some of the upstream tasks failed.')
    I would like to get the actual error message of the upstream task, in this case
    <Failed: "Unexpected error: NameError("name 'asd' is not defined")">
    import prefect
    from prefect import Flow, task
    
    @task(trigger=prefect.triggers.any_failed)
    def any_failed(data):
        logger = prefect.context.get('logger')
        print(data)
        logger.info("Failed! Error message: %s", data)
    
    @task()
    def fail():
        asd
        return None
    
    @task
    def process(data):
        return True
    
    with Flow('test') as f:
        data = fail()
        result = process(data)
        any_failed(result)
        f.set_reference_tasks([result])
    
    state = f.run()
    I would like to get the upstream error message in `any_failed`. I also looked into state_handlers, but I would prefer this solution, since it gives a single point that catches any errors.
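If a state handler turns out to be acceptable after all, one way to keep a single point of collection is to attach the same handler to every task. In Prefect 0.x a task state handler is called as `handler(task, old_state, new_state)` and its return value becomes the state used; a failed state exposes `is_failed()` and carries the original error text in `message`. The sketch relies only on those attributes and is exercised with small stand-in objects so it runs without Prefect; `FAILED_MESSAGES`, `FakeTask` and `FakeState` are hypothetical:

```python
FAILED_MESSAGES = []  # hypothetical sink; could be a log call or an alert instead


def record_failure(task, old_state, new_state):
    """Prefect 0.x-style task state handler (task, old_state, new_state).

    Records the task name and failure message whenever a task enters a failed state.
    """
    if new_state.is_failed():
        FAILED_MESSAGES.append((task.name, new_state.message))
    return new_state  # a state handler must return the state to use


# Hypothetical wiring (not executed): @task(state_handlers=[record_failure])


# Stand-ins so the handler can be exercised without Prefect.
class FakeTask:
    def __init__(self, name):
        self.name = name


class FakeState:
    def __init__(self, failed, message):
        self._failed = failed
        self.message = message

    def is_failed(self):
        return self._failed


nameerror_msg = "Unexpected error: NameError(\"name 'asd' is not defined\")"
record_failure(FakeTask("fail"), None, FakeState(True, nameerror_msg))
record_failure(FakeTask("process"), None, FakeState(False, "ok"))
print(FAILED_MESSAGES)
```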

    Maura Drabik

    11/02/2020, 3:55 PM
    Hello. I'm trying to register my flow to the cloud. It failed the deployment healthcheck with the following:
    Beginning health checks...
    System Version check: OK
    Traceback (most recent call last):
      File "/opt/prefect/healthcheck.py", line 121, in <module>
        flows = cloudpickle_deserialization_check(flow_file_path)
      File "/opt/prefect/healthcheck.py", line 43, in cloudpickle_deserialization_check
        flows.append(cloudpickle.load(f))
    ModuleNotFoundError: No module named 'utils'
    
    Removing intermediate container 1128b2fe7075
    The command '/bin/sh -c python /opt/prefect/healthcheck.py '["/opt/prefect/flows/scm-validation-flow.prefect"]' '(3, 6)'' returned a non-zero code: 1
    Any guidance on how to resolve? thanks in advance.
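The health check unpickles the flow inside the image, and cloudpickle, like the standard `pickle` module, stores functions from importable modules by reference (module name plus attribute name) rather than by value. A task defined in a local `utils.py` therefore only unpickles if `utils` is importable in the image. The stdlib sketch below reproduces the same `ModuleNotFoundError`; `flow_helpers_demo` is a hypothetical in-memory module standing in for `utils`:

```python
import pickle
import sys
import types

# Build a throwaway module in memory; "flow_helpers_demo" is a hypothetical
# stand-in for the local utils.py the flow imports.
mod = types.ModuleType("flow_helpers_demo")
exec("def helper():\n    return 42\n", mod.__dict__)
sys.modules["flow_helpers_demo"] = mod

# While the module is importable, the function pickles by reference:
# the payload holds the names "flow_helpers_demo" and "helper", not the code.
payload = pickle.dumps(mod.helper)
print(b"flow_helpers_demo" in payload)  # True

# Simulate the Docker image, where the module was never copied in.
del sys.modules["flow_helpers_demo"]
try:
    pickle.loads(payload)
    error = None
except ModuleNotFoundError as exc:
    error = str(exc)
print(error)  # No module named 'flow_helpers_demo'
```

For Prefect 0.x Docker storage, that means getting `utils.py` into the image (for example via the storage's `files` mapping plus a `PYTHONPATH` entry in `env_vars`), or packaging the helpers and listing the package in `python_dependencies`.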

    Joël Luijmes

    11/02/2020, 4:39 PM
    Can I use (or create) a task around Python’s async/await, so that I can do multiple I/O operations in parallel within a single task?
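Prefect 0.x tasks are ordinary synchronous functions, but a task body can start its own event loop with `asyncio.run` and await several coroutines concurrently with `asyncio.gather`. A minimal stdlib sketch; `fetch` is a hypothetical coroutine standing in for real async I/O, and wrapping `run_all` in `@task` is the assumed Prefect wiring:

```python
import asyncio
import time


async def fetch(name, delay):
    # Hypothetical I/O-bound call; asyncio.sleep stands in for network latency.
    await asyncio.sleep(delay)
    return f"{name} done"


async def fetch_all(names, delay=0.2):
    # Start every coroutine at once and wait for all of them; order is preserved.
    return await asyncio.gather(*(fetch(n, delay) for n in names))


def run_all(names):
    """Synchronous entry point, e.g. the body of a Prefect task.

    asyncio.run creates a fresh event loop, so this assumes none is
    already running in the calling thread.
    """
    return asyncio.run(fetch_all(names))


start = time.perf_counter()
results = run_all(["a", "b", "c"])
elapsed = time.perf_counter() - start
print(results)  # ['a done', 'b done', 'c done']
print(f"{elapsed:.2f}s")  # roughly 0.2s rather than 0.6s, since the waits overlap
```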

    Joseph Haaga

    11/02/2020, 5:00 PM
    If v2 of my Flow is broken, is there any way to revert to v1? Or should I re-register v1 as v3?

    Avi A

    11/02/2020, 7:21 PM
    Is there anyone here who’s had the chance to combine Prefect with MLFlow and would like to exchange ideas? :prefect-duck: