# prefect-community
r
Hey there, maybe you can help me out here with some Prefect code and the rendered schematics. Context: I'm developing a flow that creates and submits simple Spark apps to EMR. To my surprise, there are no built-in tasks for EMR (docs). Nonetheless, I am trying to use the `boto3` and `awswrangler` APIs (as I've seen other users asking for this on the community channel). My current issue is a lack of understanding of how Flows are rendered in the Schematic view. It's related to task dependencies and inter-task communication (passing values).
airflow-example_emr_jobs-success_error.py
The first image shows the Airflow DAG I am trying to reproduce. That DAG uses Jinja templating to pass values.
To be honest, Airflow does this too if we use the task outputs (`task.output`). If I use that feature, the DAG looks like this:
That said, this is what I am getting with Prefect:
Example_emr_jobs-success_error-v2.py
My question: is there any way to improve the flow schematic? (I'm using Prefect Cloud.) Do you know if it's improved in Orion? (I've seen the Radar UI 😉)
a
When you say improve the flow schematic, do you mean some sort of drill-down functionality?
k
So I am not sure this can be improved, because you are using that `cluster_id`, which is a task, in downstream tasks, so it gets connected. I think the only way this schematic can be improved is if `cluster_id` is hardcoded, which loses a lot of flexibility. I believe this can be done in Orion because you can materialize tasks as Python objects by calling `.wait().result()`, so you can materialize the `cluster_id` task and then pass it downstream as a normal Python object
❤️ 1
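As an illustration, a minimal sketch of that Orion pattern (the `create_cluster` and `submit_step` tasks are hypothetical stand-ins, and the exact Orion API may differ):
```python
from prefect import flow, task

@task
def create_cluster() -> str:
    # hypothetical stand-in for the real EMR cluster creation task
    return "j-XXXXXXXXXXXXX"

@task
def submit_step(cluster_id: str):
    print(f"submitting a step to cluster {cluster_id}")

@flow
def emr_flow():
    # .wait() blocks until the task run reaches a final state;
    # .result() then materializes the return value as a plain Python
    # object that downstream tasks receive as a normal argument
    cluster_id = create_cluster().wait().result()
    submit_step(cluster_id)
```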
a
Another thing that could improve your flow in general would be to leverage a resource manager for the boto3 client you are invoking within the flow. Instead of passing this HTTP client between tasks, you could pass it in a way that is safer and gives you even more visibility into it within your flow. This blog post has an example
upvote 1
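For reference, a minimal sketch of what a resource manager around a boto3 EMR client could look like (the `EMRClient` name and the `list_clusters` task are illustrative, not from the thread):
```python
import boto3
from prefect import Flow, task, resource_manager

@resource_manager
class EMRClient:
    def __init__(self, region_name: str = "us-east-1"):
        self.region_name = region_name

    def setup(self):
        # create the boto3 client once; tasks receive it as an argument
        return boto3.client("emr", region_name=self.region_name)

    def cleanup(self, client):
        # boto3 clients need no explicit teardown; this hook is where
        # any real resource release would go
        pass

@task
def list_clusters(emr_client):
    return emr_client.list_clusters()["Clusters"]

with Flow("emr-client-example") as flow:
    with EMRClient() as emr_client:
        list_clusters(emr_client)
```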
@Ricardo Gaspar I wanted to help more, so I included a resource manager for the EMR cluster creation, because a resource manager is such a good fit for that use case and it gives you a much better visualization in the UI. Here is the Gist: https://gist.github.com/3e63d44e759c3d348ae0698355c91653 and the flow diagram:
Btw, curious to hear why you chose to run the cluster 24/7? With awswrangler, you could also create the cluster on demand and delete it once your job finishes
r
Thank you both @Kevin Kho and @Anna Geller!
1. That `.wait().result()` would probably be very useful! Another thing I've noted: if I'm grabbing a value from a task result, say my task returns an array and I want to use its elements as arguments to multiple tasks (e.g. `task2(arg1=task1['result1'])`), the graph renders new boxes/nodes for those intermediate results (a minimal example of this indexing pattern is sketched below).
2. Didn't know about that `resource manager` feature! I'll look into it; it seems very appropriate for my use case. Thanks for sharing.
3. I don't want to use a 24/7 cluster; I delegate creation and destruction to the orchestration tool: a create-cluster, run-steps, terminate approach. I don't want to submit all the steps at creation time, so that I keep control of the submission and of the data dependencies between my Spark apps instead of relying on EMR to do so. I'll also set a timeout for EMR to kill itself as part of the cluster configs (EMR > 6.0.0).
4. I'm not using awswrangler yet because its cluster-creation method uses instance fleets and not instance groups. For this example I'm using instance groups. I'll change it in the future
👍 1
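For reference, a minimal example of the indexing pattern from point 1 (hypothetical task names; in Prefect 1, indexing a task result inserts an intermediate `GetItem` task, which appears as an extra node in the schematic):
```python
from prefect import Flow, task

@task
def task1() -> dict:
    # hypothetical upstream task returning multiple results
    return {"result1": 1, "result2": 2}

@task
def task2(arg1):
    print(arg1)

with Flow("indexing-example") as flow:
    results = task1()
    # task1()['result1'] binds a GetItem task between task1 and task2,
    # so the schematic shows an extra intermediate node
    task2(arg1=results["result1"])
```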
A question for you @Anna Geller about that resource manager: if I needed to create multiple clusters and submit steps, how would I do that given the `with … resource` block syntax? Or is there an imperative one as well (like a singleton-object approach)? My first thought is: 1. having multiple flows (each with its own resource manager) and invoking them from a main flow
k
If you invoke a Flow from the main Flow, you need to define the resource manager or executor at the subflow level. Man, have you seen Orion though? Cuz Orion lets you define this at the main-flow level. Is your thinking just very in line with Orion? 😆
😍 1
P 1
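A sketch of that flow-of-flows pattern, assuming a registered child flow named `emr-child-flow` in a project called `default` (both names hypothetical); each child flow defines its own resource manager and executor internally:
```python
from prefect import Flow
from prefect.tasks.prefect import create_flow_run

with Flow("main-flow") as main_flow:
    # one child flow run per cluster; each child flow owns its
    # resource manager (and executor) at the subflow level
    for cluster in ["cluster-a", "cluster-b"]:
        create_flow_run(
            flow_name="emr-child-flow",
            project_name="default",
            parameters={"cluster_name": cluster},
        )
```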
a
I would approach it exactly the way you described, with a separate flow run for each such use case. It's actually a very common approach, and one even recommended by AWS, to have ephemeral EMR clusters per run/job.
r
@Anna Geller creating a cluster takes a lot of time, so reusing one makes a lot of sense and is a common practice. One can do it in different ways, but even Databricks recently added this ability: https://databricks.com/blog/2022/02/04/saving-time-and-costs-with-cluster-reuse-in-databricks-jobs.html
Thanks a lot for your help 🙏 really appreciate it; you rock!
🙏 1
@Anna Geller could one add more methods to the resource manager class and call them from the outside? I'd like to include some static methods as tasks to add steps and wait for steps, like:
```python
@staticmethod
def wait_emr_cluster_is_running(emr_client, cluster_id: str, poll_interval_secs: int = 60):
    # block until the cluster reaches the RUNNING state
    waiter = emr_client.get_waiter('cluster_running')
    waiter.wait(
        ClusterId=cluster_id,
        WaiterConfig={
            'Delay': poll_interval_secs,
            'MaxAttempts': 60
        }
    )
```
but it seems not to be possible:
```
line 178, in <module>
    wait_cluster = EMRCluster.wait_emr_cluster_is_running(emr_client=emr_client, cluster_id=cluster_id,
AttributeError: 'ResourceManager' object has no attribute 'wait_emr_cluster_is_running'
```
I guess I have to wrap the resource in another class together with my desired methods
k
I personally haven't seen it done. You are doing something like this, right?
```python
with Flow(...) as flow:
    with MyResource(...) as resource:
        resource.wait_emr_cluster_is_running()...
        some_task(resource)
        other_task(resource)
```
I don't think it can be done, because the `Flow` block should only contain tasks. `MyResource` there is a task, so its execution is deferred to flow execution, but calling a method that is not a task will not defer
r
Exactly! Maybe it works if I wrap the resource in another class; something like:
```python
class myEMRUtils:

    @staticmethod
    def wait_emr_cluster_is_running(...):
        return result

    @resource_manager
    class EMRCluster:
        def __init__(self, ...): ...
        def setup(self, ...): ...
        def cleanup(self, ...): ...
```
k
I don't think this works well, but I'm not 100% sure. I would suggest calling `wait_emr_cluster_is_running` inside `setup` instead
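For illustration, a sketch of that suggestion, assuming `setup` creates the cluster and blocks on the waiter before returning the cluster ID (the `cluster_config` parameter and the method bodies are hypothetical):
```python
import boto3
from prefect import resource_manager

@resource_manager
class EMRCluster:
    def __init__(self, cluster_config: dict, region_name: str = "us-east-1"):
        self.cluster_config = cluster_config
        self.region_name = region_name

    def setup(self) -> str:
        emr_client = boto3.client("emr", region_name=self.region_name)
        cluster_id = emr_client.run_job_flow(**self.cluster_config)["JobFlowId"]
        # block inside setup until the cluster is fully running,
        # so downstream tasks can assume a ready cluster
        waiter = emr_client.get_waiter("cluster_running")
        waiter.wait(
            ClusterId=cluster_id,
            WaiterConfig={"Delay": 60, "MaxAttempts": 60},
        )
        return cluster_id

    def cleanup(self, cluster_id: str):
        # terminate the cluster when the resource block exits
        boto3.client("emr", region_name=self.region_name).terminate_job_flows(
            JobFlowIds=[cluster_id]
        )
```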
r
That's also a possible solution.
But I'm not concerned with that method in particular; rather the others: `add_step`, `wait_step`. It would be great to be able to use the same boto3 session and cluster ID. If not, I'll just create a separate class
k
Ah I see. You need to get `add_step` and `wait_step` to function as tasks
💯 1
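To sketch what that could look like: `add_step` and `wait_step` as plain tasks that take the shared client and cluster ID as arguments (the task bodies here are illustrative, not from the thread):
```python
from prefect import task

@task
def add_step(emr_client, cluster_id: str, step: dict) -> str:
    # add_job_flow_steps returns the IDs of the submitted steps
    response = emr_client.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
    return response["StepIds"][0]

@task
def wait_step(emr_client, cluster_id: str, step_id: str,
              poll_interval_secs: int = 60):
    # block until the step completes (or the waiter exhausts its attempts)
    waiter = emr_client.get_waiter("step_complete")
    waiter.wait(
        ClusterId=cluster_id,
        StepId=step_id,
        WaiterConfig={"Delay": poll_interval_secs, "MaxAttempts": 60},
    )
```
Both tasks could then be called inside the `with EMRCluster(...)` block, sharing the same client and cluster ID as ordinary task arguments.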