simone
11/19/2020, 2:45 PM
@task decorator. When the function is executed in a flow, everything runs fine. If I write the same code by subclassing Task, I get a pickling error:
pickle.UnpicklingError: NEWOBJ class argument isn't a type object
The same issue also happened with a couple of other functions. I guess there is something basic about how a Task is constructed that I am missing. I would appreciate it if you could shed some light on this issue! Thanks
Example of the code: Gist Tasks code
Spencer
11/19/2020, 2:48 PM
simone
11/19/2020, 2:55 PM
# subclassed task
with Flow("folder-organization", environment=LocalEnvironment(DaskExecutor(address='tcp://193.10.16.58:32833')),
          storage=Local(directory='/home/simone/tmp_code/flows')) as flow:
    create_folders = create_folder_structure()
    folders = create_folders(experiment_fpath)
flow.register(project_name="test")

# @task wrapped
with Flow("folder-organization", environment=LocalEnvironment(DaskExecutor(address='tcp://193.10.16.58:32833')),
          storage=Local(directory='/home/simone/tmp_code/flows')) as flow:
    folders = create_folder_structure(experiment_fpath)
flow.register(project_name="test")
Dylan
simone
11/19/2020, 3:11 PM
Dylan
Spencer
11/19/2020, 4:03 PM
create_folder_structure()()
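Spencer's create_folder_structure()() points at the difference between the two styles. Below is a toy model in plain Python; ToyTask, toy_task, decorated and Subclassed are hypothetical names and NOT Prefect's API. A decorator returns an object that is already a task instance, so the flow calls it once with its inputs. A subclass is a class, so it has to be instantiated first, and the resulting instance is then called with the inputs. Passing the input straight to the class constructor configures the task instead of binding the input.

```python
import copy


class ToyTask:
    """Toy stand-in for a task base class (hypothetical, NOT prefect.Task)."""

    def __init__(self, name=None):
        # Construction-time configuration lives in __init__.
        self.name = name or type(self).__name__
        self.bound_args = ()

    def __call__(self, *args):
        # Calling an *instance* binds run-time inputs to a copy of the task.
        bound = copy.copy(self)
        bound.bound_args = args
        return bound

    def run(self, *args):
        raise NotImplementedError


def toy_task(fn):
    """Toy decorator: wraps a plain function in a ToyTask *instance*."""

    class FunctionToyTask(ToyTask):
        def run(self, *args):
            return fn(*args)

    return FunctionToyTask(name=fn.__name__)


@toy_task
def decorated(path):
    return f"created folders in {path}"


class Subclassed(ToyTask):
    def run(self, path):
        return f"created folders in {path}"


a = decorated("/data/exp")       # already an instance: call it once
b = Subclassed()("/data/exp")    # a class: instantiate, then call the instance
wrong = Subclassed("/data/exp")  # the path becomes the task's *name*
print(a.run(*a.bound_args) == b.run(*b.bound_args))  # True
print(wrong.name, wrong.bound_args)  # /data/exp ()
```

The last line is the pitfall the toy is meant to show: nothing is bound, and the input silently became configuration. As far as I know, Task.__init__ in the Prefect 0.x releases used in this thread also takes name as its first parameter, so it is worth checking the Task reference for your version.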
simone
11/19/2020, 5:05 PM
create_folder_structure() and then run create_folder_structure(experiment_fpath).
If I instantiate it as suggested, I must add experiment_fpath between the last pair of () (i.e. create_folder_structure()(experiment_fpath)), and I get the same error.
Dylan
(prefect) dylanhughes@Dylans-MacBook-Pro-Prefect ~/dev> ipython -i thing.py
Python 3.7.7 (default, Mar 26 2020, 10:32:53)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.13.0 -- An enhanced Interactive Python. Type '?' for help.
In [1]: flow.run(parameters={"experiment_fpath": "/Users/dylanhughes/dev/thing_results"})
[2020-11-19 12:10:55-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'subclassed'
[2020-11-19 12:10:55-0500] INFO - prefect.TaskRunner | Task 'experiment_fpath': Starting task run...
[2020-11-19 12:10:55-0500] INFO - prefect.TaskRunner | Task 'experiment_fpath': Finished task run for task with final state: 'Success'
[2020-11-19 12:10:55-0500] INFO - prefect.TaskRunner | Task 'create_folder_structure': Starting task run...
[2020-11-19 12:10:55-0500] INFO - prefect.TaskRunner | Task 'create_folder_structure': Finished task run for task with final state: 'Success'
[2020-11-19 12:10:55-0500] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Out[1]: <Success: "All reference tasks succeeded.">
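The thread does not confirm the root cause, but one plausible mechanism can be reproduced with the standard library alone. pickle records a class by reference (module name plus class name), and, as I understand cloudpickle's behavior, classes imported from a regular module are pickled that way too, while objects defined in the running script (__main__) are usually pickled by value. The process that unpickles the object therefore has to import that module and find a class under that name. If the name resolves to something that is not a class (for example an object created by a decorator, since @task returns a Task instance rather than a class), CPython's unpickler raises this NEWOBJ error. The module name folder_class_demo and the class Job are hypothetical, and the exact message wording differs between Python versions.

```python
import pickle
import sys
import types


def reproduce_newobj_error():
    """Pickle an instance, rebind its class name, then try to unpickle it."""
    # Hypothetical module standing in for folder_class.py; registering it in
    # sys.modules lets pickle record the class as "folder_class_demo.Job".
    mod = types.ModuleType("folder_class_demo")
    exec("class Job:\n    pass\n", mod.__dict__)
    sys.modules["folder_class_demo"] = mod
    try:
        # The payload holds only a reference to the class plus a NEWOBJ
        # opcode meaning "call cls.__new__(cls)" (used from protocol 2 on).
        payload = pickle.dumps(mod.Job(), protocol=2)
        # Simulate the loading side finding a non-class under the same name.
        mod.Job = object()
        pickle.loads(payload)
    except pickle.UnpicklingError as exc:
        return str(exc)
    finally:
        del sys.modules["folder_class_demo"]
    return None


message = reproduce_newobj_error()
print(message)
```

If this is what is happening, a quick check is whether folder_class imports cleanly and resolves create_folder_structure to a class in the environment that loads the flow (for a DaskExecutor, on every worker); dask.distributed's Client.run can execute a small function like that on each worker.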
simone
11/19/2020, 5:13 PM
create_folder_structure class, save it in another file, and import it in the script with the flow? That is when I got the error. Thanks a lot for helping out!
Dylan
subclassed_version
gist as is
simone
11/19/2020, 5:15 PM
Dylan
import prefect
from prefect import task, Flow, Parameter, flatten, unmapped
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment
from prefect import Task
from prefect.environments.storage import Local
import os
from pathlib import Path
from folder_class import create_folder_structure

# subclassed task
with Flow(
    "subclassed",
    environment=LocalEnvironment(DaskExecutor(address="tcp://193.10.16.58:32833")),
    storage=Local(directory="~/dev/thing_results"),
) as flow:
    experiment_fpath = Parameter("experiment_fpath", default="/wsfish/smfish_ssd/test")
    create_folders = create_folder_structure()
    folders = create_folders(experiment_fpath)
# flow.register(project_name="test")
# folder_class.py
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment
from prefect import Task
from prefect.environments.storage import Local
import os
from pathlib import Path


class create_folder_structure(Task):
    """
    Class used to create the folder structure in which the files generated
    by the machines are sorted and the data created during processing are
    saved. It creates the backbone structure common to all analyses.

    FOLDER STRUCTURE
    - original_robofish_logs: contains all the original robofish logs.
    - extra_files: contains the extra files acquired during imaging.
    - extra_processing_data: contains extra files used in the analysis,
      like the dark images for flat field correction.
    - pipeline_config: contains all the configuration files.
    - raw_data: contains the renamed .nd2 files and the corresponding
      pickle configuration files. It is the directory that is
      backed up on the server.
    - output_figures: contains the reports and visualizations.
    - notebooks: will contain potential notebooks used for processing the data.
    - probes: will contain the fasta file with the probes used in the experiment.
    - tmp: temporary data.

    Args:
        experiment_fpath: str
            folder path of the experiment
    """

    def run(self, experiment_fpath: str):
        """
        Create the folder structure described in the class docstring
        inside experiment_fpath.

        Args:
            experiment_fpath: str
                folder path of the experiment
        """
        experiment_fpath = Path(experiment_fpath)
        folders_list = [
            "raw_data",
            "original_robofish_logs",
            "extra_processing_data",
            "extra_files",
            "pipeline_config",
            "output_figures",
            "notebooks",
            "probes",
            "tmp",
        ]
        for folder_name in folders_list:
            try:
                os.stat(experiment_fpath / folder_name)
                self.logger.info(f"{folder_name} already exists")
            except FileNotFoundError:
                os.mkdir(experiment_fpath / folder_name)
                os.chmod(experiment_fpath / folder_name, 0o777)
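The try/os.stat/os.mkdir loop in run can also be written with pathlib. This is a minimal standalone sketch (a plain function, not a Task; create_folders, FOLDERS and the logger name are hypothetical) that keeps the same behavior: log and skip folders that already exist, create the others (the experiment folder itself must already exist, as with os.mkdir), and set 0o777 explicitly, because the mode used by mkdir is masked by the process umask.

```python
import logging
import tempfile
from pathlib import Path

logger = logging.getLogger("create_folder_structure_sketch")

# Same folder names as folders_list in folder_class.py.
FOLDERS = (
    "raw_data", "original_robofish_logs", "extra_processing_data",
    "extra_files", "pipeline_config", "output_figures",
    "notebooks", "probes", "tmp",
)


def create_folders(experiment_fpath):
    """Create the experiment sub-folders and return the newly created ones."""
    experiment_fpath = Path(experiment_fpath)
    created = []
    for folder_name in FOLDERS:
        folder = experiment_fpath / folder_name
        if folder.exists():
            logger.info("%s already exists", folder_name)
            continue
        folder.mkdir()  # parent must exist, like os.mkdir
        folder.chmod(0o777)  # mkdir's mode is filtered by the umask
        created.append(folder_name)
    return created


with tempfile.TemporaryDirectory() as tmp:
    first = create_folders(tmp)   # creates all nine folders
    second = create_folders(tmp)  # nothing left to create
print(len(first), second)  # 9 []
```

Inside the Task subclass, the module-level logger would simply be replaced by self.logger.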
simone
11/19/2020, 5:59 PM
Dylan