Madushan Pathirana
09/04/2025, 8:33 PM

Marvin
09/04/2025, 8:38 PM
There are two common ways to run PySpark from Prefect on your EC2 host:
- Create a SparkSession inside a Prefect task (simplest, runs in local mode)
- Submit the job with spark-submit via the prefect-shell integration (closer to production)
I’ll show both with complete examples and a few best practices for your single-EC2 setup.
Install prerequisites
- Make sure your Prefect worker environment has these installed:
pip install prefect pyspark prefect-shell
Pattern A: Create a SparkSession inside a Prefect task
- Best for running Spark in local mode on the same EC2 host as your Prefect worker
- Keep the SparkSession lifecycle inside a task
- Stop the session in a finally block to release resources
- With ~8GB RAM on that host, start conservatively (e.g., 4g for driver), leaving RAM for Prefect and the OS
from prefect import flow, task, get_run_logger
from pyspark.sql import SparkSession


@task(retries=2, retry_delay_seconds=60)
def run_spark_job(input_uri: str, output_uri: str, driver_mem: str = "4g"):
    logger = get_run_logger()
    spark = (
        SparkSession.builder
        .appName("prefect-pyspark")
        .master("local[*]")  # single EC2 host
        .config("spark.driver.memory", driver_mem)
        .config("spark.sql.shuffle.partitions", "8")
        # If using S3, ensure the Hadoop AWS jars are available for your Spark/Hadoop version
        # .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider")
        # .config("spark.hadoop.fs.s3a.endpoint", "s3.ap-southeast-1.amazonaws.com")
        .getOrCreate()
    )
    try:
        logger.info("Reading data...")
        df = spark.read.parquet(input_uri)
        logger.info(f"Input count: {df.count()}")

        logger.info("Transforming data...")
        result = df.dropna()

        logger.info(f"Writing result to {output_uri}...")
        result.write.mode("overwrite").parquet(output_uri)
        return result.count()
    finally:
        logger.info("Stopping SparkSession")
        spark.stop()


@flow
def spark_flow(input_uri: str, output_uri: str):
    n = run_spark_job(input_uri, output_uri)
    print(f"Wrote {n} rows")


if __name__ == "__main__":
    # Example URIs:
    # input_uri = "s3a://my-bucket/path/to/input"
    # output_uri = "s3a://my-bucket/path/to/output"
    spark_flow("file:///tmp/input", "file:///tmp/output")
Pattern B: Submit a Spark job with spark-submit via prefect-shell
- Great when you already have a spark-submit script and want Prefect’s orchestration, retries, and logging
- prefect-shell exposes shell_run_command to run shell commands in a task-like way
Docs: prefect-shell integration
from prefect import flow
from prefect_shell.commands import shell_run_command


@flow
def submit_spark_job(
    script_path: str = "jobs/wordcount.py",
    input_uri: str = "s3a://my-bucket/input",
    output_uri: str = "s3a://my-bucket/output",
    driver_mem: str = "4g",
):
    # Adjust --packages to match your Spark/Hadoop distro if you use S3A
    cmd = (
        "spark-submit "
        "--master local[*] "
        f"--driver-memory {driver_mem} "
        # Example if S3A jars are needed (versions must match your Spark/Hadoop):
        # "--packages org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262 "
        f"{script_path} "
        f"--input {input_uri} "
        f"--output {output_uri}"
    )
    # If using an IAM role on the EC2 instance, you typically don't need to pass keys.
    # If needed, you can pass env vars to the command:
    output = shell_run_command(
        command=cmd,
        env={
            "AWS_REGION": "ap-southeast-1",
            # "AWS_ACCESS_KEY_ID": "...",
            # "AWS_SECRET_ACCESS_KEY": "...",
        },
    )
    return output


if __name__ == "__main__":
    submit_spark_job()
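For reference, here is a minimal sketch of what a script like jobs/wordcount.py could look like. The script path and its --input/--output flags are just the defaults assumed by the flow above, not an existing file in your project; adapt it to whatever job you actually submit.

# jobs/wordcount.py -- hypothetical example of the script submitted above
import argparse

from pyspark.sql import SparkSession
from pyspark.sql import functions as F


def main() -> None:
    # Parse the same --input/--output arguments the spark-submit command passes
    parser = argparse.ArgumentParser(description="Simple word count job")
    parser.add_argument("--input", required=True, help="Input path/URI (text files)")
    parser.add_argument("--output", required=True, help="Output path/URI (parquet)")
    args = parser.parse_args()

    spark = SparkSession.builder.appName("wordcount").getOrCreate()
    try:
        lines = spark.read.text(args.input)
        counts = (
            lines.select(F.explode(F.split(F.col("value"), r"\s+")).alias("word"))
            .where(F.col("word") != "")
            .groupBy("word")
            .count()
        )
        counts.write.mode("overwrite").parquet(args.output)
    finally:
        spark.stop()


if __name__ == "__main__":
    main()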
S3 access tips (ap-southeast-1)
- Include the Hadoop AWS (S3A) jars that match your Spark/Hadoop version (e.g., via --packages)
- Optionally set the region-specific endpoint if needed (see the sketch after this list):
  - fs.s3a.endpoint = s3.ap-southeast-1.amazonaws.com
- With an instance profile, you can set:
  - fs.s3a.aws.credentials.provider = com.amazonaws.auth.InstanceProfileCredentialsProvider
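As a concrete illustration, here is a minimal sketch of how those fs.s3a settings map onto a SparkSession builder. It assumes the matching hadoop-aws jars are on the classpath and that the instance profile grants access; the bucket path is a placeholder.

from pyspark.sql import SparkSession

# Apply the S3A settings above via spark.hadoop.* configs
spark = (
    SparkSession.builder
    .appName("s3a-config-example")
    .master("local[*]")
    .config("spark.hadoop.fs.s3a.endpoint", "s3.ap-southeast-1.amazonaws.com")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.InstanceProfileCredentialsProvider",
    )
    .getOrCreate()
)

df = spark.read.parquet("s3a://my-bucket/path/to/input")  # placeholder URI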
Resource sizing for your single 8GB EC2 host
- Leave headroom for the OS and Prefect worker
- Start with spark.driver.memory=4g
- If running local mode with many cores, you may also want to reduce parallelism: spark.sql.shuffle.partitions=8 (or fewer)
- Monitor memory/CPU and adjust (a conservative starting config is sketched below)
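Putting those sizing bullets together, a conservative starting point for an 8GB host might look like this; it simply consolidates the settings already used in Pattern A, and the numbers are starting values to tune, not recommendations.

from pyspark.sql import SparkSession

# ~4g for the Spark driver and fewer shuffle partitions for local mode,
# leaving headroom for the OS and the Prefect worker on an 8GB host.
spark = (
    SparkSession.builder
    .appName("sized-for-8gb-host")
    .master("local[*]")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.shuffle.partitions", "8")
    .getOrCreate()
)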
When to use which pattern
- Inline SparkSession (Pattern A): simplest for local testing and small jobs on the same box as the Prefect worker
- spark-submit (Pattern B): closer to production workflows, easier to pass job files and JVM packages, and aligns with how Spark is usually run in clusters
Would you like me to tailor this to your exact setup?
- Are you running Spark purely in local mode on your EC2, or will you connect to a standalone/YARN/EMR/Kubernetes cluster?
- Will your jobs read/write S3 in ap-southeast-1?
- Do you prefer inline SparkSession or spark-submit style?

Madushan Pathirana
09/05/2025, 1:37 AM

Marvin
09/05/2025, 1:45 AM
1) Install the AWS integration and set up the worker environment (a quick credential sanity check is sketched after the exports)
pip install "prefect[aws]"
export PREFECT_API_URL=http://<your-ec2-host-or-ip>:4200/api
export AWS_DEFAULT_REGION=ap-southeast-1
# If the EC2 instance running the worker has an instance profile, you don't need these:
# export AWS_ACCESS_KEY_ID=...
# export AWS_SECRET_ACCESS_KEY=...
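Before creating the pool, it can help to confirm that the worker host actually resolves credentials and the expected region. A quick check using boto3 (installed alongside prefect[aws]); this is just a sanity-check sketch, not part of the Prefect setup itself:

import boto3

# Verify the worker host resolves AWS credentials (e.g., from the EC2 instance
# profile) and the expected region before creating the ECS work pool.
session = boto3.Session()
print("Region:", session.region_name)  # expect ap-southeast-1
print("Caller:", boto3.client("sts").get_caller_identity()["Arn"])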
2) Create an ECS work pool and base job template
- Generate the default ECS job template, then edit it:
prefect work-pool get-default-base-job-template --type ecs --file ecs-base.json
- In ecs-base.json, set at minimum:
- cluster: your ECS cluster ARN or name (in ap-southeast-1)
- launchType or capacityProviderStrategy: typically FARGATE
- cpu / memory: e.g., 1024 / 2048
- taskRoleArn and executionRoleArn: ensure the worker IAM principal has iam:PassRole permission for these
- networkConfiguration:
- subnets: your VPC subnet IDs in ap-southeast-1
- securityGroups: sg IDs that allow egress to Prefect Server and image registry (NAT/IGW as needed)
- assignPublicIp: ENABLED if using public subnets
- image: your container image with your code + dependencies (or a lightweight image if you’ll pull code from source)
- logging (optional but recommended): awslogs group/region/stream-prefix
- Create the pool:
prefect work-pool create "dev-ecs" --type ecs --base-job-template ./ecs-base.json
3) Start the ECS worker (submits runs to ECS, does not execute flows locally)
prefect worker start --pool dev-ecs
4) Deploy your flow to the ECS pool
- CLI (from your project root):
prefect deploy ./flows/my_flow.py:my_flow --name my-flow-dev --work-pool-name dev-ecs
- Or source-based, programmatically:
from prefect import flow

f = flow.from_source(
    "https://github.com/<org>/<repo>.git",
    entrypoint="flows/my_flow.py:my_flow",
)
f.deploy(name="my-flow-dev", work_pool_name="dev-ecs")
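Once deployed, you can trigger a run from anything that can reach your Prefect API. A minimal sketch using run_deployment; the "my-flow/my-flow-dev" name assumes the flow function is named my_flow and the deployment my-flow-dev as above:

from prefect.deployments import run_deployment

# Kick off the ECS-backed deployment by its "flow-name/deployment-name" string
flow_run = run_deployment(
    name="my-flow/my-flow-dev",
    parameters={},  # pass flow parameters here if any
    timeout=0,      # return immediately instead of waiting for completion
)
print(flow_run.id)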
IAM notes for the worker (the identity running prefect worker start)
- ecs:RunTask, ecs:StopTask, ecs:DescribeTasks
- iam:PassRole for the taskRoleArn and executionRoleArn used by the tasks
- If the worker registers task definitions dynamically: ecs:RegisterTaskDefinition
- If using CloudWatch logs: logs:CreateLogStream, logs:PutLogEvents
Given your setup:
- You’re in ap-southeast-1 — ensure the cluster, subnets, and logs are all in that region.
- If you truly want to keep all dev/stage/prod on the same EC2 host, ECS may not be necessary. You can instead create separate work pools (process or docker) per env on that host. If you do want ECS for isolation/scaling, create separate ECS work pools (e.g., dev-ecs, stage-ecs, prod-ecs) with distinct job templates.
Want me to generate a starter ecs-base.json tailored to ap-southeast-1 and your VPC/subnets if you can share the cluster name/ARN, subnet IDs, and security group IDs?