Thread
#prefect-community
    iñigo

    7 months ago
    Hello, how is it possible that a flow.run() works but a run from the server doesn't? I get the following message:
    Unexpected error: PicklingError('Could not pickle object as excessively deep recursion required.')
    Traceback (most recent call last):
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
        return Pickler.dump(self, obj)
    RecursionError: maximum recursion depth exceeded while pickling an object
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 909, in get_task_run_state
        result = self.result.write(value, **formatting_kwargs)
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/engine/results/local_result.py", line 115, in write
        value = self.serializer.serialize(new.value)
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/engine/serializers.py", line 73, in serialize
        return cloudpickle.dumps(value)
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
        cp.dump(obj)
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 570, in dump
        raise pickle.PicklingError(msg) from e
    _pickle.PicklingError: Could not pickle object as excessively deep recursion required.
    After some testing, I also get a problem while publishing my script with flow.register():
    Traceback (most recent call last):
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
        return Pickler.dump(self, obj)
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 639, in reducer_override
        if sys.version_info[:2] < (3, 7) and _is_parametrized_type_hint(obj):  # noqa  # pragma: no branch
    RecursionError: maximum recursion depth exceeded in comparison
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/home/inigo/project/scripts/es_am_08_caetano_to_s4e/caetano_to_s4e_flow.py", line 65, in <module>
        flow.register(project_name='S4E')
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/core/flow.py", line 1727, in register
        registered_flow = client.register(
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/client/client.py", line 1127, in register
        serialized_flow = flow.serialize(build=build)  # type: Any
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/core/flow.py", line 1489, in serialize
        self.storage.add_flow(self)
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/storage/local.py", line 143, in add_flow
        f.write(flow_to_bytes_pickle(flow))
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/utilities/storage.py", line 177, in flow_to_bytes_pickle
        cloudpickle.dumps(flow, protocol=4), newline=False
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
        cp.dump(obj)
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 570, in dump
        raise pickle.PicklingError(msg) from e
    _pickle.PicklingError: Could not pickle object as excessively deep recursion required.
    Anna Geller

    7 months ago
    Can you move the traceback into the thread to keep the main channel clean? Can you share your flow and how you tried to register it? Something is not serializable in your flow
    iñigo

    7 months ago
    I hope it's better now. How can I share my code with you? Thanks
    Anna Geller

    7 months ago
    Ideally directly here in the thread, just redact anything sensitive. Or send via DM if you don't want to share the code publicly
    iñigo

    7 months ago
    My flow.py is:
    from prefect import Flow, Parameter, unmapped, mapped
    import sys
    import mechanicalsoup
    
    # adding Folder_2 to the system path
    sys.path.insert(0, '/home/inigo/project/scripts/')
    from es_am_08_caetano_to_s4e.caetano_to_s4e_tasks import login, report_url, send_file, transform_table,report_data
    
    browser = mechanicalsoup.StatefulBrowser()
    
    
    with Flow("Get_caetano_to_s4e") as flow:
        # Input Parameters
        url_login = "<redacted>"
        user = "<redacted>"
        passwd = "<redacted>"
    
        # Period fixed to a max of 10 days because of the web page
        period = Parameter('period',default=["2022-02-23", "2022-02-25"])
    
    
        browser = login(browser,user,passwd,url_login)
    
        # browser.open(url=url_login)
    
        # # Select form in the page, we leave it empty cause it is the only one
        # browser.select_form()
    
        # # Set the username and password to log in
        # browser["_username"] = user
        # browser["_password"] = passwd
    
        # # Submit the login button
        # # resp = browser.submit_selected()
        # browser.submit_selected()
    
        # To see the form data and names
        # browser.form.print_summary()
    
        # Caetano Auto Montijo = 'PT_DIN_07' = 73
        # Caetano Auto - Palhais|Barreiro = 'PT_DIN_01' =74
        # Caetano Baviera - Aveiro = 'PT_DIN_06' = 75
        # Caetano Baviera - Portimão = 'PT_DIN_05' = 76
        # Caetano Motors - Almada = 'PT_DIN_03' = 77
        # Caetano Motors - Palhais|Barreiro = 'PT_DIN_04' = 78
        # Caetano Star - Castelo Branco = 'PT_DIN_02' =79
    
        # plantas = Parameter('plantas',default=[('PT_DIN_07', 73),
        #                                         ('PT_DIN_01' , 74),
        #                                         ('PT_DIN_06' , 75),
        #                                         ('PT_DIN_05' , 76),
        #                                         ('PT_DIN_03' , 77),
        #                                         ('PT_DIN_04' , 78),
        #                                         ('PT_DIN_02' ,79)])
        plantas = Parameter('plantas',default=[('PT_DIN_01', 74)])
    
        url_report = report_url.map(plantas)
    
        response = report_data(unmapped(browser),mapped(url_report),unmapped(period))
        
        df = transform_table(mapped(response),mapped(plantas))
        
        send_file.map(mapped(df),mapped(plantas))
    
    flow.register(project_name='S4E')
    # flow.run(parameters={'period':['2022-02-25','2022-02-26']})
    Then my tasks.py is:
    import requests, os, sys, socket, time
    import prefect,json, datetime
    from dateutil.relativedelta import relativedelta
    from prefect.client.secrets import Secret
    from prefect import task
    from requests.structures import CaseInsensitiveDict
    import pandas as pd
    import mechanicalsoup
    from bs4 import BeautifulSoup
    
    @task
    def login(browser,user,passwd,url_login):
        # Create object and go to url_login
        
        browser.open(url=url_login)
    
        # Select form in the page, we leave it empty cause it is the only one
        browser.select_form()
    
        # Set the username and password to log in
        browser["_username"] = user
        browser["_password"] = passwd
    
        # Submit the login button
        # resp = browser.submit_selected()
        browser.submit_selected()
    
        return browser
    
    @task
    def report_url(plantas):
        site = plantas[1]
        url_report = f'https://gegsc.rigorcg.pt/admin/app/board/{site}/report'
        return url_report
    
    def today():
        today = datetime.datetime.today()
        return [today,today]
    
    @task
    def report_data(browser,url_report,period):
        if len(period) == 0:
            period = today()
        browser.open(url=url_report)
    
        browser.select_form()
    
        # Enter all data required for the filter
        browser["period[start_date]"] = period[0]
        browser["period[end_date]"] = period[1]
        browser["fields[activeEnergyGenerated]"] = "Active Energy Generated"
        browser["fields[averageRadiation]"] = "Average Radiation"
        response = browser.submit_selected()
        browser.close()
        return response.text
    
    @task
    def transform_table(text: str, plantas) -> pd.DataFrame:
        ref = plantas[0]
        soup = BeautifulSoup(text,features="html.parser")
        data = []
        table = soup.find('table', attrs={'class':'table table-striped'})
        table_body = table.find('tbody')
    
        rows = table_body.find_all('tr')
        for row in rows:
            cols = row.find_all('td')
            cols = [ele.text.strip() for ele in cols]
            data.append([ele for ele in cols if ele]) # Get rid of empty values
    
    
        df = pd.DataFrame(data)
        df.rename(columns = {0:'date',1: 'energy_tot', 2:'irradiation'}, inplace = True)
    
        df['meter'] = ref
        df['solarimeter'] = ref
        df = df[['date', 'meter', 'energy_tot', 'solarimeter', 'irradiation']]
    
    
        # print(df)
        print("data gathered")
        return df
    
    @task
    def send_file(df,plantas):     
        ref = plantas[0]
        url = f"<https://energysoft.app/datapi/v2/data/post/{ref}>"
        print(url)
    
        payload = df.to_csv(sep=';',index=False)
        # df.to_csv(f'ssss{ref}.csv',sep=';',index=False)
    
        headers = CaseInsensitiveDict()
        headers["Content-Type"] = "text/csv"
        headers["Authorization"] = "Basic cmVkZW5fc28xOnloOW1qMjF4"
    
        resp = requests.post(url, headers=headers, data=payload)
        resp_code = resp.status_code 
        print(resp_code)
        return resp_code
    All of my code runs on the same machine; both the manual flow.run() and the local agent are on the same machine.
    Anna Geller

    7 months ago
    @iñigo thank you so much for sharing the flow! This is helpful. The issue is this object defined at the module level:
    browser = mechanicalsoup.StatefulBrowser()
    This is likely an HTTP client, correct? The issue is that Prefect cannot serialize such objects with cloudpickle. You could switch to script-based storage so that Prefect doesn't serialize your flow: https://docs.prefect.io/orchestration/flow_config/storage.html#pickle-vs-script-based-storage But the easiest and recommended approach is to move this object into the task that needs it and instantiate it within the task at runtime. Specifically, you could move the browser object initialization into the
    report_data
    task and avoid passing this object between tasks. Lastly, instead of manually modifying the Python path:
    sys.path.insert(0, "/home/inigo/project/scripts/")
    You could attach this path to your Prefect agent:
    prefect agent local start -p /home/inigo/project/scripts/
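    For illustration, here is a rough sketch of the refactor (untested, and assuming you pass the login details into the task as plain values):
    import mechanicalsoup
    from prefect import task

    @task
    def report_data(url_report, period, user, passwd, url_login):
        # Create the browser inside the task so it never needs to be pickled
        browser = mechanicalsoup.StatefulBrowser()

        # Log in (same steps as your login task)
        browser.open(url=url_login)
        browser.select_form()
        browser["_username"] = user
        browser["_password"] = passwd
        browser.submit_selected()

        # Fetch the report
        browser.open(url=url_report)
        browser.select_form()
        browser["period[start_date]"] = period[0]
        browser["period[end_date]"] = period[1]
        browser["fields[activeEnergyGenerated]"] = "Active Energy Generated"
        browser["fields[averageRadiation]"] = "Average Radiation"
        response = browser.submit_selected()
        browser.close()
        return response.text
    and the call in your flow would become:
    response = report_data(mapped(url_report), unmapped(period), unmapped(user), unmapped(passwd), unmapped(url_login))
    If you go the script-based storage route instead, something like this should avoid pickling entirely (the path is taken from your traceback):
    from prefect.storage import Local

    flow.storage = Local(
        path="/home/inigo/project/scripts/es_am_08_caetano_to_s4e/caetano_to_s4e_flow.py",
        stored_as_script=True,
    )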
    LMK if you still have any questions about it.
    iñigo

    7 months ago
    OK, thank you very much! I'll try it later. Thanks a lot and have a good weekend.