# prefect-community
s
Hi, I have a function wrapped in a @task decorator. When the function is executed in a flow everything runs fine, but if I write the same code by subclassing Task I get a pickling error:
Copy code
pickle.UnpicklingError: NEWOBJ class argument isn't a type object
The issue happened in a couple of other functions as well. I guess there is something basic in the Task construction that I am missing. I would appreciate it if you could shed some light on this issue! Thanks. Example of the code: Gist Tasks code
s
How are you using the class in your flow?
s
Hi
Copy code
# subclassed task
with Flow("folder-organization",environment=LocalEnvironment(DaskExecutor(address='<tcp://193.10.16.58:32833>')),
            storage=Local(directory='/home/simone/tmp_code/flows')) as flow:

    create_folders = create_folder_structure()
    folders = create_folders(experiment_fpath)

flow.register(project_name="test")


# @task wrapped
with Flow("folder-organization",environment=LocalEnvironment(DaskExecutor(address='<tcp://193.10.16.58:32833>')),
            storage=Local(directory='/home/simone/tmp_code/flows')) as flow:

    folders = create_folder_structure(experiment_fpath)

flow.register(project_name="test")
d
Hi @simone! hmmmm. Can you post a full example with your subclassed task code? (the gist from above with your flow code all in one block so it’s runnable for us). Also, do you get that error when attempting to run the flow or register it?
s
Hi, ok, I will post it in a couple of minutes. I get the error when I run the flow
d
👍 thanks!
s
When using a Task class, you may need to instantiate it and then run it, so you would do
create_folder_structure()()
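For example, a minimal sketch of that pattern (the class and parameter names here are just placeholders, not your actual code):
Copy code
from prefect import Flow, Parameter, Task

# hypothetical minimal Task subclass, just to show the call pattern
class CreateFolderStructure(Task):
    def run(self, experiment_fpath: str):
        self.logger.info(f"would create folders under {experiment_fpath}")

with Flow("example") as flow:
    experiment_fpath = Parameter("experiment_fpath", default="/tmp/experiment")
    # instantiate the task class first, then call the instance inside the flow context
    folders = CreateFolderStructure()(experiment_fpath)
    # equivalent two-step form:
    # create_folders = CreateFolderStructure()
    # folders = create_folders(experiment_fpath)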
s
Hi, sorry for the late answer. I ran a couple of extra tests. I made the flows with wrapped and subclassed Tasks (linked gists below). If I include the function in the script with the flow, both scripts run fine, but if I import the subclassed task into the script containing the flow I get the error. subclassed_version wrapped_version @Spencer In the subclassed version I instantiate the class:
create_folder_structure()
and then run
create_folder_structure(experiment_fpath)
If I instantiate as suggested, I must add the experiment_fpath between the last pair of () and I get the same error.
d
Hi @simone, this flow runs successfully for me with Prefect 0.13.15 and python 3.8
Copy code
(prefect) dylanhughes@Dylans-MacBook-Pro-Prefect ~/dev> ipython -i thing.py
Python 3.7.7 (default, Mar 26 2020, 10:32:53) 
Type 'copyright', 'credits' or 'license' for more information
IPython 7.13.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: flow.run(parameters={"experiment_fpath": "/Users/dylanhughes/dev/thing_results"})                                                                                                                                                                            
[2020-11-19 12:10:55-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'subclassed'
[2020-11-19 12:10:55-0500] INFO - prefect.TaskRunner | Task 'experiment_fpath': Starting task run...
[2020-11-19 12:10:55-0500] INFO - prefect.TaskRunner | Task 'experiment_fpath': Finished task run for task with final state: 'Success'
[2020-11-19 12:10:55-0500] INFO - prefect.TaskRunner | Task 'create_folder_structure': Starting task run...
[2020-11-19 12:10:55-0500] INFO - prefect.TaskRunner | Task 'create_folder_structure': Finished task run for task with final state: 'Success'
[2020-11-19 12:10:55-0500] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Out[1]: <Success: "All reference tasks succeeded.">
s
Did you try taking the create_folder_structure class out, saving it in another file, and importing it in the script with the flow? That is when I got the error. Thanks a lot for helping out!
d
I ran your
subclassed_version
gist as is
I will try that
s
Ok, I saw that it was working on my side as well. I noticed it when I was getting the code ready for you. The issue showed up again when the class was imported. Let me know if you can replicate the problem
d
Hey @simone, I can’t seem to replicate your issue
Copy code
import prefect
from prefect import task, Flow, Parameter, flatten, unmapped
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment
from prefect import Task
from prefect.environments.storage import Local
import os
from pathlib import Path
from folder_class import create_folder_structure

# subclassed task
with Flow(
    "subclassed",
    environment=LocalEnvironment(DaskExecutor(address="tcp://193.10.16.58:32833")),
    storage=Local(directory="~/dev/thing_results"),
) as flow:

    experiment_fpath = Parameter("experiment_fpath", default="/wsfish/smfish_ssd/test")
    create_folders = create_folder_structure()
    folders = create_folders(experiment_fpath)

# flow.register(project_name="test")
Copy code
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment
from prefect import Task
from prefect.environments.storage import Local
import os
from pathlib import Path


class create_folder_structure(Task):
    """
    Class used to create the folder structure where the files generated
    by the machines are sorted and the data created during processing
    are saved. It creates the backbone structure common to all analyses.

    FOLDER STRUCTURE
    - original_robofish_logs: contains all the original robofish logs.
        - extra_files: contains the extra files acquired during imaging.
        - extra_processing_data: contains extra files used in the analysis,
          like the dark images for flat field correction.
    - pipeline_config: contains all the configuration files.
    - raw_data: contains the renamed .nd2 files and the corresponding
      pickle configuration files. It is the directory that is
      backed up on the server.
        - output_figures: contains the reports and visualizations
    - notebooks: will contain potential notebooks used for processing the data
    - probes: will contain the fasta file with the probes used in the experiment
    - tmp: save temporary data

    Args:
        experiment_fpath: str
            folder path of the experiment
    """

    def run(self, experiment_fpath: str):
        """
        Create the folder structure where the files generated by the
        machines are sorted and the data created during processing are
        saved. It creates the backbone structure common to all analyses.

        FOLDER STRUCTURE
        - original_robofish_logs: contains all the original robofish logs.
        - extra_files: contains the extra files acquired during imaging.
        - extra_processing_data: contains extra files used in the analysis,
          like the dark images for flat field correction.
        - pipeline_config: contains all the configuration files.
        - raw_data: contains the renamed .nd2 files and the corresponding
          pickle configuration files. It is the directory that is
          backed up on the server.
        - output_figures: contains the reports and visualizations
        - notebooks: will contain potential notebooks used for processing the data
        - probes: will contain the fasta file with the probes used in the experiment
        - tmp: save temporary data

        Args:
            experiment_fpath: str
                folder path of the experiment
        """
        experiment_fpath = Path(experiment_fpath)
        folders_list = [
            "raw_data",
            "original_robofish_logs",
            "extra_processing_data",
            "extra_files",
            "pipeline_config",
            "output_figures",
            "notebooks",
            "probes",
            "tmp",
        ]
        for folder_name in folders_list:
            try:
                os.stat(experiment_fpath / folder_name)
                self.logger.info(f"{folder_name} already exists")
            except FileNotFoundError:
                os.mkdir(experiment_fpath / folder_name)
                os.chmod(experiment_fpath / folder_name, 0o777)
Working fine for me
I suspect this may be more of a Python imports issue than a Prefect issue
If you want to register this flow with code in multiple files, you’ll need to make sure your flow has access to the class file in the execution environment
The easiest way to do that is to use Docker storage, copy all necessary files to a known location in the container, and add that directory to your PYTHONPATH
These might be helpful!
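For instance, a rough sketch of what that could look like with Docker storage (the registry URL and file paths below are placeholders you’d adjust to your setup):
Copy code
from prefect import Flow, Parameter
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment
from prefect.environments.storage import Docker

# the module that defines the Task subclass, copied into the image below
from folder_class import create_folder_structure

with Flow(
    "folder-organization",
    environment=LocalEnvironment(DaskExecutor(address="tcp://193.10.16.58:32833")),
    # copy the file that defines the class into the image and put its
    # directory on PYTHONPATH so it can be imported when the flow runs
    storage=Docker(
        registry_url="my-registry.example.com",  # placeholder registry
        files={"/home/simone/tmp_code/folder_class.py": "/flow_helpers/folder_class.py"},  # placeholder local path
        env_vars={"PYTHONPATH": "/flow_helpers"},
    ),
) as flow:
    experiment_fpath = Parameter("experiment_fpath", default="/wsfish/smfish_ssd/test")
    folders = create_folder_structure()(experiment_fpath)

# flow.register(project_name="test")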
s
Thanks a lot for the help! I will try to figure out what the issue is. Thanks a lot for your time, really appreciated!
d
Of course!