# ask-community
Richard Pelgrim
Hey folks! I'm struggling with implementing conditional logic inside my Prefect flow. This script works, but this one with the conditional logic fails with a rather obscure, repetitive error message. AFAICT I'm following the guidelines outlined in the Prefect docs here. Would really appreciate any pointers on what might be going wrong!
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/multiprocessing/spawn.py", line 154, in get_preparation_data
    _check_not_importing_main()
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/multiprocessing/spawn.py", line 134, in _check_not_importing_main
    raise RuntimeError('''
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
I think I found the bug here. I'm calling Client() within the with case(check_size, True): clause, but I should be wrapping that in a Prefect Task.
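i.e. something roughly like this instead (minimal sketch, the task name is just illustrative):

from distributed import Client, LocalCluster
from prefect import task

# Sketch: create the Dask client inside a task so it only happens at
# flow-run time, not at flow-definition / module-import time.
@task
def start_local_client():
    cluster = LocalCluster()
    return Client(cluster)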
Kevin Kho
Hey @Richard Pelgrim, did you fix this? See this and this for an explanation
Richard Pelgrim
Thanks @Kevin Kho. That took me one step in the right direction. Now it's failing with:
Task exception was never retrieved
future: <Task finished name='Task-53' coro=<_wrap_awaitable() done, defined at /Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/asyncio/tasks.py:683> exception=AssertionError('daemonic processes are not allowed to have children')>
Traceback (most recent call last):
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/asyncio/tasks.py", line 690, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/core.py", line 283, in _
    await self.start()
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/nanny.py", line 335, in start
    response = await self.instantiate()
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/nanny.py", line 418, in instantiate
    result = await self.process.start()
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/nanny.py", line 676, in start
    await self.process.start()
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/process.py", line 32, in _call_and_set_future
    res = func(*args, **kwargs)
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/process.py", line 202, in _start
    process.start()
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/multiprocessing/process.py", line 118, in start
    assert not _current_process._config.get('daemon'), \
AssertionError: daemonic processes are not allowed to have children
When I set processes=False in the LocalCluster, I get the following error:
Traceback (most recent call last):
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 643, in get_flow_run_state
    final_states = executor.wait(
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/prefect/executors/dask.py", line 424, in wait
    return self.client.gather(futures)
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/client.py", line 1943, in gather
    return self.sync(
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/client.py", line 840, in sync
    return sync(
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/utils.py", line 326, in sync
    raise exc.with_traceback(tb)
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/utils.py", line 309, in f
    result[0] = yield future
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/client.py", line 1837, in _gather
    response = await future
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/client.py", line 1888, in _gather_remote
    response = await retry_operation(self.scheduler.gather, keys=keys)
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/utils_comm.py", line 385, in retry_operation
    return await retry(
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/utils_comm.py", line 370, in retry
    return await coro()
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/core.py", line 874, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/core.py", line 651, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/comm/tcp.py", line 226, in read
    msg = await from_frames(
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/comm/utils.py", line 78, in from_frames
    res = _from_frames()
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/comm/utils.py", line 61, in _from_frames
    return protocol.loads(
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/core.py", line 111, in loads
    return msgpack.loads(
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/core.py", line 103, in _decode_default
    return merge_and_deserialize(
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 475, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 407, in deserialize
    return loads(header, frames)
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 170, in serialization_error_loads
    raise TypeError(msg)
TypeError: Could not serialize object of type Success.
Traceback (most recent call last):
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 49, in dumps
    result = pickle.dumps(x, **dump_kwargs)
AttributeError: Can't pickle local object 'Client.__init__.<locals>.<lambda>'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 330, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 53, in pickle_dumps
    frames[0] = pickle.dumps(
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle '_asyncio.Task' object

[2021-09-08 16:30:46+0200] ERROR - prefect.Github ETL Test | Unexpected error occured in FlowRunner: TypeError('Could not serialize object of type Success.\nTraceback (most recent call last):\n  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 49, in dumps\n    result = pickle.dumps(x, **dump_kwargs)\nAttributeError: Can\'t pickle local object \'Client.__init__.<locals>.<lambda>\'\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 330, in serialize\n    header, frames = dumps(x, context=context) if wants_context else dumps(x)\n  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 53, in pickle_dumps\n    frames[0] = pickle.dumps(\n  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 60, in dumps\n    result = cloudpickle.dumps(x, **dump_kwargs)\n  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps\n    cp.dump(obj)\n  File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump\n    return Pickler.dump(self, obj)\nTypeError: cannot pickle \'_asyncio.Task\' object\n')
Does this look familiar to you at all? If not, I know a few Dask folks I could ping 😉
Kevin Kho
I think I have seen this when you try to spawn a multiprocessing pool on a Dask worker. Is that what you're trying to do? The TypeError: cannot pickle '_asyncio.Task' object might be on the Prefect side. Is your Flow code simple enough to share?
Richard Pelgrim
I've set processes to False, so I should not be spawning a multiprocessing pool AFAIK.
import datetime
import coiled
from distributed import Client, LocalCluster
import dask.bag as db
import ujson

from prefect import task, Flow, case
from prefect.tasks.control_flow import merge
from prefect.executors.dask import DaskExecutor

# SET-UP
# spin up cluster function
@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def start_cluster(name="prefect", software="coiled-examples/prefect", n_workers=10):
    '''
    This task spins up a Coiled cluster and connects it to the Dask client.
    name: name of the cluster, defaults to "prefect"
    software: Coiled software environment to install on all workers, defaults to "coiled-examples/prefect"
    n_workers: number of Dask workers in the cluster, defaults to 10
    '''
    cluster = coiled.Cluster(
        name=name,
        software=software,
        n_workers=n_workers,
        )
    client = Client(cluster)
    return client

# spin up local cluster in case len(files) is small
@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def start_local_cluster():
    cluster = LocalCluster(processes=False)
    client = Client(cluster)
    return client

# create list of filenames to fetch
@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def create_list(start_date, end_date, format="%d-%m-%Y"):
    '''
    This task generates a list of filenames to fetch from the Github Archive API.
    start_date: a string containing the start date
    end_date: a string containing the end date
    format: datetime format, defaults to "%d-%m-%Y"
    '''
    start = datetime.datetime.strptime(start_date, format)
    end = datetime.datetime.strptime(end_date, format)
    date_generated = [start + datetime.timedelta(days=x) for x in range(0, (end-start).days)]
    prefix = "https://data.gharchive.org/"
    filenames = []
    for date in date_generated:
        for hour in range(1,24):
            filenames.append(prefix + date.strftime("%Y-%m-%d") + '-' + str(hour) + '.json.gz')
    return filenames

# EXTRACT
# get data from Github api
@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def get_github_data(filenames):
    '''
    Task to fetch JSON data from Github Archive project and filter out PushEvents.
    filenames: list of filenames created with create_list() task
    '''
    records = db.read_text(filenames).map(ujson.loads)
    push_events = records.filter(lambda record: record["type"] == "PushEvent")
    return push_events



# TRANSFORM 
# transform json into dataframe
@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def to_dataframe(push_events):
    '''
    This task processes the nested json data into a flat, tabular format. 
    Each row represents a single commit.
    push_events: PushEvent data fetched with get_github_data()
    '''
    def process(record):
        try:
            for commit in record["payload"]["commits"]:
                yield {
                    "user": record["actor"]["login"],
                    "repo": record["repo"]["name"],
                    "created_at": record["created_at"],
                    "message": commit["message"],
                    "author": commit["author"]["name"],
                }
        except KeyError:
            pass

    processed = push_events.map(process)
    df = processed.flatten().to_dataframe()
    return df


# LOAD
# write to parquet
@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def to_parquet(df, path):
    '''
    This task writes the flattened dataframe of PushEvents to the specified path as a parquet file.
    path: directory to write parquet file to, can be local or cloud store.
    '''
    df.to_parquet(
        path,
        engine='pyarrow',
        compression='snappy'
    )

@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def check_size(filenames):
    # there are probably many (better) ways to do this, but let's use a trivial method for now
    return len(filenames) < 50

# Build Prefect Flow
with Flow(name="Github ETL Test") as flow:
    filenames = create_list(start_date="01-01-2015", end_date="02-01-2015")
    check_size = check_size(filenames)
    
    # check if len(filenames) < 50
    #if True, start localcluster as client
    with case(check_size, True):
        client1 = start_local_cluster()
    #if false, start Coiled cluster as client
    with case(check_size, False):
        client2 = start_cluster()

    client = merge(client1, client2)
    push_events = get_github_data(filenames)
    df = to_dataframe(push_events)

    with case(check_size, True):
        to_parquet(df, path="small-prefect-test-conditional.parq")
    with case(check_size, False):
        to_parquet(df, path="s3://coiled-datasets/prefect/prefect-test-conditional.parq")

if __name__ == "__main__":
    flow.run(executor=DaskExecutor())
Can also copy it over to a public repo if that's helpful
Kevin Kho
This is fine
Something is off here. You are creating another Coiled cluster inside the DaskExecutor(). This error is likely coming from your functions to start a cluster. You should do this instead:
executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={
        "software": "kvnkho/prefect",
        "shutdown_on_close": True,
        "name": "prefect-cluster",
    },
)
flow.executor = executor
flow.register(...)
This will spin up the Coiled cluster for you when the flow runs. The specific error is likely coming from spinning up a LocalCluster inside the DaskExecutor, because the LocalCluster is a multiprocessing pool.
I don't think you can dynamically set the cluster size like you're trying to do, btw. The executor has to already be defined before len(filenames) even exists.
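For example, something along these lines (just a sketch; it assumes the flow and imports from your script above, and the numbers are illustrative):

import datetime
import coiled
from prefect.executors.dask import DaskExecutor

# Sketch: pick the executor in plain Python *before* the flow runs,
# mirroring the 23-files-per-day logic in create_list().
start = datetime.datetime(2015, 1, 1)
end = datetime.datetime(2015, 1, 2)
n_files = (end - start).days * 23

if n_files < 50:
    # DaskExecutor() with no arguments uses a temporary local cluster
    executor = DaskExecutor()
else:
    executor = DaskExecutor(
        cluster_class=coiled.Cluster,
        cluster_kwargs={
            "software": "coiled-examples/prefect",
            "n_workers": 10,
            "shutdown_on_close": True,
        },
    )

if __name__ == "__main__":
    flow.run(executor=executor)  # `flow` as defined in the script above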
I think you need a Resource Manager for this dynamic kind of thing.
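Roughly this pattern (untested sketch; adapt setup() to pick a LocalCluster or a coiled.Cluster based on whatever size check you need):

from distributed import Client, LocalCluster
from prefect import Flow, resource_manager

# Sketch of the ResourceManager pattern: the cluster's lifecycle is owned
# by setup()/cleanup(), which run at flow-run time.
@resource_manager
class DaskCluster:
    def __init__(self, n_workers=4):
        self.n_workers = n_workers

    def setup(self):
        # Nothing multiprocessing-related happens at module import.
        self.cluster = LocalCluster(n_workers=self.n_workers, processes=False)
        return Client(self.cluster)

    def cleanup(self, client):
        client.close()
        self.cluster.close()

# inside the flow:
# with Flow("Github ETL Test") as flow:
#     with DaskCluster(n_workers=4) as client:
#         ...  # tasks that should run while the cluster is up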
Is this your article btw?
Now I understand why you said you know Dask folks lol 😆
Richard Pelgrim
Yup, that's me! 🙂
Thanks for the tips, taking another look now, will let you know how I get on