#prefect-community

Carlos Cueto

06/08/2022, 4:15 PM
Hi everyone, having a bit of a configuration problem. I have a script that runs fine when I run it locally but when I register it with Cloud, it fails to execute on there with the following error:
Failed to load and execute Flow's environment: ValueError('No flows found in file.')
This is the flow definition:
if __name__ == '__main__':
    with Flow('Scouter-Solr-Script') as flow:
        snowflake_user = PrefectSecret('snowflake_usr')
        snowflake_pwd = PrefectSecret('snowflake_pwd')
        snowflake_to_solr(snowflake_user, snowflake_pwd)
    flow.run_config = LocalRun(labels=['SVRNAME1'])
    flow.storage = Git(repo="Prefect-Flows", flow_path="Python/Scouter/snowflake_to_solr.py", git_clone_url_secret_name="azure_devops_clone_url")
    flow.register(project_name='Scouter')
I'm assuming it has to do with the if __name__ == '__main__' part on top of the Flow class definition, but I don't know how to go about fixing this. I need that for multiprocessing that happens within the main task of the flow.

Kevin Kho

06/08/2022, 4:26 PM
Define your flow above the main block. Only the register call needs to be in the if __name__ == '__main__' portion.
Prefect opens the file from storage and looks for flows in the global namespace.
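To illustrate the point about the global namespace, here is a minimal sketch that needs no Prefect install. FakeFlow and find_flows are hypothetical stand-ins, and the exec-based loader is a simplification assumed for illustration, not Prefect's actual code. It shows why a flow created only under the main guard is invisible to a loader that executes the file and scans its top-level names.

```python
class FakeFlow:
    """Hypothetical stand-in for prefect.Flow (illustration only)."""

    def __init__(self, name):
        self.name = name


# Flow defined only under the main guard, as in the original script.
GUARDED = """
if __name__ == '__main__':
    flow = FakeFlow('Scouter-Solr-Script')
"""

# Flow defined at module level; only registration would sit under the guard.
UNGUARDED = """
flow = FakeFlow('Scouter-Solr-Script')
if __name__ == '__main__':
    pass  # flow.register(...) would go here
"""


def find_flows(source):
    """Execute source text and collect FakeFlow objects from its top-level names."""
    namespace = {"FakeFlow": FakeFlow}
    # The namespace has no '__name__' == '__main__', so the guarded branch is skipped.
    exec(source, namespace)
    return [obj.name for obj in namespace.values() if isinstance(obj, FakeFlow)]


guarded_flows = find_flows(GUARDED)
unguarded_flows = find_flows(UNGUARDED)
print(guarded_flows, unguarded_flows)
```

With the flow at module level, the loader finds it; with the flow under the guard, the list is empty, which matches the "No flows found in file." symptom.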

Carlos Cueto

06/08/2022, 4:45 PM
I moved the flow definition outside the if __name__ == '__main__' block, but now it leads to an error with the multiprocessing that goes on inside the snowflake_to_solr task defined inside the flow: _pickle.PicklingError: Can't pickle <function convertToRecords at 0x000001E0C09137F0>: attribute lookup convertToRecords on __main__ failed

Kevin Kho

06/08/2022, 4:47 PM
Seems like there is something outside the Flow that is used but can’t be pickled by cloudpickle. Could you move that inside the task instead?

Carlos Cueto

06/08/2022, 5:07 PM
Moved the function that is called on multiprocessing to inside the task... getting the following now:
Task 'Snowflake to SOLR': Exception encountered during task execution!
Traceback (most recent call last):
  File "C:\python310\lib\site-packages\prefect\engine\task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "C:\python310\lib\site-packages\prefect\utilities\executors.py", line 467, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "<string>", line 112, in snowflake_to_solr
  File "<string>", line 40, in applyParallel
  File "C:\python310\lib\multiprocessing\pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\python310\lib\multiprocessing\pool.py", line 771, in get
    raise self._value
  File "C:\python310\lib\multiprocessing\pool.py", line 537, in _handle_tasks
    put(task)
  File "C:\python310\lib\multiprocessing\connection.py", line 211, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "C:\python310\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'snowflake_to_solr.<locals>.convertToRecords'
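Both pickling failures reported so far can be reproduced with the standard library alone. The sketch below is an illustration under assumptions, not the original script: the function names are hypothetical, and textwrap.dedent merely stands in for "a function that lives in an importable module". Plain pickle, which multiprocessing uses, stores functions by module and name, so a function created from executed source text or nested inside another function cannot be looked up again.

```python
import pickle
import textwrap


def can_pickle(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        # The exact exception type for these cases varies across Python versions.
        return False


def make_nested():
    """Build a function nested inside another, like a helper defined inside a task."""
    def convert_to_records(chunk):  # hypothetical helper body
        return list(chunk)
    return convert_to_records


def make_execd():
    """Build a function from source text executed into a throwaway namespace."""
    namespace = {}
    exec("def convert_to_records_execd(chunk):\n    return list(chunk)\n", namespace)
    return namespace["convert_to_records_execd"]


nested_ok = can_pickle(make_nested())          # nested: not reachable by name
execd_ok = can_pickle(make_execd())            # exec'd: not found on any module
importable_ok = can_pickle(textwrap.dedent)    # importable: pickled by reference
print(nested_ok, execd_ok, importable_ok)
```

Only the importable function pickles, which is consistent with both error messages in this thread.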

Kevin Kho

06/08/2022, 5:08 PM
Is that a class Task that has another method called convertToRecords?

Carlos Cueto

06/08/2022, 5:40 PM
It's a function with the @task decorator, with another sub-function inside of it called convertToRecords. It was previously outside the task function.

Kevin Kho

06/09/2022, 12:23 AM
Sorry, I had a busy day and am only now looking at this. Did you make a multiprocessing pool? The log doesn’t look like Dask. It seems like it needed to pickle a class or function to use the multiprocessing pool?

Carlos Cueto

06/09/2022, 1:38 PM
No problem. Yes, it's a multiprocessing pool that calls an external (non-task) function in parallel. That was my initial code, at least. I moved the function inside the task as a sub-function but that threw the error above.

Kevin Kho

06/09/2022, 2:22 PM
Yeah, this is not exactly a Prefect error. Python needs to pickle that function to move it between processes, and something is blocking it from doing so.
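One common way around this kind of pickling failure, offered here as a sketch and not confirmed anywhere in this thread as the fix for this flow, is to keep the worker function in its own importable module so pickle can store it by reference. The module name solr_helpers_demo and the function body are hypothetical; the temporary directory exists only to make the example self-contained, and a real project would use an ordinary .py file that is importable on the machine running the agent.

```python
import importlib
import pickle
import sys
import tempfile
from pathlib import Path

# Hypothetical helper module; name and contents are illustrative only.
MODULE_NAME = "solr_helpers_demo"
MODULE_SOURCE = '''
def convert_to_records(chunk):
    """Placeholder for the real per-chunk conversion."""
    return [str(item) for item in chunk]
'''


def load_helper_module():
    """Write the helper module to a temp directory and import it by name."""
    module_dir = tempfile.mkdtemp()
    Path(module_dir, MODULE_NAME + ".py").write_text(MODULE_SOURCE)
    sys.path.insert(0, module_dir)
    importlib.invalidate_caches()
    return importlib.import_module(MODULE_NAME)


helpers = load_helper_module()

# The function now pickles as a reference to "solr_helpers_demo.convert_to_records".
payload = pickle.dumps(helpers.convert_to_records)
restored = pickle.loads(payload)
print(restored is helpers.convert_to_records, restored([1, 2]))
```

Inside the task, the pool would then be given the imported function (for example pool.map(convert_to_records, chunks)). On Windows, worker processes are spawned fresh, so the helper module must be importable from them as well.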

Faheem Khan

07/14/2022, 8:29 AM
@Carlos Cueto did you manage to find a solution? I am in the same boat but using DaskTaskRunner.