    Richard Pelgrim

    1 year ago
    Hey folks! I'm struggling with implementing conditional logic inside my Prefect flow. This script works, but this one with the conditional logic fails with a rather obscure, repetitive error message. AFAICT I'm following the guidelines outlined in the Prefect docs here. Would really appreciate any pointers on what might be going wrong here!
    File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 42, in _launch
        prep_data = spawn.get_preparation_data(process_obj._name)
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/multiprocessing/spawn.py", line 154, in get_preparation_data
        _check_not_importing_main()
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/multiprocessing/spawn.py", line 134, in _check_not_importing_main
        raise RuntimeError('''
    RuntimeError:
            An attempt has been made to start a new process before the
            current process has finished its bootstrapping phase.
    
            This probably means that you are not using fork to start your
            child processes and you have forgotten to use the proper idiom
            in the main module:
    
                if __name__ == '__main__':
                    freeze_support()
                    ...
    
            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce an executable.
    I think I found the bug here. I'm calling Client() within the with case(check_size, True): clause, but should be wrapping that in a Prefect Task.
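    Something like this instead (rough sketch of the fix, assuming Prefect 1.x):
    import datetime
    from distributed import Client, LocalCluster
    from prefect import task

    # Create the Dask client inside a task rather than calling Client()
    # directly inside the `with case(...)` block of the Flow
    @task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
    def start_local_cluster():
        cluster = LocalCluster()
        return Client(cluster)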
    Kevin Kho

    1 year ago
    Hey @Richard Pelgrim, did you fix this? See this and this for an explanation.
    Richard Pelgrim

    1 year ago
    Thanks @Kevin Kho. That took me one step in the right direction. Now failing with:
    Task exception was never retrieved
    future: <Task finished name='Task-53' coro=<_wrap_awaitable() done, defined at /Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/asyncio/tasks.py:683> exception=AssertionError('daemonic processes are not allowed to have children')>
    Traceback (most recent call last):
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/asyncio/tasks.py", line 690, in _wrap_awaitable
        return (yield from awaitable.__await__())
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/core.py", line 283, in _
        await self.start()
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/nanny.py", line 335, in start
        response = await self.instantiate()
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/nanny.py", line 418, in instantiate
        result = await self.process.start()
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/nanny.py", line 676, in start
        await self.process.start()
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/process.py", line 32, in _call_and_set_future
        res = func(*args, **kwargs)
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/process.py", line 202, in _start
        process.start()
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/multiprocessing/process.py", line 118, in start
        assert not _current_process._config.get('daemon'), \
    AssertionError: daemonic processes are not allowed to have children
    When I set processes=False in the LocalCluster I get the following error:
    Traceback (most recent call last):
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 643, in get_flow_run_state
        final_states = executor.wait(
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/prefect/executors/dask.py", line 424, in wait
        return self.client.gather(futures)
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/client.py", line 1943, in gather
        return self.sync(
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/client.py", line 840, in sync
        return sync(
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/utils.py", line 326, in sync
        raise exc.with_traceback(tb)
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/utils.py", line 309, in f
        result[0] = yield future
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
        value = future.result()
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/client.py", line 1837, in _gather
        response = await future
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/client.py", line 1888, in _gather_remote
        response = await retry_operation(self.scheduler.gather, keys=keys)
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/utils_comm.py", line 385, in retry_operation
        return await retry(
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/utils_comm.py", line 370, in retry
        return await coro()
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/core.py", line 874, in send_recv_from_rpc
        result = await send_recv(comm=comm, op=key, **kwargs)
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/core.py", line 651, in send_recv
        response = await comm.read(deserializers=deserializers)
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/comm/tcp.py", line 226, in read
        msg = await from_frames(
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/comm/utils.py", line 78, in from_frames
        res = _from_frames()
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/comm/utils.py", line 61, in _from_frames
        return protocol.loads(
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/core.py", line 111, in loads
        return msgpack.loads(
      File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/core.py", line 103, in _decode_default
        return merge_and_deserialize(
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 475, in merge_and_deserialize
        return deserialize(header, merged_frames, deserializers=deserializers)
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 407, in deserialize
        return loads(header, frames)
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 170, in serialization_error_loads
        raise TypeError(msg)
    TypeError: Could not serialize object of type Success.
    Traceback (most recent call last):
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 49, in dumps
        result = pickle.dumps(x, **dump_kwargs)
    AttributeError: Can't pickle local object 'Client.__init__.<locals>.<lambda>'
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 330, in serialize
        header, frames = dumps(x, context=context) if wants_context else dumps(x)
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 53, in pickle_dumps
        frames[0] = pickle.dumps(
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 60, in dumps
        result = cloudpickle.dumps(x, **dump_kwargs)
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
        cp.dump(obj)
      File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
        return Pickler.dump(self, obj)
    TypeError: cannot pickle '_asyncio.Task' object
    
    [2021-09-08 16:30:46+0200] ERROR - prefect.Github ETL Test | Unexpected error occured in FlowRunner: TypeError('Could not serialize object of type Success.\nTraceback (most recent call last):\n  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 49, in dumps\n    result = pickle.dumps(x, **dump_kwargs)\nAttributeError: Can\'t pickle local object \'Client.__init__.<locals>.<lambda>\'\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 330, in serialize\n    header, frames = dumps(x, context=context) if wants_context else dumps(x)\n  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 53, in pickle_dumps\n    frames[0] = pickle.dumps(\n  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 60, in dumps\n    result = cloudpickle.dumps(x, **dump_kwargs)\n  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps\n    cp.dump(obj)\n  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump\n    return Pickler.dump(self, obj)\nTypeError: cannot pickle \'_asyncio.Task\' object\n')
    Does this look familiar to you at all? If not, I know a few Dask folks I could ping 😉
    Kevin Kho

    1 year ago
    I think I have seen this when you try to spawn a multiprocessing pool on a Dask worker. Is that what you’re trying to do? The TypeError: cannot pickle '_asyncio.Task' object might be on the Prefect side. Is your Flow code simple enough to share?
    Richard Pelgrim

    1 year ago
    I've set processes to False so I should not be spawning a multiprocessing pool AFAIK.
    import datetime
    import coiled
    from distributed import Client, LocalCluster
    import dask.bag as db
    import ujson
    
    from prefect import task, Flow, case
    from prefect.tasks.control_flow import merge
    from prefect.executors.dask import DaskExecutor
    
    # SET-UP
    # spin up cluster function
    @task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
    def start_cluster(name="prefect", software="coiled-examples/prefect", n_workers=10):
        '''
        This task spins up a Coiled cluster and connects it to the Dask client.
        name: name of the cluster, defaults to "prefect"
        software: Coiled software environment to install on all workers, defaults to "coiled-examples/prefect"
        n_workers: number of Dask workers in the cluster, defaults to 10
        '''
        cluster = coiled.Cluster(
            name=name,
            software=software,
            n_workers=n_workers,
            )
        client = Client(cluster)
        return client
    
    # spin up local cluster in case len(files) is small
    @task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
    def start_local_cluster():
        cluster = LocalCluster(processes=False)
        client = Client(cluster)
        return client
    
    # create list of filenames to fetch
    @task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
    def create_list(start_date, end_date, format="%d-%m-%Y"):
        '''
        This task generates a list of filenames to fetch from the Github Archive API.
        start_date: a string containing the start date
        end_date: a string containing the end date
        format: datetime format, defaults to "%d-%m-%Y"
        '''
        start = datetime.datetime.strptime(start_date, format)
        end = datetime.datetime.strptime(end_date, format)
        date_generated = [start + datetime.timedelta(days=x) for x in range(0, (end-start).days)]
        prefix = "<https://data.gharchive.org/>"
        filenames = []
        for date in date_generated:
            for hour in range(1,24):
                filenames.append(prefix + date.strftime("%Y-%m-%d") + '-' + str(hour) + '.json.gz')
        return filenames
    
    # EXTRACT
    # get data from Github api
    @task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
    def get_github_data(filenames):
        '''
        Task to fetch JSON data from Github Archive project and filter out PushEvents.
        filenames: list of filenames created with create_list() task
        '''
        records = db.read_text(filenames).map(ujson.loads)
        push_events = records.filter(lambda record: record["type"] == "PushEvent")
        return push_events
    
    
    
    # TRANSFORM 
    # transform json into dataframe
    @task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
    def to_dataframe(push_events):
        '''
        This task processes the nested json data into a flat, tabular format. 
        Each row represents a single commit.
        push_events: PushEvent data fetched with get_github_data()
        '''
        def process(record):
                try:
                    for commit in record["payload"]["commits"]:
                        yield {
                            "user": record["actor"]["login"],
                            "repo": record["repo"]["name"],
                            "created_at": record["created_at"],
                            "message": commit["message"],
                            "author": commit["author"]["name"],
                        }
                except KeyError:
                    pass
            
        processed = push_events.map(process)
        df = processed.flatten().to_dataframe()
        return df
    
    
    # LOAD
    # write to parquet
    @task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
    def to_parquet(df, path):
        '''
        This task writes the flattened dataframe of PushEvents to the specified path as a parquet file.
        path: directory to write parquet file to, can be local or cloud store.
        '''
        df.to_parquet(
            path,
            engine='pyarrow',
            compression='snappy'
        )
    
    @task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
    def check_size(filenames):
        # there are probably many (better) ways to do this, but let's use a trivial method for now
        return len(filenames) < 50
    
    # Build Prefect Flow
    with Flow(name="Github ETL Test") as flow:
        filenames = create_list(start_date="01-01-2015", end_date="02-01-2015")
        check_size = check_size(filenames)
        
        # check if len(filenames) < 50
        #if True, start localcluster as client
        with case(check_size, True):
            client1 = start_local_cluster()
        #if false, start Coiled cluster as client
        with case(check_size, False):
            client2 = start_cluster()
    
        client = merge(client1, client2)
        push_events = get_github_data(filenames)
        df = to_dataframe(push_events)
    
        with case(check_size, True):
            to_parquet(df, path="small-prefect-test-conditional.parq")
        with case(check_size, False):
            to_parquet(df, path="<s3://coiled-datasets/prefect/prefect-test-conditional.parq>")
    
    if __name__ == "__main__":
        flow.run(executor=DaskExecutor())
    Can also copy it over to a public repo if that's helpful
    Kevin Kho

    1 year ago
    This is fine.
    Something is off here. You are creating another Coiled cluster inside the DaskExecutor(). This error is likely coming from your functions to start a cluster. You should do this instead:
    executor = DaskExecutor(
        cluster_class=coiled.Cluster,
        cluster_kwargs={
            "software": "kvnkho/prefect",
            "shutdown_on_close": True,
            "name": "prefect-cluster",
        },
    )
    flow.executor = executor
    flow.register(...)
    This will spin up the Coiled cluster for you when the flow runs. The specific error is likely coming from spinning up a LocalCluster inside the DaskExecutor, because the LocalCluster is a multiprocessing pool.
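    For the local-cluster case, the equivalent would be to let the executor manage it rather than creating the Client inside a task. Roughly (sketch, assuming Prefect 1.x, where DaskExecutor spins up a temporary local Dask cluster by default):
    from prefect.executors import DaskExecutor

    # Let the executor create and tear down the local Dask cluster itself,
    # instead of building a Client inside a task
    executor = DaskExecutor(cluster_kwargs={"processes": False, "n_workers": 4})
    flow.run(executor=executor)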
    I don’t think you can dynamically set the size like what you’re trying to do btw. The Executor has to be defined by the time len(filenames) exists.
    I think you need a Resource Manager for this dynamic kind of thing.
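    Roughly like this (untested sketch using Prefect 1.x's resource_manager; the class name and worker counts are just illustrative):
    from distributed import Client, LocalCluster
    from prefect import Flow, resource_manager

    @resource_manager
    class DaskCluster:
        '''Spin up a Dask cluster for the duration of the flow run.'''
        def __init__(self, n_workers=4):
            self.n_workers = n_workers

        def setup(self):
            # Start the cluster and return a client for tasks to use
            self.cluster = LocalCluster(n_workers=self.n_workers)
            return Client(self.cluster)

        def cleanup(self, client):
            # Tear everything down once the flow run is finished
            client.close()
            self.cluster.close()

    with Flow("resource-manager-example") as flow:
        with DaskCluster(n_workers=4) as client:
            # tasks created inside this block only run while the cluster is up
            ...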
    Is this your article btw?
    Now I understand why you said you know Dask folks lol 😆
    Richard Pelgrim

    1 year ago
    Yup, that's me! 🙂
    Thanks for the tips, taking another look now, will let you know how I get on