Ranu Goldan
05/03/2021, 12:52 PM

Ranu Goldan
05/03/2021, 12:53 PM

Ranu Goldan
05/03/2021, 12:56 PM
all_finished. Is there any workaround to make this task trigger regardless of what happens to the upstream tasks?
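For reference, Prefect 1.x exposes exactly this behavior through the all_finished trigger, which fires once every upstream task reaches a Finished state, whether Success or Failed. A minimal sketch, assuming the Prefect 1.x API in use in this thread:

from prefect import Flow, task
from prefect.triggers import all_finished

@task
def may_fail():
    raise ValueError("upstream failure")

# all_finished fires once upstream tasks are Finished, Success or Failed alike
@task(trigger=all_finished)
def always_runs():
    print("runs no matter how the upstream ended")

with Flow("trigger-demo") as flow:
    always_runs(upstream_tasks=[may_fail()])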
Amanda Wee
05/03/2021, 1:00 PM

Ranu Goldan
05/03/2021, 1:32 PM

Amanda Wee
05/03/2021, 1:42 PM
TriggerFailed state by default, which is a Failed state, which in turn is a Finished state.
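That hierarchy can be checked directly against Prefect 1.x's state classes:

from prefect.engine.state import Failed, Finished, TriggerFailed

# TriggerFailed is a Failed, and Failed is a Finished, so an
# all_finished trigger still counts a TriggerFailed upstream as done
assert issubclass(TriggerFailed, Failed)
assert issubclass(Failed, Finished)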
Kevin Kho

Ranu Goldan
05/03/2021, 1:48 PM

Ranu Goldan
05/03/2021, 1:53 PM
Unexpected error: TypeError('Could not serialize object of type Failed.')
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 49, in dumps
    result = pickle.dumps(x, **dump_kwargs)
TypeError: can't pickle _thread.RLock objects

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 307, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
  File "/usr/local/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 61, in pickle_dumps
    protocol=context.get("pickle-protocol", None) if context else None,
  File "/usr/local/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 102, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
  File "/usr/local/lib/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  [... repeated pickle.py save / save_reduce / save_dict frames elided; the pickler recurses through the object's attributes ...]
  File "/usr/local/lib/python3.7/pickle.py", line 524, in save
    rv = reduce(self.proto)
TypeError: can't pickle _thread.RLock objects
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 522, in get_flow_run_state
    {e: state for e, state in upstream_states.items()}
  File "/usr/local/lib/python3.7/site-packages/prefect/executors/dask.py", line 399, in wait
    return self.client.gather(futures)
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1969, in gather
    asynchronous=asynchronous,
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 838, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 351, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 334, in f
    result[0] = yield future
  File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1857, in _gather
    response = await future
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1908, in _gather_remote
    response = await retry_operation(self.scheduler.gather, keys=keys)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils_comm.py", line 389, in retry_operation
    operation=operation,
  File "/usr/local/lib/python3.7/site-packages/distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "/usr/local/lib/python3.7/site-packages/distributed/core.py", line 861, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/distributed/core.py", line 644, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/usr/local/lib/python3.7/site-packages/distributed/comm/tcp.py", line 221, in read
    allow_offload=self.allow_offload,
  File "/usr/local/lib/python3.7/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/usr/local/lib/python3.7/site-packages/distributed/comm/utils.py", line 64, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/usr/local/lib/python3.7/site-packages/distributed/protocol/core.py", line 130, in loads
    value = merge_and_deserialize(head, fs, deserializers=deserializers)
  File "/usr/local/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 450, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/usr/local/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 384, in deserialize
    return loads(header, frames)
  File "/usr/local/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 164, in serialization_error_loads
    raise TypeError(msg)
TypeError: Could not serialize object of type Failed.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 49, in dumps
    result = pickle.dumps(x, **dump_kwargs)
TypeError: can't pickle _thread.RLock objects
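The underlying failure is not Prefect-specific: pickle (and cloudpickle, which Dask's protocol falls back to) cannot serialize thread locks, so any object graph that reaches a _thread.RLock raises exactly this error. A minimal, Prefect-free reproduction; the HoldsLock class is illustrative:

import pickle
import threading

class HoldsLock:
    # stand-in for any result object that keeps a live lock or client around
    def __init__(self):
        self.lock = threading.RLock()

try:
    pickle.dumps(HoldsLock())
except TypeError as exc:
    print(exc)  # on Python 3.7: can't pickle _thread.RLock objects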
Ranu Goldan
05/03/2021, 1:54 PM
Failed object? Why does it fail to be pickled?

Kevin Kho
Ranu Goldan
05/03/2021, 1:56 PM

Ranu Goldan
05/03/2021, 1:57 PM

Kevin Kho
Create Cluster and Submit Job are returning nothing?
Ranu Goldan
05/03/2021, 2:09 PM

Kevin Kho

Ranu Goldan
05/03/2021, 2:16 PM
# likely imports; the original paste begins at the decorator
from prefect import task
from prefect.triggers import all_successful
from google.cloud import dataproc_v1 as dataproc
from google.oauth2 import service_account

@task(name="Create Cluster", trigger=all_successful, log_stdout=True)
def create_cluster(service_account_info, cluster_configs):
    project_id = cluster_configs["project_id"]
    cluster_name = cluster_configs["cluster_name"]
    config = cluster_configs["cluster_spec"]
    print("Creating cluster...")
    # Create a client with the endpoint set to the desired cluster region.
    cluster_client = dataproc.ClusterControllerClient(
        credentials=service_account.Credentials.from_service_account_info(service_account_info),
        client_options={"api_endpoint": "***"},
    )
    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": config,
    }
    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": "asia-", "cluster": cluster}
    )
    result = operation.result()
    # Output a success message.
    print("Cluster created successfully: {}".format(result.cluster_name))

@task(name="Submit Job", trigger=all_successful, log_stdout=True)
def submit_job(service_account_info, cluster_configs, table_config, db_credentials, gcp_credentials):
    if db_credentials is None or gcp_credentials is None:
        raise Exception("no credential provided")
    print(db_credentials, gcp_credentials, table_config)
    args = {}
    if db_credentials["database_type"] == "MONGODB":
        print("Mongo")
        jars_config = ["gs://spark-lib/bigquery/**"]
        pyspark_file = "gs://{}/.py".format(gcp_credentials["bucket_name"])
        args["connection_string"] = db_credentials["connection_string"]
    elif "port" in db_credentials:
        args["hostname"] = db_credentials["hostname"]
        args["port"] = db_credentials["port"]
        args["user"] = db_credentials["user"]
        args["password"] = db_credentials["password"]
        args["database_type"] = db_credentials["database_type"]
        if db_credentials["database_type"] == "MYSQL":
            print("MySQL")
            jars_config = ["gs://spark-lib"]
            pyspark_file = "gs://{}/.py".format(gcp_credentials["bucket_name"])
        elif db_credentials["database_type"] == "POSTGRESQL":
            print("Postgre SQL")
            jars_config = ["gs://***"]
            pyspark_file = "gs://{}/**".format(gcp_credentials["bucket_name"])
    print(table_config, type(table_config))
Kevin Kho
Failed class is the state of the task. I suggest you try the Resource Manager as @Amanda Wee suggested, and it will give you better handling for these resources.
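For anyone following along: Prefect 1.x ships a resource_manager decorator for this setup/use/cleanup pattern. A minimal sketch; the DataprocCluster class and its internals are illustrative, not from this thread:

from prefect import Flow, resource_manager, task

@resource_manager
class DataprocCluster:
    # hypothetical wrapper: create the cluster in setup, delete it in cleanup
    def __init__(self, cluster_configs):
        self.cluster_configs = cluster_configs

    def setup(self):
        # create the cluster here; return something picklable such as the
        # cluster name, never the client or operation object itself
        return self.cluster_configs["cluster_name"]

    def cleanup(self, cluster_name):
        # runs even if tasks inside the block fail, so the cluster is torn down
        print("Deleting cluster {}".format(cluster_name))

@task
def run_job(cluster_name):
    print("Submitting job to {}".format(cluster_name))

with Flow("dataproc-flow") as flow:
    with DataprocCluster({"cluster_name": "demo"}) as cluster_name:
        run_job(cluster_name)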
Kevin Kho

Kevin Kho
Client in some way, but I'm not sure.
Ranu Goldan
05/06/2021, 6:50 AM
FAIL(). Thanks for your help!
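For readers landing here later: FAIL is one of Prefect 1.x's raisable signals (prefect.engine.signals), which end a task in an explicit state. A small sketch of raising it from a task; the check task is illustrative:

from prefect import Flow, task
from prefect.engine.signals import FAIL

@task
def check(value):
    # raising FAIL ends the task in a Failed state with a custom message
    if value is None:
        raise FAIL("no value provided")
    return value

with Flow("signal-demo") as flow:
    check(None)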