Richard Pelgrim
09/08/2021, 8:52 AM
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 42, in _launch
prep_data = spawn.get_preparation_data(process_obj._name)
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/multiprocessing/spawn.py", line 154, in get_preparation_data
_check_not_importing_main()
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/multiprocessing/spawn.py", line 134, in _check_not_importing_main
raise RuntimeError('''
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
    if __name__ == '__main__':
        freeze_support()
        ...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
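(For reference, a minimal sketch of the guard idiom this error message asks for, applied to the flow built later in this thread; multiprocessing's spawn start method re-imports the main module in every child, so anything that starts processes has to sit behind the guard:)
from prefect.executors.dask import DaskExecutor

if __name__ == "__main__":
    # nothing above this guard should start processes: spawned children
    # re-import this module during their bootstrapping phase
    flow.run(executor=DaskExecutor())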
Richard Pelgrim
09/08/2021, 1:19 PM
I'm calling Client() within the with case(check_size, True): clause, but should be wrapping that into a Prefect Task.
Richard Pelgrim
09/08/2021, 2:32 PM
Task exception was never retrieved
future: <Task finished name='Task-53' coro=<_wrap_awaitable() done, defined at /Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/asyncio/tasks.py:683> exception=AssertionError('daemonic processes are not allowed to have children')>
Traceback (most recent call last):
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/asyncio/tasks.py", line 690, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/core.py", line 283, in _
await self.start()
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/nanny.py", line 335, in start
response = await self.instantiate()
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/nanny.py", line 418, in instantiate
result = await self.process.start()
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/nanny.py", line 676, in start
await self.process.start()
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/process.py", line 32, in _call_and_set_future
res = func(*args, **kwargs)
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/process.py", line 202, in _start
process.start()
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/multiprocessing/process.py", line 118, in start
assert not _current_process._config.get('daemon'), \
AssertionError: daemonic processes are not allowed to have children
When I set processes=False in the LocalCluster I get the following error:
Traceback (most recent call last):
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 643, in get_flow_run_state
final_states = executor.wait(
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/prefect/executors/dask.py", line 424, in wait
return self.client.gather(futures)
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/client.py", line 1943, in gather
return self.sync(
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/client.py", line 840, in sync
return sync(
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/utils.py", line 326, in sync
raise exc.with_traceback(tb)
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/utils.py", line 309, in f
result[0] = yield future
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/client.py", line 1837, in _gather
response = await future
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/client.py", line 1888, in _gather_remote
response = await retry_operation(self.scheduler.gather, keys=keys)
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/utils_comm.py", line 385, in retry_operation
return await retry(
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/utils_comm.py", line 370, in retry
return await coro()
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/core.py", line 874, in send_recv_from_rpc
result = await send_recv(comm=comm, op=key, **kwargs)
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/core.py", line 651, in send_recv
response = await comm.read(deserializers=deserializers)
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/comm/tcp.py", line 226, in read
msg = await from_frames(
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/comm/utils.py", line 78, in from_frames
res = _from_frames()
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/comm/utils.py", line 61, in _from_frames
return protocol.loads(
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/core.py", line 111, in loads
return msgpack.loads(
File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/core.py", line 103, in _decode_default
return merge_and_deserialize(
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 475, in merge_and_deserialize
return deserialize(header, merged_frames, deserializers=deserializers)
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 407, in deserialize
return loads(header, frames)
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 170, in serialization_error_loads
raise TypeError(msg)
TypeError: Could not serialize object of type Success.
Traceback (most recent call last):
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 49, in dumps
result = pickle.dumps(x, **dump_kwargs)
AttributeError: Can't pickle local object 'Client.__init__.<locals>.<lambda>'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 330, in serialize
header, frames = dumps(x, context=context) if wants_context else dumps(x)
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 53, in pickle_dumps
frames[0] = pickle.dumps(
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 60, in dumps
result = cloudpickle.dumps(x, **dump_kwargs)
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle '_asyncio.Task' object
[2021-09-08 16:30:46+0200] ERROR - prefect.Github ETL Test | Unexpected error occured in FlowRunner: TypeError('Could not serialize object of type Success.\nTraceback (most recent call last):\n File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 49, in dumps\n result = pickle.dumps(x, **dump_kwargs)\nAttributeError: Can\'t pickle local object \'Client.__init__.<locals>.<lambda>\'\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 330, in serialize\n header, frames = dumps(x, context=context) if wants_context else dumps(x)\n File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 53, in pickle_dumps\n frames[0] = pickle.dumps(\n File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 60, in dumps\n result = cloudpickle.dumps(x, **dump_kwargs)\n File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps\n cp.dump(obj)\n File "/Users/rpelgrim/mambaforge/envs/prefect/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump\n return Pickler.dump(self, obj)\nTypeError: cannot pickle \'_asyncio.Task\' object\n')
Richard Pelgrim
09/08/2021, 2:32 PM
Kevin Kho
The TypeError: cannot pickle '_asyncio.Task' object might be on the Prefect side. Is your Flow code simple enough to share?
Richard Pelgrim
09/08/2021, 2:45 PM
I set processes to False so it should not be spawning a multiprocessing pool AFAIK.
Richard Pelgrim
09/08/2021, 2:46 PM
import datetime
import coiled
from distributed import Client, LocalCluster
import dask.bag as db
import ujson
from prefect import task, Flow, case
from prefect.tasks.control_flow import merge
from prefect.executors.dask import DaskExecutor
# SET-UP
# spin up cluster function
@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def start_cluster(name="prefect", software="coiled-examples/prefect", n_workers=10):
    '''
    This task spins up a Coiled cluster and connects it to the Dask client.
    name: name of the cluster, defaults to "prefect"
    software: Coiled software environment to install on all workers, defaults to "coiled-examples/prefect"
    n_workers: number of Dask workers in the cluster, defaults to 10
    '''
    cluster = coiled.Cluster(
        name=name,
        software=software,
        n_workers=n_workers,
    )
    client = Client(cluster)
    return client

# spin up local cluster in case len(files) is small
@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def start_local_cluster():
    cluster = LocalCluster(processes=False)
    client = Client(cluster)
    return client
# create list of filenames to fetch
@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def create_list(start_date, end_date, format="%d-%m-%Y"):
    '''
    This task generates a list of filenames to fetch from the Github Archive API.
    start_date: a string containing the start date
    end_date: a string containing the end date
    format: datetime format, defaults to "%d-%m-%Y"
    '''
    start = datetime.datetime.strptime(start_date, format)
    end = datetime.datetime.strptime(end_date, format)
    date_generated = [start + datetime.timedelta(days=x) for x in range(0, (end - start).days)]
    prefix = "https://data.gharchive.org/"
    filenames = []
    for date in date_generated:
        for hour in range(1, 24):
            filenames.append(prefix + date.strftime("%Y-%m-%d") + '-' + str(hour) + '.json.gz')
    return filenames
# EXTRACT
# get data from Github api
@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def get_github_data(filenames):
    '''
    Task to fetch JSON data from Github Archive project and filter out PushEvents.
    filenames: list of filenames created with create_list() task
    '''
    records = db.read_text(filenames).map(ujson.loads)
    push_events = records.filter(lambda record: record["type"] == "PushEvent")
    return push_events
# TRANSFORM
# transform json into dataframe
@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def to_dataframe(push_events):
    '''
    This task processes the nested json data into a flat, tabular format.
    Each row represents a single commit.
    push_events: PushEvent data fetched with get_github_data()
    '''
    def process(record):
        try:
            for commit in record["payload"]["commits"]:
                yield {
                    "user": record["actor"]["login"],
                    "repo": record["repo"]["name"],
                    "created_at": record["created_at"],
                    "message": commit["message"],
                    "author": commit["author"]["name"],
                }
        except KeyError:
            pass
    processed = push_events.map(process)
    df = processed.flatten().to_dataframe()
    return df
# LOAD
# write to parquet
@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def to_parquet(df, path):
    '''
    This task writes the flattened dataframe of PushEvents to the specified path as a parquet file.
    path: directory to write parquet file to, can be local or cloud store.
    '''
    df.to_parquet(
        path,
        engine='pyarrow',
        compression='snappy'
    )

@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def check_size(filenames):
    # there are probably many (better) ways to do this, but let's use a trivial method for now
    return len(filenames) < 50
# Build Prefect Flow
with Flow(name="Github ETL Test") as flow:
    filenames = create_list(start_date="01-01-2015", end_date="02-01-2015")
    check_size = check_size(filenames)
    # check if len(filenames) < 50
    # if True, start LocalCluster as client
    with case(check_size, True):
        client1 = start_local_cluster()
    # if False, start Coiled cluster as client
    with case(check_size, False):
        client2 = start_cluster()
    client = merge(client1, client2)
    push_events = get_github_data(filenames)
    df = to_dataframe(push_events)
    with case(check_size, True):
        to_parquet(df, path="small-prefect-test-conditional.parq")
    with case(check_size, False):
        to_parquet(df, path="s3://coiled-datasets/prefect/prefect-test-conditional.parq")

if __name__ == "__main__":
    flow.run(executor=DaskExecutor())
Richard Pelgrim
09/08/2021, 2:47 PM
Kevin Kho
The cluster shouldn't be started inside the Flow; it should be attached to the DaskExecutor(). This error is likely coming from your functions to start a cluster.
You should do this instead:
executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={
        "software": "kvnkho/prefect",
        "shutdown_on_close": True,
        "name": "prefect-cluster",
    },
)
flow.executor = executor
flow.register(...)
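(Adapted to the Coiled software environment used earlier in this thread, that would presumably look something like the sketch below; the kwargs mirror Richard's start_cluster task rather than anything confirmed in this conversation:)
executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={
        "software": "coiled-examples/prefect",  # env from start_cluster above
        "n_workers": 10,
        "shutdown_on_close": True,  # tear the cluster down when the flow run ends
        "name": "prefect",
    },
)
flow.executor = executor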
Kevin Kho
You can't use the LocalCluster inside the DaskExecutor because the LocalCluster is a multiprocessing pool.
Kevin Kho
The DaskExecutor is created before len(filenames) exists.
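(Following that reasoning, one sketch of keeping the size check while choosing the executor up front, before the flow run; create_list.run() here calls the undecorated function directly, and the 50-file threshold is copied from check_size above:)
filenames = create_list.run(start_date="01-01-2015", end_date="02-01-2015")
if len(filenames) < 50:
    # small job: DaskExecutor() with no arguments spins up a temporary local cluster
    executor = DaskExecutor()
else:
    # large job: hand the Coiled cluster class to the executor instead of starting it in a task
    executor = DaskExecutor(
        cluster_class=coiled.Cluster,
        cluster_kwargs={"software": "coiled-examples/prefect", "n_workers": 10},
    )
flow.run(executor=executor)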