• Edison A

    1 year ago
    I hope I'm posting in the right place. I need help with calling a class method as a task. Check 👇 the thread for the code and stack trace.
    14 replies
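    A minimal sketch of one way to call a class method as a task (the class and names here are illustrative, not from the thread): wrap the bound method with task(), since decorating the method inside the class body would leave self unbound when Prefect calls it.
    from prefect import task, Flow

    class Scraper:
        def __init__(self, url):
            self.url = url

        def fetch(self):
            # an ordinary instance method; uses self as usual
            return f"fetched {self.url}"

    scraper = Scraper("https://example.com")
    fetch_page = task(scraper.fetch)  # wrap the *bound* method as a task

    with Flow("class-method-demo") as flow:
        result = fetch_page()
    Note that the instance is captured in the task, so it must be picklable if the flow is registered with storage.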
  • Arthur Duarte

    1 year ago
    Hi, my first message here 🙂. I am new to Prefect, and I was able to run my flow from my local PC (Windows, Python 3.8). Now I am trying to build it into Docker Storage, and I am getting this error during the health check:
        raise NotImplementedError("cannot instantiate %r on your system")
    NotImplementedError: cannot instantiate 'WindowsPath' on your system
    Any ideas?
    8 replies
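    A common cause of this error (an assumption, not confirmed in the thread): the flow is pickled on Windows and carries pathlib.WindowsPath objects, which pathlib refuses to instantiate when the Linux container unpickles the flow during the health check. Keeping any paths the flow references as plain strings or PurePosixPath sidesteps the mismatch:
    from pathlib import PurePosixPath

    # safe to unpickle on Linux; a WindowsPath here would raise
    # NotImplementedError inside the container
    data_dir = PurePosixPath("/data/input")
    # ...or simply keep it a plain string
    data_dir = "/data/input"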
  • Edison A

    1 year ago
    Pickle error when trying to register a flow under a project name:
    flow.register('project_name')
      File "venv/lib/python3.8/site-packages/prefect/core/flow.py", line 1623, in register
        registered_flow = client.register(
      File "venv/lib/python3.8/site-packages/prefect/client/client.py", line 734, in register
        serialized_flow = flow.serialize(build=build)  # type: Any
      File "venv/lib/python3.8/site-packages/prefect/core/flow.py", line 1450, in serialize
        self.storage.add_flow(self)
      File "venv/lib/python3.8/site-packages/prefect/environments/storage/local.py", line 144, in add_flow
        flow_location = flow.save(flow_location)
      File "venv/lib/python3.8/site-packages/prefect/core/flow.py", line 1519, in save
        cloudpickle.dump(self, f)
      File "venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 55, in dump
        CloudPickler(
      File "venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
        return Pickler.dump(self, obj)
    TypeError: cannot pickle 'weakref' object
    2 replies
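    For context, "cannot pickle 'weakref' object" usually means the flow closes over an unpicklable object such as a client, lock, or database connection created at module scope. A sketch of the usual fix (the sqlite table is illustrative): create such objects inside the task so they never enter the pickle.
    from prefect import task, Flow

    @task
    def write_results(rows):
        import sqlite3
        conn = sqlite3.connect("results.db")  # created at run time, never pickled
        conn.execute("CREATE TABLE IF NOT EXISTS trades (v TEXT)")
        with conn:
            conn.executemany("INSERT INTO trades VALUES (?)", rows)

    with Flow("picklable-flow") as flow:
        write_results([("a",), ("b",)])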
  • Hui Zheng

    1 year ago
    Hello, I am using a GKE cluster as the Prefect agent cluster to run flows. Is it possible to expose GKE Pod information to the flow inside the containers, maybe using environment variables like this? https://kubernetes.io/docs/tasks/inject-data-application/environment-variable-expose-pod-information/
    2 replies
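    If the agent's Kubernetes job template maps pod fields to environment variables via the downward API (as in the linked doc), the values appear as ordinary environment variables inside the flow's container. A sketch, with hypothetical variable names that the job spec would define:
    import os
    from prefect import task

    @task
    def log_pod_info():
        # MY_POD_NAME / MY_POD_NAMESPACE are hypothetical names the job
        # template would map from metadata.name / metadata.namespace
        pod_name = os.environ.get("MY_POD_NAME")
        pod_namespace = os.environ.get("MY_POD_NAMESPACE")
        print(f"running in pod {pod_name} in namespace {pod_namespace}")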
  • Pedro Machado

    1 year ago
    Hi there. Is there a way to get notified by Prefect Cloud if an agent goes offline?
    1 reply
  • ale

    1 year ago
    Hey folks, are you going to record the events scheduled for today? I’m based in Italy and it would be a little late for me to attend, but I would definitely love to watch the recording 🙂
    1 reply
  • Sque

    1 year ago
    Hey folks, I have just started diving into Prefect and I am trying to migrate some code from Luigi. One of the problems I am facing is that we used Luigi's dynamic dependency system, so a task could yield other tasks as dependents. I was looking for similar functionality, and the closest I found is LOOP, but the problem is that it permits only one direct dependency while I need to create multiple. I also tried to represent the problem at the flow level, but unfortunately that does not work in my case: the task dependencies depend on the values of parameters, so they are not known until the flow is executed. Any ideas on the Prefect way to tackle this?
    21 replies
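    For reference, the closest Prefect analogue to Luigi's dynamic yield is task mapping: map() fans a task out over a list whose length is only known at runtime, so parameter-driven fan-outs work. A sketch with illustrative task names:
    from prefect import task, Flow, Parameter

    @task
    def plan_work(n):
        # the set of downstream work is computed from a parameter at run time
        return list(range(n))

    @task
    def process(item):
        return item * 2

    with Flow("dynamic-fanout") as flow:
        n = Parameter("n")
        items = plan_work(n)
        results = process.map(items)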
  • Joël Luijmes

    1 year ago
    Hey, I’m evaluating both Airflow and Prefect in order to monitor and build data pipelines. As a proof of concept I want to sync from PostgreSQL -> BigQuery. With Airflow this was pretty straightforward (see attached snippet), but I’m struggling to implement it with Prefect; any suggestions? Specifically: how can I change a query based on the result of a previous task? Additional question: can I export all the results to JSONL format (or something similar) directly from the Postgres task, or should I do it manually in the code? This obviously fails:
    from prefect import task
    from prefect import Flow
    from prefect.tasks.postgres.postgres import PostgresExecute, PostgresFetch
    
    
    with Flow("PostgreSQL -> BigQuery Sync") as flow:
        max_product_id = PostgresFetch(
            'manager', 
            'postgres',
            'localhost',
            15432,
            # query='SELECT MAX(id) from products'
            query='SELECT 188000'
        )
    
        query_products = PostgresFetch(
            'manager',
            'postgres',
            'localhost',
            15432,
            fetch='all',
            # query=f'SELECT * FROM products WHERE id > {maximum_id} LIMIT 10'
        )
    
        products = query_products(query=f'SELECT * FROM products WHERE id > {max_product_id} LIMIT 10')
    
    
    
    state = flow.run()
    print(state.result[max_product_id].result)
    print(state.result[query_products].result)
    print(state.result[products].result)
    8 replies
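    One way to derive the second query from the first task's result (a sketch; the fetch='one' row shape is an assumption): build the query string inside its own task, so it is computed at runtime from the upstream value instead of at flow-definition time, where the f-string interpolates a Task object rather than a result.
    from prefect import task, Flow
    from prefect.tasks.postgres.postgres import PostgresFetch

    fetch_max_id = PostgresFetch('manager', 'postgres', 'localhost', 15432)
    fetch_products = PostgresFetch('manager', 'postgres', 'localhost', 15432, fetch='all')

    @task
    def build_query(max_id_row):
        max_id = max_id_row[0]  # fetch='one' returns a single row
        return f'SELECT * FROM products WHERE id > {max_id} LIMIT 10'

    with Flow("PostgreSQL -> BigQuery Sync") as flow:
        max_product_id = fetch_max_id(query='SELECT MAX(id) FROM products')
        products = fetch_products(query=build_query(max_product_id))
    As for JSONL, the Postgres tasks return plain Python rows, so writing the file in a downstream task is a reasonable approach.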
  • Edison A

    1 year ago
    What is the best way of dealing with loops inside a flow? I wrote a program which has to loop through a list of file names, scrape the XML for each, process it, and write the results to a database. This does not work (all functions called are tasks):
    with Flow("epex_scraping", schedule=schedule) as flow:
        """Main definition of all Data pipeline steps"""
        report_names = scrape_for_file_names()
        for report_name in report_names:
            # extract
            report_xml = get_xml_files(report_name)
            report_json = get_xml_jsons(report_xml)
            # transform
            public_trades_collection = generate_public_trades(report_json)
            # load
            write_to_public_trades_db(public_trades_collection)
    
    flow.register('project_x')
    flow.run()
    6 replies
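    For reference, the usual Prefect way to express this loop is map(), which fans each step out over the list of report names at runtime (a sketch reusing the task names above):
    with Flow("epex_scraping", schedule=schedule) as flow:
        """Main definition of all data pipeline steps"""
        report_names = scrape_for_file_names()
        # extract: one mapped run per report name
        report_xmls = get_xml_files.map(report_names)
        report_jsons = get_xml_jsons.map(report_xmls)
        # transform
        public_trades = generate_public_trades.map(report_jsons)
        # load
        write_to_public_trades_db.map(public_trades)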
  • Joseph Haaga

    1 year ago
    I have a list of newspaper.Article objects (from newspaper3k) that I would like to analyze with a spaCy model. Is unmapped an appropriate way to pass the spacy model to the task without initializing it each time? e.g.
    from typing import List, Set

    import spacy
    from newspaper import Article
    from prefect import Flow, task, unmapped

    @task
    def get_articles() -> List[Article]:
        ...
        return articles

    @task
    def load_spacy():
        return spacy.load("en_core_web_md")  # this is a slow operation

    @task
    def extract_organizations(article: Article, nlp) -> Set:
        return nlp(article.text).ents

    with Flow("Extract Orgs from News Articles") as flow:
        articles = get_articles()
        nlp = load_spacy()
        extract_organizations.map(articles, nlp=unmapped(nlp))
    1 reply