Hello, how is it possible that a flow.run() works and...
# ask-community
i
Hello, how is it possible that a flow.run() works but a run from the server doesn't? I get the following message:
Copy code
Unexpected error: PicklingError('Could not pickle object as excessively deep recursion required.')
Traceback (most recent call last):
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
RecursionError: maximum recursion depth exceeded while pickling an object

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 909, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/engine/results/local_result.py", line 115, in write
    value = self.serializer.serialize(new.value)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/engine/serializers.py", line 73, in serialize
    return cloudpickle.dumps(value)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 570, in dump
    raise pickle.PicklingError(msg) from e
_pickle.PicklingError: Could not pickle object as excessively deep recursion required.
After some testing I also get a problem while registering my script with flow.register:
Copy code
Traceback (most recent call last):
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 639, in reducer_override
    if sys.version_info[:2] < (3, 7) and _is_parametrized_type_hint(obj):  # noqa  # pragma: no branch
RecursionError: maximum recursion depth exceeded in comparison

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/inigo/project/scripts/es_am_08_caetano_to_s4e/caetano_to_s4e_flow.py", line 65, in <module>
    flow.register(project_name='S4E')
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/core/flow.py", line 1727, in register
    registered_flow = client.register(
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/client/client.py", line 1127, in register
    serialized_flow = flow.serialize(build=build)  # type: Any
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/core/flow.py", line 1489, in serialize
    self.storage.add_flow(self)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/storage/local.py", line 143, in add_flow
    f.write(flow_to_bytes_pickle(flow))
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/utilities/storage.py", line 177, in flow_to_bytes_pickle
    cloudpickle.dumps(flow, protocol=4), newline=False
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 570, in dump
    raise pickle.PicklingError(msg) from e
_pickle.PicklingError: Could not pickle object as excessively deep recursion required.
a
Can you move the traceback into this thread to keep the main channel clean? Can you share your flow and how you tried to register it? Something in your flow is not serializable.
i
I hope it is better now. How can I share my code with you? Thanks
a
Ideally directly here in the thread; just redact anything sensitive. Or send it via DM if you don't want to share the code publicly.
i
My flow.py is:
Copy code
from prefect import Flow, Parameter, unmapped, mapped
import sys
import mechanicalsoup

# adding Folder_2 to the system path
sys.path.insert(0, '/home/inigo/project/scripts/')
from es_am_08_caetano_to_s4e.caetano_to_s4e_tasks import login, report_url, send_file, transform_table,report_data

browser = mechanicalsoup.StatefulBrowser()


with Flow("Get_caetano_to_s4e") as flow:
    # Input Parameters
    url_login = "<redacted>"  # login URL removed before sharing
    user = "<redacted>"       # credentials redacted
    passwd = "<redacted>"

    # Period fixed to a max of 10 days because of the web page
    period = Parameter('period', default=["2022-02-23", "2022-02-25"])


    browser = login(browser, user, passwd, url_login)

    # browser.open(url=url_login)

    # # Select form in the page, we leave it empty cause it is the only one
    # browser.select_form()

    # # Set the username and password to log in
    # browser["_username"] = user
    # browser["_password"] = passwd

    # # Submit the login button
    # # resp = browser.submit_selected()
    # browser.submit_selected()

    # To see the form data and names
    # browser.form.print_summary()

    # Caetano Auto Montijo = 'PT_DIN_07' = 73
    # Caetano Auto - Palhais|Barreiro = 'PT_DIN_01' =74
    # Caetano Baviera - Aveiro = 'PT_DIN_06' = 75
    # Caetano Baviera - Portimão = 'PT_DIN_05' = 76
    # Caetano Motors - Almada = 'PT_DIN_03' = 77
    # Caetano Motors - Palhais|Barreiro = 'PT_DIN_04' = 78
    # Caetano Star - Castelo Branco = 'PT_DIN_02' =79

    # plantas = Parameter('plantas',default=[('PT_DIN_07', 73),
    #                                         ('PT_DIN_01' , 74),
    #                                         ('PT_DIN_06' , 75),
    #                                         ('PT_DIN_05' , 76),
    #                                         ('PT_DIN_03' , 77),
    #                                         ('PT_DIN_04' , 78),
    #                                         ('PT_DIN_02' ,79)])
    plantas = Parameter('plantas', default=[('PT_DIN_01', 74)])

    url_report = report_url.map(plantas)

    response = report_data(unmapped(browser), mapped(url_report), unmapped(period))

    df = transform_table(mapped(response), mapped(plantas))

    send_file.map(mapped(df), mapped(plantas))

flow.register(project_name='S4E')
# flow.run(parameters={'period':['2022-02-25','2022-02-26']})
Then my tasks.py is:
Copy code
import requests, os, sys, socket, time
import prefect,json, datetime
from dateutil.relativedelta import relativedelta
from prefect.client.secrets import Secret
from prefect import task
from requests.structures import CaseInsensitiveDict
import pandas as pd
import mechanicalsoup
from bs4 import BeautifulSoup

@task
def login(browser,user,passwd,url_login):
    # Create object and go to url_login
    
    browser.open(url=url_login)

    # Select form in the page, we leave it empty cause it is the only one
    browser.select_form()

    # Set the username and password to log in
    browser["_username"] = user
    browser["_password"] = passwd

    # Submit the login button
    # resp = browser.submit_selected()
    browser.submit_selected()

    return browser

@task
def report_url(plantas):
    site = plantas[1]
    url_report = f'https://gegsc.rigorcg.pt/admin/app/board/{site}/report'
    return url_report

def today():
    today = datetime.datetime.today()
    return [today,today]

@task
def report_data(browser,url_report,period):
    if len(period) == 0:
        period = today()
    browser.open(url=url_report)

    browser.select_form()

    # Enter all data required for the filter
    browser["period[start_date]"] = period[0]
    browser["period[end_date]"] = period[1]
    browser["fields[activeEnergyGenerated]"] = "Active Energy Generated"
    browser["fields[averageRadiation]"] = "Average Radiation"
    response = browser.submit_selected()
    browser.close()
    return response.text

@task
def transform_table(text: str, plantas) -> pd.DataFrame:
    ref = plantas[0]
    soup = BeautifulSoup(text,features="html.parser")
    data = []
    table = soup.find('table', attrs={'class':'table table-striped'})
    table_body = table.find('tbody')

    rows = table_body.find_all('tr')
    for row in rows:
        cols = row.find_all('td')
        cols = [ele.text.strip() for ele in cols]
        data.append([ele for ele in cols if ele]) # Get rid of empty values


    df = pd.DataFrame(data)
    df.rename(columns={0: 'date', 1: 'energy_tot', 2: 'irradiation'}, inplace=True)

    df['meter'] = ref
    df['solarimeter'] = ref
    df = df[['date', 'meter', 'energy_tot', 'solarimeter', 'irradiation']]


    # print(df)
    print("data gathered")
    return df

@task
def send_file(df,plantas):     
    ref = plantas[0]
    url = f"<https://energysoft.app/datapi/v2/data/post/{ref}>"
    print(url)

    payload = df.to_csv(sep=';',index=False)
    # df.to_csv(f'ssss{ref}.csv',sep=';',index=False)

    headers = CaseInsensitiveDict()
    headers["Content-Type"] = "text/csv"
    headers["Authorization"] = "Basic cmVkZW5fc28xOnloOW1qMjF4"

    resp = requests.post(url, headers=headers, data=payload)
    resp_code = resp.status_code
    print(resp_code)
    return resp_code
All of my code runs on the same machine: both the manual flow.run and the local agent are on the same machine.
a
@iñigo thank you so much for sharing the flow! This is helpful. The issue is this object defined at the module level:
Copy code
browser = mechanicalsoup.StatefulBrowser()
This is likely an HTTP client, correct? The issue is that Prefect cannot serialize such objects with cloudpickle. You could switch to script-based storage so that Prefect doesn't serialize your flow: https://docs.prefect.io/orchestration/flow_config/storage.html#pickle-vs-script-based-storage But the easiest and recommended approach is to move this object into the task that needs it and instantiate it there at runtime. Specifically, you could move the browser initialization into the report_data task and avoid passing this object between tasks. Lastly, instead of manually modifying the Python path:
Copy code
sys.path.insert(0, "/home/inigo/project/scripts/")
You could attach this path to your Prefect agent:
Copy code
prefect agent local start -p /home/inigo/project/scripts/
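To illustrate the recommended fix, here is an untested sketch that folds your login steps into report_data and creates the browser inside the task (default-period handling omitted for brevity), so nothing unpicklable ever crosses a task boundary:
Copy code
import mechanicalsoup
from prefect import task

@task
def report_data(url_report, period, user, passwd, url_login):
    # Instantiate the browser at runtime, inside the task, so the flow
    # object never holds an unpicklable StatefulBrowser.
    browser = mechanicalsoup.StatefulBrowser()

    # Log in (same steps as your original login task)
    browser.open(url=url_login)
    browser.select_form()
    browser["_username"] = user
    browser["_password"] = passwd
    browser.submit_selected()

    # Fetch the report for the given plant and period
    browser.open(url=url_report)
    browser.select_form()
    browser["period[start_date]"] = period[0]
    browser["period[end_date]"] = period[1]
    browser["fields[activeEnergyGenerated]"] = "Active Energy Generated"
    browser["fields[averageRadiation]"] = "Average Radiation"
    response = browser.submit_selected()
    browser.close()
    return response.text
In the flow you would then call report_data(mapped(url_report), unmapped(period), unmapped(user), unmapped(passwd), unmapped(url_login)) and drop the separate login task entirely.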
LMK if you still have any questions about it.
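a
PS: if you'd rather keep the module-level browser and try the script-based storage route instead, a rough sketch (reusing the flow file path from your traceback) would be:
Copy code
from prefect.storage import Local

flow.storage = Local(
    stored_as_script=True,
    path="/home/inigo/project/scripts/es_am_08_caetano_to_s4e/caetano_to_s4e_flow.py",
)
flow.register(project_name='S4E')
Note that your first traceback comes from Prefect checkpointing the browser returned by the login task, so script-based storage alone won't fix that part; the refactor above is the more robust fix.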
i
Ok, thank you very much! I'll try it later. Thanks a lot, and have a good weekend.