Ricardo Gaspar
02/07/2022, 2:04 PM
boto3 and awswrangler APIs (as I've seen other users asking for it on the community channel).
My current issue is my lack of understanding of how Flows are rendered in the Schematic view. It's related to task dependencies and inter-task communication (passing values, e.g. task.output). If I use such a feature, the DAG shows like this:
Anna Geller
Kevin Kho
cluster_id, which is a task, in downstream tasks so it gets connected. I think the only way this schematic can be improved is if cluster_id is hardcoded, which loses a lot of flexibility.
I believe this can be done in Orion because you can materialize tasks as Python objects by calling .wait().result(), so you can materialize the cluster_id task and then pass it downstream as a normal Python object.
Anna Geller
Ricardo Gaspar
02/07/2022, 4:02 PM
1. .wait().result() would probably be very useful! Another thing I've noted: if I'm grabbing a value from a task result (say my task returns an array and I want to use parts of it as arguments to multiple tasks, e.g. task2(arg1=task1['result1'])), the graph renders new boxes/nodes for those intermediate results.
2. Didn't know about that resource manager feature! I'll look into it; it seems very appropriate for my use case. Thanks for sharing.
3. I don't want to use a 24/7 cluster; I delegate the creation and destruction to the orchestration tool: a create-cluster, run-steps, terminate approach. I don't want to submit all the steps at cluster-creation time, so that I keep control of the submission and of the data dependencies between my Spark apps instead of relying on EMR to do so. I'll have a timeout for EMR to kill itself as part of the cluster configs (EMR > 6.0.0).
4. I'm not using awsdatawrangler yet because its method to create a cluster uses instance fleets and not instance groups. For this example I'm using instance groups. I'll change it in the future.
with… resource block syntax? Or is there an imperative one as well (like a singleton object approach)?
My first thoughts are:
1. having multiple flows (each with its own resource manager) and invoking them from a main flow
Kevin Kho
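The lifecycle in point 3 above can be sketched without AWS at all. The class below is a hypothetical stand-in whose method names mirror boto3's EMR client (run_job_flow, add_job_flow_steps, terminate_job_flows); it only illustrates the create, submit-steps, terminate ordering, not real EMR behavior:

```python
# Hypothetical stand-in for boto3's EMR client; method names mirror the real API.
class FakeEMRClient:
    def __init__(self):
        self.steps, self.terminated = [], False

    def run_job_flow(self, **config):
        # Real call creates the cluster and returns its JobFlowId.
        return {"JobFlowId": "j-FAKE123"}

    def add_job_flow_steps(self, JobFlowId, Steps):
        self.steps.extend(Steps)
        return {"StepIds": [f"s-{i}" for i, _ in enumerate(Steps)]}

    def terminate_job_flows(self, JobFlowIds):
        self.terminated = True

def run_pipeline(emr):
    # create cluster -> submit steps one at a time -> terminate
    cluster_id = emr.run_job_flow(Name="demo")["JobFlowId"]
    emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=[{"Name": "step-1"}])
    emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=[{"Name": "step-2"}])
    emr.terminate_job_flows(JobFlowIds=[cluster_id])
    return cluster_id

emr = FakeEMRClient()
run_pipeline(emr)
```

Submitting steps one by one (rather than all at run_job_flow time) is what keeps the orchestrator in control of ordering and data dependencies.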
Anna Geller
Ricardo Gaspar
02/07/2022, 5:12 PM
@staticmethod
def wait_emr_cluster_is_running(emr_client, cluster_id: str, poll_interval_secs: int = 60):
    waiter = emr_client.get_waiter('cluster_running')
    waiter.wait(
        ClusterId=cluster_id,
        WaiterConfig={
            'Delay': poll_interval_secs,
            'MaxAttempts': 60
        }
    )
but it seems not to be possible:
line 178, in <module>
    wait_cluster = EMRCluster.wait_emr_cluster_is_running(emr_client=emr_client, cluster_id=cluster_id,
AttributeError: 'ResourceManager' object has no attribute 'wait_emr_cluster_is_running'
I guess I have to wrap the resource in another class together with my desired methods.
Kevin Kho
with Flow(...) as flow:
    with MyResource(...) as resource:
        resource.wait_emr_cluster_is_running()
        some_task(resource)
        other_task(resource)
I don't think it can be done, because the Flow block should only contain tasks. MyResource there is a task, so its execution is deferred to the flow run, but calling a method that is not a task will not defer.
Ricardo Gaspar
02/08/2022, 3:20 PM
class myEMRUtils:
    @staticmethod
    def wait_emr_cluster_is_running(...):
        return result

@resourcemanager
class EMRCluster:
    def __init__(self, ...)
    def setup(self, ...)
    def cleanup(self, ...)
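A rough pure-Python analogue of the split sketched above: the setup/cleanup pair behaves like a context manager's __enter__/__exit__, and a helper such as the wait method can live on the same object while the resource is open. All names here are illustrative, not Prefect's actual ResourceManager API:

```python
# Plain context manager mimicking a resource manager's setup/cleanup contract.
class EMRCluster:
    def __init__(self, client):
        self.client = client      # would be a boto3 EMR client in real code
        self.cluster_id = None

    def __enter__(self):          # ~ setup(): create the cluster
        self.cluster_id = "j-FAKE123"   # placeholder for run_job_flow(...)
        return self

    def wait_until_running(self):
        # Placeholder for a waiter/polling loop against the real cluster.
        return True

    def __exit__(self, *exc):     # ~ cleanup(): terminate the cluster
        self.cluster_id = None

with EMRCluster(client=None) as cluster:
    ready = cluster.wait_until_running()
    cid = cluster.cluster_id
```

Because the helper is a method on the resource object itself, there is no need for a separate utility class holding the same client and cluster ID.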
Kevin Kho
wait_emr_cluster_is_running inside setup instead.
Ricardo Gaspar
02/08/2022, 3:21 PM
add_step, wait_step.
It would be great to be able to use the same boto3 session and cluster ID. If not, I'll just create a separate class.
Kevin Kho
add_step and wait_step to function as tasks
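A sketch of how add_step and wait_step could share one client/session and one cluster ID by living on the same object; FakeEMRClient is a hypothetical stand-in for boto3's EMR client, and the task decoration itself is omitted:

```python
# Hypothetical stand-in for boto3's EMR client; method names mirror the real API.
class FakeEMRClient:
    def __init__(self):
        self.steps = {}

    def add_job_flow_steps(self, JobFlowId, Steps):
        ids = [f"s-{len(self.steps) + i}" for i in range(len(Steps))]
        for sid in ids:
            self.steps[sid] = "COMPLETED"   # pretend each step finishes at once
        return {"StepIds": ids}

    def describe_step(self, ClusterId, StepId):
        return {"Step": {"Status": {"State": self.steps[StepId]}}}

class EMRSteps:
    """Shares one client and one cluster_id across add_step and wait_step."""
    def __init__(self, client, cluster_id):
        self.client, self.cluster_id = client, cluster_id

    def add_step(self, name):
        resp = self.client.add_job_flow_steps(
            JobFlowId=self.cluster_id, Steps=[{"Name": name}])
        return resp["StepIds"][0]

    def wait_step(self, step_id):
        # Real code would poll until a terminal state; here one call suffices.
        status = self.client.describe_step(
            ClusterId=self.cluster_id, StepId=step_id)
        return status["Step"]["Status"]["State"]

steps = EMRSteps(FakeEMRClient(), "j-FAKE123")
sid = steps.add_step("spark-app-1")
state = steps.wait_step(sid)
```

In Prefect 1.x the two methods would additionally need to be wrapped as tasks for their execution to defer to the flow run, as discussed above.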