# ask-community
r
Hi everyone, when a task fails, what happens to the downstream tasks? I was expecting the downstream tasks to fail as well, but they just stay in the Pending state. Is that expected behavior for a Prefect flow?
`DeleteCluster`'s trigger was `all_finished`. Is there a workaround to make this task trigger regardless of what happens to the upstream tasks?
a
It might make sense to make use of a resource manager for the cluster create/delete: https://docs.prefect.io/core/idioms/resource-manager.html
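A minimal sketch of that pattern, with placeholder setup/cleanup bodies standing in for the actual Dataproc calls:
```python
from prefect import Flow, task, resource_manager


@resource_manager
class DataprocCluster:
    """Creates the cluster on entry and always deletes it on exit,
    even if the tasks inside the block fail."""

    def __init__(self, cluster_configs):
        self.cluster_configs = cluster_configs

    def setup(self):
        # Create the cluster here and return a small, picklable handle
        # (e.g. the cluster name), not the client object itself.
        print("creating cluster...")
        return self.cluster_configs["cluster_name"]

    def cleanup(self, cluster_name):
        # Runs regardless of how the tasks inside the block finished.
        print(f"deleting cluster {cluster_name}...")


@task
def submit_job(cluster_name):
    print(f"submitting job to {cluster_name}")


with Flow("dataproc-example") as flow:
    with DataprocCluster({"cluster_name": "my-cluster"}) as cluster:
        submit_job(cluster)
```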
r
Hi @Amanda Wee. So can we say it's expected for a task to stay in the Pending state when an upstream task has failed?
a
No, I would expect it to enter the `TriggerFailed` state by default, which is a `Failed` state, which in turn is a `Finished` state.
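You can check that hierarchy directly against Prefect's state classes:
```python
from prefect.engine.state import Failed, Finished, TriggerFailed

# TriggerFailed is a Failed state, and Failed is a Finished state,
# so a trigger failure still counts as "finished" for downstream triggers.
assert issubclass(TriggerFailed, Failed)
assert issubclass(Failed, Finished)
```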
k
Hi @Ranu Goldan! @Amanda Wee is right that it should fail. How long has it been stuck in Pending?
r
It's been more than an hour now. I don't think it will change, since the flow run state has already failed. What could have caused this?
Sorry, I've found the culprit:
```
Unexpected error: TypeError('Could not serialize object of type Failed.\nTraceback (most recent call last):\n  File "/usr/local/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 49, in dumps\n    result = pickle.dumps(x, **dump_kwargs)\nTypeError: can\'t pickle _thread.RLock objects\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File "/usr/local/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 307, in serialize\n    header, frames = dumps(x, context=context) if wants_context else dumps(x)\n  File "/usr/local/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 61, in pickle_dumps\n    protocol=context.get("pickle-protocol", None) if context else None,\n  File "/usr/local/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 60, in dumps\n    result = cloudpickle.dumps(x, **dump_kwargs)\n  File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 102, in dumps\n    cp.dump(obj)\n  File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump\n    return Pickler.dump(self, obj)\n  File "/usr/local/lib/python3.7/pickle.py", line 437, in dump\n    self.save(obj)\n  File "/usr/local/lib/python3.7/pickle.py", line 549, in save\n    self.save_reduce(obj=obj, *rv)\n  File "/usr/local/lib/python3.7/pickle.py", line 662, in save_reduce\n    save(state)\n  File "/usr/local/lib/python3.7/pickle.py", line 504, in save\n    f(self, obj) # Call unbound method with explicit self\n  File "/usr/local/lib/python3.7/pickle.py", line 859, in save_dict\n    self._batch_setitems(obj.items())\n  File "/usr/local/lib/python3.7/pickle.py", line 885, in _batch_setitems\n    save(v)\n  File "/usr/local/lib/python3.7/pickle.py", line 549, in save\n    self.save_reduce(obj=obj, *rv)\n  File "/usr/local/lib/python3.7/pickle.py", line 662, in save_reduce\n    save(state)\n  File "/usr/local/lib/python3.7/pickle.py", line 504, in save\n    f(self, obj) # Call unbound method with explicit self\n  File "/usr/local/lib/python3.7/pickle.py", line 859, in save_dict\n    self._batch_setitems(obj.items())\n  File "/usr/local/lib/python3.7/pickle.py", line 885, in _batch_setitems\n    save(v)\n  File "/usr/local/lib/python3.7/pickle.py", line 549, in save\n    self.save_reduce(obj=obj, *rv)\n  File "/usr/local/lib/python3.7/pickle.py", line 662, in save_reduce\n    save(state)\n  File "/usr/local/lib/python3.7/pickle.py", line 504, in save\n    f(self, obj) # Call unbound method with explicit self\n  File "/usr/local/lib/python3.7/pickle.py", line 859, in save_dict\n    self._batch_setitems(obj.items())\n  File "/usr/local/lib/python3.7/pickle.py", line 885, in _batch_setitems\n    save(v)\n  File "/usr/local/lib/python3.7/pickle.py", line 504, in save\n    f(self, obj) # Call unbound method with explicit self\n  File "/usr/local/lib/python3.7/pickle.py", line 774, in save_tuple\n    save(element)\n  File "/usr/local/lib/python3.7/pickle.py", line 549, in save\n    self.save_reduce(obj=obj, *rv)\n  File "/usr/local/lib/python3.7/pickle.py", line 638, in save_reduce\n    save(args)\n  File "/usr/local/lib/python3.7/pickle.py", line 504, in save\n    f(self, obj) # Call unbound method with explicit self\n  File "/usr/local/lib/python3.7/pickle.py", line 774, in save_tuple\n    save(element)\n  File "/usr/local/lib/python3.7/pickle.py", line 549, in save\n    self.save_reduce(obj=obj, *rv)\n  File "/usr/local/lib/python3.7/pickle.py", line 662, 
in save_reduce\n    save(state)\n  File "/usr/local/lib/python3.7/pickle.py", line 504, in save\n    f(self, obj) # Call unbound method with explicit self\n  File "/usr/local/lib/python3.7/pickle.py", line 859, in save_dict\n    self._batch_setitems(obj.items())\n  File "/usr/local/lib/python3.7/pickle.py", line 885, in _batch_setitems\n    save(v)\n  File "/usr/local/lib/python3.7/pickle.py", line 549, in save\n    self.save_reduce(obj=obj, *rv)\n  File "/usr/local/lib/python3.7/pickle.py", line 662, in save_reduce\n    save(state)\n  File "/usr/local/lib/python3.7/pickle.py", line 504, in save\n    f(self, obj) # Call unbound method with explicit self\n  File "/usr/local/lib/python3.7/pickle.py", line 859, in save_dict\n    self._batch_setitems(obj.items())\n  File "/usr/local/lib/python3.7/pickle.py", line 885, in _batch_setitems\n    save(v)\n  File "/usr/local/lib/python3.7/pickle.py", line 524, in save\n    rv = reduce(self.proto)\nTypeError: can\'t pickle _thread.RLock objects\n')
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 522, in get_flow_run_state
    {e: state for e, state in upstream_states.items()}
  File "/usr/local/lib/python3.7/site-packages/prefect/executors/dask.py", line 399, in wait
    return self.client.gather(futures)
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1969, in gather
    asynchronous=asynchronous,
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 838, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 351, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 334, in f
    result[0] = yield future
  File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1857, in _gather
    response = await future
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1908, in _gather_remote
    response = await retry_operation(self.scheduler.gather, keys=keys)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils_comm.py", line 389, in retry_operation
    operation=operation,
  File "/usr/local/lib/python3.7/site-packages/distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "/usr/local/lib/python3.7/site-packages/distributed/core.py", line 861, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/distributed/core.py", line 644, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/usr/local/lib/python3.7/site-packages/distributed/comm/tcp.py", line 221, in read
    allow_offload=self.allow_offload,
  File "/usr/local/lib/python3.7/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/usr/local/lib/python3.7/site-packages/distributed/comm/utils.py", line 64, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/usr/local/lib/python3.7/site-packages/distributed/protocol/core.py", line 130, in loads
    value = merge_and_deserialize(head, fs, deserializers=deserializers)
  File "/usr/local/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 450, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/usr/local/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 384, in deserialize
    return loads(header, frames)
  File "/usr/local/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 164, in serialization_error_loads
    raise TypeError(msg)
TypeError: Could not serialize object of type Failed.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 49, in dumps
    result = pickle.dumps(x, **dump_kwargs)
TypeError: can't pickle _thread.RLock objects
```
But I'm still confused, what is the `Failed` object? Why did it fail to be pickled?
k
What are you returning in the tasks?
r
Uh, I return nothing; they're just void functions apparently. 🥴 Is that why?
But it's fine when it succeeds.
k
Both `Create Cluster` and `Submit Job` are returning nothing?
r
Yes, you're right.
k
Can I see their definitions? Remove any credentials if you have any
r
There you go:
```python
@task(name="Create Cluster", trigger=all_successful, log_stdout=True)
def create_cluster(service_account_info, cluster_configs):

    project_id = cluster_configs["project_id"]
    cluster_name = cluster_configs["cluster_name"]
    config = cluster_configs["cluster_spec"]

    print("Creating cluster...")

    # Create a client with the endpoint set to the desired cluster region.
    cluster_client = dataproc.ClusterControllerClient(
        credentials = service_account.Credentials.from_service_account_info(service_account_info),
        client_options={"api_endpoint": "***"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": config
    }
    
    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": "asia-", "cluster": cluster}
    )
    result = operation.result()

    # Output a success message.
    print("Cluster created successfully: {}".format(result.cluster_name))


@task(name="Submit Job", trigger=all_successful, log_stdout=True)
def submit_job(service_account_info, cluster_configs, table_config, db_credentials, gcp_credentials):

    if db_credentials is None or gcp_credentials is None:
        raise Exception("no credential provided")

    print(db_credentials, gcp_credentials, table_config)

    args = {}

    if db_credentials["database_type"] == "MONGODB":
        print("Mongo")
        jars_config = ["<gs://spark-lib/bigquery/**>"]
        pyspark_file = "gs://{}/.py".format(gcp_credentials["bucket_name"])
        args["connection_string"] = db_credentials["connection_string"]

    elif "port" in db_credentials:
        args["hostname"] = db_credentials["hostname"]
        args["port"] = db_credentials["port"]
        args["user"] = db_credentials["user"]
        args["password"] = db_credentials["password"]
        args["database_type"] = db_credentials["database_type"]

        if db_credentials["database_type"] == "MYSQL":
            print("MySQL")
            jars_config = ["<gs://spark-lib>"]
            pyspark_file = "gs://{}/.py".format(gcp_credentials["bucket_name"])

        elif db_credentials["database_type"] == "POSTGRESQL":
            print("Postgre SQL")
            jars_config = ["gs://***"]
            pyspark_file = "gs://{}/**".format(gcp_credentials["bucket_name"])

    print(table_config, type(table_config))
```
k
I can’t tell anything wrong immediately. I don’t know why you’d get an error like the one you showed. The `Failed` class is the state of the task. I suggest you try the resource manager as @Amanda Wee suggested, and it will give you better handling for these resources.
The RLock message normally happens when you’re passing around things that can’t be pickled to Dask workers (clients, connections).
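A toy reproduction of that failure mode (the `FakeClient` class here is made up; it just holds a lock the way real API clients do internally):
```python
import threading

import cloudpickle  # the serializer Dask uses under the hood


class FakeClient:
    def __init__(self):
        self._lock = threading.RLock()  # API clients typically hold locks like this


try:
    cloudpickle.dumps(FakeClient())
except TypeError as exc:
    # Same root error as in the flow run logs above.
    print(exc)  # e.g. cannot pickle '_thread.RLock' object
```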
I feel like you may need to close the `Client` in some way, but I’m not sure.
r
Hi @Kevin Kho, my team resolved this by catching the exception and raising `FAIL()`. Thanks for your help!
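Roughly like this, as a sketch (the job-submission body itself is omitted):
```python
from prefect import task
from prefect.engine.signals import FAIL


@task(name="Submit Job", log_stdout=True)
def submit_job(service_account_info, cluster_configs):
    try:
        ...  # build the Dataproc client and submit the job here
    except Exception as exc:
        # Raise Prefect's FAIL signal instead of letting the raw exception
        # (which can drag unpicklable client objects along with it) bubble
        # back through the Dask executor.
        raise FAIL(str(exc)) from exc
```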
👍 1