simone
11/19/2020, 2:45 PM @task Task pickle.UnpicklingError: NEWOBJ class argument isn't a type object Spencer
11/19/2020, 2:48 PMsimone
11/19/2020, 2:55 PM# subclassed task
# Variant 1: the task is a Task subclass, so it is instantiated first and the
# instance is then called inside the flow context.
# NOTE(review): `experiment_fpath` is not defined in this snippet; it is
# presumably defined earlier in the original script -- confirm.
with Flow(
    "folder-organization",
    # The scheduler address is a plain "tcp://host:port" string; the angle
    # brackets seen in the pasted version were chat link markup, not part of
    # the address.
    environment=LocalEnvironment(DaskExecutor(address="tcp://193.10.16.58:32833")),
    storage=Local(directory="/home/simone/tmp_code/flows"),
) as flow:
    create_folders = create_folder_structure()
    folders = create_folders(experiment_fpath)
flow.register(project_name="test")
# @task wrapped
# Variant 2: `create_folder_structure` is a function wrapped with @task, so it
# is called directly inside the flow context.
# NOTE(review): `experiment_fpath` is not defined in this snippet; it is
# presumably defined earlier in the original script -- confirm.
with Flow(
    "folder-organization",
    # The scheduler address is a plain "tcp://host:port" string; the angle
    # brackets seen in the pasted version were chat link markup, not part of
    # the address.
    environment=LocalEnvironment(DaskExecutor(address="tcp://193.10.16.58:32833")),
    storage=Local(directory="/home/simone/tmp_code/flows"),
) as flow:
    folders = create_folder_structure(experiment_fpath)
flow.register(project_name="test")Dylan
simone
11/19/2020, 3:11 PMDylan
Spencer
11/19/2020, 4:03 PMcreate_folder_structure()()simone
11/19/2020, 5:05 PMcreate_folder_structure()create_folder_structure(experiment_fpath)experiment_fpath()Dylan
Dylan
(prefect) dylanhughes@Dylans-MacBook-Pro-Prefect ~/dev> ipython -i thing.py
Python 3.7.7 (default, Mar 26 2020, 10:32:53) 
Type 'copyright', 'credits' or 'license' for more information
IPython 7.13.0 -- An enhanced Interactive Python. Type '?' for help.
In [1]: flow.run(parameters={"experiment_fpath": "/Users/dylanhughes/dev/thing_results"})                                                                                                                                                                            
[2020-11-19 12:10:55-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'subclassed'
[2020-11-19 12:10:55-0500] INFO - prefect.TaskRunner | Task 'experiment_fpath': Starting task run...
[2020-11-19 12:10:55-0500] INFO - prefect.TaskRunner | Task 'experiment_fpath': Finished task run for task with final state: 'Success'
[2020-11-19 12:10:55-0500] INFO - prefect.TaskRunner | Task 'create_folder_structure': Starting task run...
[2020-11-19 12:10:55-0500] INFO - prefect.TaskRunner | Task 'create_folder_structure': Finished task run for task with final state: 'Success'
[2020-11-19 12:10:55-0500] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Out[1]: <Success: "All reference tasks succeeded.">simone
11/19/2020, 5:13 PM_create_folder_structure_Dylan
subclassed_versionDylan
simone
11/19/2020, 5:15 PMDylan
Dylan
import prefect
from prefect import task, Flow, Parameter, flatten, unmapped
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment
from prefect import Task
from prefect.environments.storage import Local
import os
from pathlib import Path
from folder_class import create_folder_structure
# subclassed task
# Reproduction flow: the folder path comes in as a Parameter, and the
# subclassed task is instantiated before being called inside the flow context.
with Flow(
    "subclassed",
    # The scheduler address is a plain "tcp://host:port" string; the angle
    # brackets seen in the pasted version were chat link markup, not part of
    # the address.
    environment=LocalEnvironment(DaskExecutor(address="tcp://193.10.16.58:32833")),
    storage=Local(directory="~/dev/thing_results"),
) as flow:
    # The default is used when the run does not supply "experiment_fpath".
    experiment_fpath = Parameter("experiment_fpath", default="/wsfish/smfish_ssd/test")
    create_folders = create_folder_structure()
    folders = create_folders(experiment_fpath)
# flow.register(project_name="test")Dylan
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment
from prefect import Task
from prefect.environments.storage import Local
import os
from pathlib import Path
class create_folder_structure(Task):
    """
    Task that creates the backbone folder structure of an experiment.

    The folders are used to sort the files generated by the machines and to
    save the data created during the processing. The structure is common to
    all analysis.

    FOLDER STRUCTURE (each folder is created directly inside the experiment
    folder):
    - raw_data: contains the renamed .nd2 files and the corresponding
        pickle configuration files. It is the directory that is backed up
        on the server.
    - original_robofish_logs: contains all the original robofish logs.
    - extra_processing_data: contains extra files used in the analysis,
        like the dark images for flat field correction.
    - extra_files: contains the extra files acquired during imaging.
    - pipeline_config: contains all the configuration files.
    - output_figures: contains the reports and visualizations.
    - notebooks: will contain potential notebooks used for processing the data.
    - probes: will contain the fasta file with the probes used in the
        experiment.
    - tmp: save temporary data.
    """

    def run(self, experiment_fpath: str) -> None:
        """
        Create the missing standard sub-folders inside the experiment folder.

        A folder that already exists is left untouched and reported through
        the task logger. A newly created folder gets its permissions set to
        0o777.

        Args:
            experiment_fpath: str
                folder path of the experiment. The folder itself is not
                created by this task.

        Raises:
            FileNotFoundError: if the experiment folder does not exist.
        """
        experiment_fpath = Path(experiment_fpath)
        folders_list = (
            "raw_data",
            "original_robofish_logs",
            "extra_processing_data",
            "extra_files",
            "pipeline_config",
            "output_figures",
            "notebooks",
            "probes",
            "tmp",
        )
        for folder_name in folders_list:
            folder_path = experiment_fpath / folder_name
            try:
                # Try to create first instead of checking and then creating,
                # so a folder that appears in between cannot crash the task.
                folder_path.mkdir()
            except FileExistsError:
                self.logger.info(f"{folder_name} already exist")
            else:
                # The mode given to mkdir is masked by the process umask, so
                # the permissions are set explicitly afterwards.
                folder_path.chmod(0o777)
Dylan
Dylan
Dylan
Dylan
Dylan
Dylan
simone
11/19/2020, 5:59 PMDylan
