Tejal Singh
08/23/2022, 6:33 PM
RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
from prefect import task, flow
import contextlib
from pyspark import SparkConf
from pyspark.sql import SparkSession

@contextlib.contextmanager
def SparkCluster(conf2: SparkConf = SparkConf()):
    ssobj = SparkSession.builder.config(conf=conf2).getOrCreate()
    try:
        yield ssobj
    finally:
        ssobj.stop()

@task
def get_data(spark: SparkSession):
    return spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look',), ('python',)], ['word'])

@task()
def analyze(df):
    word_count = df.groupBy('word').count()
    word_count.show()

@flow(name="spark_flow")
def my_flow():
    conf = SparkConf().setMaster('local[*]')
    with SparkCluster(conf) as spark:
        df = get_data(spark)
        analyze(df)

if __name__ == '__main__':
    my_flow()
Basically I was trying to convert the below code from prefect 1.0 to 2.0:
from prefect import task, Flow, resource_manager
from pyspark import SparkConf
from pyspark.sql import SparkSession

@resource_manager
class SparkCluster:
    def __init__(self, conf: SparkConf = SparkConf()):
        self.conf = conf

    def setup(self) -> SparkSession:
        return SparkSession.builder.config(conf=self.conf).getOrCreate()

    def cleanup(self, spark: SparkSession):
        spark.stop()

@task
def get_data(spark: SparkSession):
    return spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look',), ('python',)], ['word'])

@task(log_stdout=True)
def analyze(df):
    word_count = df.groupBy('word').count()
    word_count.show()

with Flow("spark_flow") as flow:
    conf = SparkConf().setMaster('local[*]')
    with SparkCluster(conf) as spark:
        df = get_data(spark)
        analyze(df)

if __name__ == '__main__':
    flow.run()
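[Editor's note: a hedged sketch of one possible workaround for the SPARK-5063 error above, assuming it is triggered by Prefect 2 attempting to pickle the SparkSession or DataFrame passed between tasks. The idea is to keep Spark objects out of task signatures and re-acquire the active session inside the task with getOrCreate(); this is a sketch, not an official Prefect recipe.]

import contextlib

from prefect import flow, task
from pyspark import SparkConf
from pyspark.sql import SparkSession

@contextlib.contextmanager
def spark_cluster(conf: SparkConf):
    session = SparkSession.builder.config(conf=conf).getOrCreate()
    try:
        yield session
    finally:
        session.stop()

@task
def word_count():
    # getOrCreate() returns the session already started by the flow's
    # context manager, so no Spark object crosses a task boundary.
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [('look',), ('spark',), ('tutorial',), ('spark',), ('look',), ('python',)],
        ['word'],
    )
    df.groupBy('word').count().show()

@flow(name="spark_flow")
def my_flow():
    conf = SparkConf().setMaster('local[*]')
    with spark_cluster(conf):
        word_count()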
Ian Andres Etnyre Mercader
08/23/2022, 9:45 PM
I have a custom context built on ContextModel.
I'm running into issues with the asynchronous section of my code when using task_runner=DaskTaskRunner: the context is not defined in the tasks.
@flow(name="Biorxiv", task_runner=DaskTaskRunner())
def biorxiv_main_flow(block_name):
with MyContextModel(block_name):
prepare_destination = biorxiv.prepare_destination()
items = get_docs()
items_futures = []
with tags("converters"):
for item in items:
items_futures.append(biorxiv.convert_xml_to_json.submit(item))
item_results = [i_futures.result() for i_futures in items_futures]
AttributeError: 'NoneType' object has no attribute 'data_fs'
I get this error on the line of code MyContextModel.get().data_fs
get() returns None
Is there something I need to configure for the context to be defined in the Dask tasks?
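[Editor's note: a minimal sketch of one possible workaround, not an official Prefect recipe. DaskTaskRunner executes tasks in separate worker processes, so a context entered in the flow's process is not automatically visible there; re-entering the context inside the task sidesteps that, assuming MyContextModel (the user's own class) can be constructed from block_name in the worker.]

from prefect import task

@task
def convert_xml_to_json(item, block_name):
    # Re-enter the context in the Dask worker process; the flow-level
    # `with MyContextModel(block_name):` does not propagate to workers.
    with MyContextModel(block_name):
        data_fs = MyContextModel.get().data_fs  # get() is no longer None here
        ...  # convert `item` using data_fs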
Prakash Rai
08/24/2022, 1:57 AM
I have a `flow` with two `task`s:
◦ The first task downloads a CSV.
◦ The second task downloads a PDF for every row in the CSV.
▪︎ It takes around 10s to download a PDF, and there are around 500 PDFs to be downloaded.
▪︎ Each task is named after the PDF it is downloading (I'm using with_options to assign task names on the fly).
▪︎ I've added a concurrency limit of 8 on this task.
Now, when my flow completes, I still see some 6-7 tasks in a Running state on the UI. However, the corresponding PDFs are downloaded and saved on my disk.
I have three questions:
• Why is this happening? The fact that PDFs are downloaded means that the tasks are completed. Is Prefect somehow failing to detect that the job ended?
• I'm using prefect concurrency-limit inspect 'pdf-downloader' to look for the running tasks. I am able to extract task IDs, but can't find a documented way of killing them. Is there a command which takes a task ID and kills it? If not, what is the preferred way of killing them?
• Is there a way to specify a maximum time limit for a task?
Thanks in advance 🙂
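[Editor's note: a hedged sketch for the time-limit question, assuming Prefect 2.x semantics: the task decorator accepts timeout_seconds, which fails a run that exceeds the limit instead of leaving it Running, and the tag ties runs to the limit created with `prefect concurrency-limit create pdf-downloader 8`. The function and its arguments are illustrative, not from the original thread.]

import httpx
from prefect import task

@task(timeout_seconds=60, tags=["pdf-downloader"])
def download_pdf(url: str, dest: str) -> str:
    # Times out after 60s; the "pdf-downloader" tag enforces the
    # concurrency limit of 8 created via the CLI.
    with open(dest, "wb") as f:
        f.write(httpx.get(url).content)
    return dest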
Ahmed Ezzat
08/24/2022, 3:59 AM
I'm asking about result_filesystem — it's responsible for handling task result storage. I'm trying to store task results on an S3 bucket.
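[Editor's note: a minimal sketch of pointing a filesystem block at S3, assuming Prefect 2.x filesystem blocks and an installed s3fs; the bucket name and block name are illustrative.]

from prefect.filesystems import RemoteFileSystem

s3_fs = RemoteFileSystem(basepath="s3://my-bucket/results")
s3_fs.save("results-s3", overwrite=True)  # register as a reusable block named "results-s3"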
Malavika S Menon
08/24/2022, 5:42 AM
José Duarte
08/24/2022, 7:55 AM
Why does Prefect use value: NonNullableType = None? I understand the motivation behind it, but in the end the type is wrong and mypy will complain. Is there a setting for mypy I am missing?
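[Editor's note: for illustration, the pattern mypy flags and the annotation it expects; the function names here are invented, not Prefect's.]

from typing import Optional

def bad(value: int = None):  # mypy: Incompatible default for argument "value" [assignment]
    ...

def good(value: Optional[int] = None):  # explicit Optional satisfies mypy
    ...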
Stephen Lloyd
08/24/2022, 8:46 AM
Karan
08/24/2022, 10:24 AM
Tejal Singh
08/24/2022, 11:34 AMfrom prefect import task, Flow, resource_manager
from pyspark import SparkConf
from pyspark.sql import SparkSession
@resource_manager
class SparkCluster:
def __init__(self, conf: SparkConf = SparkConf()):
self.conf = conf
def setup(self) -> SparkSession:
return SparkSession.builder.config(conf=self.conf).getOrCreate()
def cleanup(self, spark: SparkSession):
spark.stop()
@task
def get_data(spark: SparkSession):
return spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])
@task(log_stdout=True)
def analyze(df):
word_count = df.groupBy('word').count()
word_count.show()
with Flow("spark_flow") as flow:
conf = SparkConf().setMaster('local[*]')
with SparkCluster(conf) as spark:
df = get_data(spark)
analyze(df)
if __name__ == '__main__':
flow.run()
I have converted a few obvious things, but I still don't have an idea how to remove this resource_manager decorator and what context manager to use in its place. Now my code looks like this:
from prefect import task, flow, resource_manager
from pyspark import SparkConf
from pyspark.sql import SparkSession

@resource_manager
class SparkCluster:
    def __init__(self, conf: SparkConf = SparkConf()):
        self.conf = conf

    def setup(self) -> SparkSession:
        return SparkSession.builder.config(conf=self.conf).getOrCreate()

    def cleanup(self, spark: SparkSession):
        spark.stop()

@task
def get_data(spark: SparkSession):
    return spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look',), ('python',)], ['word'])

@task(log_stdout=True)
def analyze(df):
    word_count = df.groupBy('word').count()
    word_count.show()

@flow(name="spark_flow")
def my_flow():
    conf = SparkConf().setMaster('local[*]')
    with SparkCluster(conf) as spark:
        df = get_data(spark)
        analyze(df)

if __name__ == '__main__':
    my_flow()
Can anyone please help me with this? cc @Anna Geller
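[Editor's note: one hedged way to drop @resource_manager, which has no direct Prefect 2 equivalent, is a plain contextlib context manager: setup() becomes the code before yield, cleanup() becomes the finally block. A sketch:]

import contextlib

from pyspark import SparkConf
from pyspark.sql import SparkSession

@contextlib.contextmanager
def SparkCluster(conf: SparkConf = SparkConf()):
    spark = SparkSession.builder.config(conf=conf).getOrCreate()  # was setup()
    try:
        yield spark
    finally:
        spark.stop()  # was cleanup()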
Barys Rutman
08/24/2022, 11:45 AM
Vishy ganesh
08/24/2022, 1:15 PM
--scheduler --no-scheduler
--ui --no-ui
UI Server HA:
--scheduler --no-scheduler
Or am I thinking through this in the wrong fashion?
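[Editor's note: if the flags behave the way the help text above suggests, a hypothetical (unverified) HA split would be one scheduler-enabled instance plus API/UI-only instances:]

prefect orion start --scheduler --no-ui    # single instance that runs the scheduler
prefect orion start --no-scheduler --ui    # additional API/UI instances behind a load balancer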
Daniel Sääf
08/24/2022, 1:19 PM
Tim-Oliver
08/24/2022, 1:53 PM
Jonathan Pou
08/24/2022, 2:00 PM
Chris L.
08/24/2022, 2:06 PM
I'm getting a PrefectHTTPStatusError. I found out that a team member recently changed a few of our k8s secrets and replaced our k8s secret PREFECT_API_KEY with an expired key. Changing the API key to an active key solved the issue. Going to open a GitHub issue, as I believe the 404 error sent me on a wild goose chase... The image is now prefecthq/prefect:2.2.0-python3.9 instead of prefecthq/prefect:2.1.1-python3.9. Nevertheless, I also tested using image: prefecthq/prefect:2.1.1-python3.9, also getting the same error (see full traceback in thread).
prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url 'https://api.prefect.cloud/api/accounts/REDACTED/workspaces/REDACTED/block_types/'
Response: {'detail': 'Not Found'}
For more information check: https://httpstatuses.com/404
An exception occurred.
Karan
08/24/2022, 2:17 PM
Claire Herdeman
08/24/2022, 2:42 PM
Aleksander
08/24/2022, 3:19 PM
Lucas Brum
08/24/2022, 3:41 PM
Sam Garvis
08/24/2022, 4:46 PM
Is there an equivalent of upstream_tasks=[result] in 2.0? I used this in 1.0 when using the Dask Executor to force a task to wait on another task, but I believe it was taken away in 2.0.
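[Editor's note: Prefect 2's wait_for keyword appears to cover this case, forcing an ordering without a data dependency. A small sketch:]

from prefect import flow, task

@task
def first():
    return "done"

@task
def second():
    print("runs only after `first` finishes")

@flow
def ordered_flow():
    result = first.submit()
    second.submit(wait_for=[result])  # explicit upstream dependency, as upstream_tasks was in 1.0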
Neil Natarajan
08/24/2022, 4:49 PM
Dylan
08/24/2022, 5:35 PM
Dylan
08/24/2022, 5:36 PM
Dylan
08/24/2022, 5:37 PM
Krishnan Chandra
08/24/2022, 6:12 PM
I've been trying to override the path setting with the prefect deployment build command, but this hasn't worked:
prefect deployment build \
    ...
    --override path=/app/
Is there a way to override the path setting, or is this something I'd need to modify in the YAML files after generation?
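[Editor's note: one hedged workaround, assuming --override only targets infrastructure settings in this version: edit the path: field in the generated YAML by hand, then re-apply the deployment.]

# in deployment.yaml, set:
#   path: /app/
prefect deployment apply deployment.yaml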
Blake Hamm
08/24/2022, 6:12 PM
Tomás Emilio Silva Ebensperger
08/24/2022, 6:36 PM
agent = LocalAgent()
agent.labels = ['custom_label']
instead of passing the labels to the LocalAgent params upon instantiation. This way I have complete separation between agents even if they are running locally on the same machine.
Now the issue with registering flows is the same: the labels assigned to the flow are the ones you provide plus that default local label. I can't seem to remove the default label before registration, and the agents don't work, because their labels must be a `superset` of the flow's labels.
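[Editor's note: a sketch of how Prefect 1.x exposed this, with the kwargs to be treated as assumptions to verify: LocalAgent takes hostname_label=False to skip the automatic hostname label, and Local storage takes add_default_labels=False so registered flows don't pick it up either.]

from prefect.agent.local import LocalAgent
from prefect.storage import Local

# agent without the automatic hostname label
agent = LocalAgent(labels=["custom_label"], hostname_label=False)

# flow is the user's existing Flow object; storage that skips default labels
flow.storage = Local(add_default_labels=False)
flow.register(project_name="my-project", labels=["custom_label"])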
Tuoyi Zhao
08/24/2022, 8:00 PM
Marc Lipoff
08/24/2022, 8:26 PM
Keith Hickey
08/24/2022, 9:12 PM