Daniel Sääf
08/23/2022, 9:00 AM

Tejal Singh
08/23/2022, 9:51 AM

Owen Cook
08/23/2022, 11:02 AM

Saman
08/23/2022, 11:32 AM

prefect cloud login -k *** -w myaccount/test
will ask for a profile name. Is there a way to pass the profile name in the parameters?
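(For reference: one non-interactive approach, sketched here with a made-up profile name and assuming the prefect profile / prefect config subcommands available in Prefect 2, is to create and select a profile first and set the connection settings directly instead of going through the login prompt. Whether this fully replaces prefect cloud login can depend on the version; see Ilya's message below.)

    prefect profile create ci-profile
    prefect profile use ci-profile
    prefect config set PREFECT_API_KEY=***
    prefect config set PREFECT_API_URL="https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>"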
Karan
08/23/2022, 12:08 PM

Maria
08/23/2022, 12:31 PM

Tom Klein
08/23/2022, 2:11 PM

Robert Esteves
08/23/2022, 2:27 PM

Ilya Galperin
08/23/2022, 2:40 PM

Is it necessary to run
prefect cloud login -k $PREFECT_API_KEY -w $PREFECT_WORKSPACE
in order to run a block.save Python method? If we try authenticating by simply setting our default profile and running prefect config set PREFECT_API_KEY/PREFECT_API_URL, but not explicitly calling prefect cloud login, we get an error when trying to save a storage block (noted in replies). Is there a recommended pattern to use for CI/CD to save a block otherwise?
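(For reference: a common CI pattern, sketched below with placeholder names and a placeholder block type, is to export PREFECT_API_URL and PREFECT_API_KEY as environment variables for the job and save the block from Python, rather than relying on an interactive login. This is an illustration, not a guaranteed fix for the error above.)

    import os
    from prefect.filesystems import RemoteFileSystem  # placeholder block type

    # Assumes the CI job exports, e.g.:
    #   PREFECT_API_URL=https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>
    #   PREFECT_API_KEY=<an active API key>
    assert os.environ.get("PREFECT_API_URL") and os.environ.get("PREFECT_API_KEY")

    block = RemoteFileSystem(basepath="s3://my-bucket/prefect-storage")  # hypothetical bucket
    block.save("ci-storage", overwrite=True)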
Nikhil Jain
08/23/2022, 5:22 PM

Tejal Singh
08/23/2022, 6:33 PM

I am getting the error below with the following code:
RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
from prefect import task, flow
import contextlib
from pyspark import SparkConf
from pyspark.sql import SparkSession

@contextlib.contextmanager
def SparkCluster(conf2: SparkConf = SparkConf()):
    ssobj = SparkSession.builder.config(conf=conf2).getOrCreate()
    try:
        yield ssobj
    finally:
        ssobj.stop()

@task
def get_data(spark: SparkSession):
    return spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look',), ('python',)], ['word'])

@task()
def analyze(df):
    word_count = df.groupBy('word').count()
    word_count.show()

@flow(name="spark_flow")
def my_flow():
    conf = SparkConf().setMaster('local[*]')
    with SparkCluster(conf) as spark:
        df = get_data(spark)
        analyze(df)

if __name__ == '__main__':
    my_flow()
Basically I was trying to convert the below code from Prefect 1.0 to 2.0:
from prefect import task, Flow, resource_manager
from pyspark import SparkConf
from pyspark.sql import SparkSession

@resource_manager
class SparkCluster:
    def __init__(self, conf: SparkConf = SparkConf()):
        self.conf = conf

    def setup(self) -> SparkSession:
        return SparkSession.builder.config(conf=self.conf).getOrCreate()

    def cleanup(self, spark: SparkSession):
        spark.stop()

@task
def get_data(spark: SparkSession):
    return spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look',), ('python',)], ['word'])

@task(log_stdout=True)
def analyze(df):
    word_count = df.groupBy('word').count()
    word_count.show()

with Flow("spark_flow") as flow:
    conf = SparkConf().setMaster('local[*]')
    with SparkCluster(conf) as spark:
        df = get_data(spark)
        analyze(df)

if __name__ == '__main__':
    flow.run()
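(A side note: one workaround sometimes used for SPARK-5063-style errors in a setup like this, sketched below as an assumption rather than a confirmed root cause, is to keep everything that touches the SparkSession inside a single task, so that neither the session nor a DataFrame crosses a task boundary or gets pickled as a task result.)

    from prefect import flow, task
    from pyspark import SparkConf
    from pyspark.sql import SparkSession

    @task
    def run_word_count():
        # Create, use, and stop the SparkSession entirely inside one task.
        spark = SparkSession.builder.config(conf=SparkConf().setMaster('local[*]')).getOrCreate()
        try:
            df = spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',)], ['word'])
            df.groupBy('word').count().show()
        finally:
            spark.stop()

    @flow(name="spark_flow")
    def my_flow():
        run_word_count()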
Ian Andres Etnyre Mercader
08/23/2022, 9:45 PM

I have a custom context built on ContextModel. I'm running into issues with the asynchronous section of my code when using task_runner=DaskTaskRunner: the context is not defined in the tasks.
@flow(name="Biorxiv", task_runner=DaskTaskRunner())
def biorxiv_main_flow(block_name):
with MyContextModel(block_name):
prepare_destination = biorxiv.prepare_destination()
items = get_docs()
items_futures = []
with tags("converters"):
for item in items:
items_futures.append(biorxiv.convert_xml_to_json.submit(item))
item_results = [i_futures.result() for i_futures in items_futures]
AttributeError: 'NoneType' object has no attribute 'data_fs'
I get this error on the line of code MyContextModel.get().data_fs, where get() returns None.
Is there something I need to configure for the context to be defined in the Dask tasks?
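(For reference: with DaskTaskRunner the tasks execute in separate worker processes, so a context entered in the flow process is generally not visible inside the tasks. One possible workaround, sketched below using the names from the snippet above with the task body elided, is to enter the context inside the task itself.)

    from prefect import task

    @task
    def convert_xml_to_json(item, block_name):
        # Re-enter the custom context on the Dask worker instead of relying on
        # the context manager opened in the flow process.
        with MyContextModel(block_name):
            fs = MyContextModel.get().data_fs  # get() now returns the active context
            ...  # convert `item` using `fs`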
Prakash Rai
08/24/2022, 1:57 AM

I have a flow with two tasks:
◦ The first task downloads a CSV.
◦ The second task downloads a PDF for every row in the CSV.
  ▪︎ It takes around 10s to download a PDF, and there are around 500 PDFs to be downloaded.
  ▪︎ Each task is named after the PDF it is downloading (I'm using with_options to assign task names on the fly).
  ▪︎ I've added a concurrency limit of 8 on this task.
Now, when my flow completes, I still see some 6-7 tasks in a Running state on the UI. However, the corresponding PDFs are downloaded and saved on my disk.
I have three questions:
• Why is this happening? The fact that the PDFs are downloaded means that the tasks are completed. Is Prefect somehow failing to detect that the job ended?
• I'm using prefect concurrency-limit inspect 'pdf-downloader' to look for the running tasks. I am able to extract task IDs, but I can't find a documented way of killing them. Is there a command which takes a task ID and kills it? If not, what is the preferred way of killing them?
• Is there a way to specify a maximum time limit for a task?
Thanks in advance 🙂
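(For reference: the naming and concurrency pieces described above are usually wired up roughly as below; the task and flow names here are made up, and the 'pdf-downloader' tag is taken from the inspect command in the question.)

    from prefect import flow, task

    @task(tags=["pdf-downloader"])  # limited via: prefect concurrency-limit create pdf-downloader 8
    def download_pdf(url: str):
        ...  # download the PDF and save it to disk

    @flow
    def download_all(urls: list[str]):
        for url in urls:
            # with_options assigns a per-PDF task name on the fly
            download_pdf.with_options(name=f"download-{url.rsplit('/', 1)[-1]}").submit(url)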
Ahmed Ezzat
08/24/2022, 3:59 AM

Regarding result_filesystem: it's responsible for handling task result storage. I'm trying to store task results in an S3 bucket.

Malavika S Menon
08/24/2022, 5:42 AM

José Duarte
08/24/2022, 7:55 AM

How should one deal with annotations like
value: NonNullableType = None
? I understand the motivation behind it, but in the end the type is wrong and mypy will complain. Is there a setting for mypy I am missing?
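(For reference: mypy only rejects a None default on a non-Optional annotation when implicit-optional handling is disabled, which --strict does; re-enabling it via no_implicit_optional = False in the config, or --implicit-optional on the command line, is one setting worth checking. The annotation strict mypy expects is sketched below; NonNullableType is just the placeholder name from the question.)

    from typing import Optional

    value: Optional[NonNullableType] = None  # what strict mypy expects for a None default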
Stephen Lloyd
08/24/2022, 8:46 AM

Karan
08/24/2022, 10:24 AM

Tejal Singh
08/24/2022, 11:34 AM

This is the Prefect 1.0 code I am trying to convert:
from prefect import task, Flow, resource_manager
from pyspark import SparkConf
from pyspark.sql import SparkSession

@resource_manager
class SparkCluster:
    def __init__(self, conf: SparkConf = SparkConf()):
        self.conf = conf

    def setup(self) -> SparkSession:
        return SparkSession.builder.config(conf=self.conf).getOrCreate()

    def cleanup(self, spark: SparkSession):
        spark.stop()

@task
def get_data(spark: SparkSession):
    return spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look',), ('python',)], ['word'])

@task(log_stdout=True)
def analyze(df):
    word_count = df.groupBy('word').count()
    word_count.show()

with Flow("spark_flow") as flow:
    conf = SparkConf().setMaster('local[*]')
    with SparkCluster(conf) as spark:
        df = get_data(spark)
        analyze(df)

if __name__ == '__main__':
    flow.run()
I have converted a few obvious things, but I still don't have an idea how to remove this resource_manager decorator and what context manager to use in its place. Now my code looks like this:
from prefect import task, flow, resource_manager
from pyspark import SparkConf
from pyspark.sql import SparkSession

@resource_manager
class SparkCluster:
    def __init__(self, conf: SparkConf = SparkConf()):
        self.conf = conf

    def setup(self) -> SparkSession:
        return SparkSession.builder.config(conf=self.conf).getOrCreate()

    def cleanup(self, spark: SparkSession):
        spark.stop()

@task
def get_data(spark: SparkSession):
    return spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look',), ('python',)], ['word'])

@task(log_stdout=True)
def analyze(df):
    word_count = df.groupBy('word').count()
    word_count.show()

@flow(name="spark_flow")
def my_flow():
    conf = SparkConf().setMaster('local[*]')
    with SparkCluster(conf) as spark:
        df = get_data(spark)
        analyze(df)

if __name__ == '__main__':
    my_flow()
Can anyone please help me with this? cc @Anna Geller
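(For reference: in Prefect 2 the resource_manager pattern can be replaced with a plain Python context manager, along the lines of the contextlib version posted earlier in this thread; the sketch below reuses the SparkCluster name, and the flow body can keep using "with SparkCluster(conf) as spark:".)

    import contextlib
    from pyspark import SparkConf
    from pyspark.sql import SparkSession

    @contextlib.contextmanager
    def SparkCluster(conf: SparkConf = SparkConf()):
        # setup: build the session, yield it to the flow, and always stop it on exit
        spark = SparkSession.builder.config(conf=conf).getOrCreate()
        try:
            yield spark
        finally:
            spark.stop()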
Barys Rutman
08/24/2022, 11:45 AM

Vishy ganesh
08/24/2022, 1:15 PM

I'm looking at the server start options:
  --scheduler / --no-scheduler
  --ui / --no-ui
For UI server HA, would I use --scheduler or --no-scheduler? Or am I thinking through this in the wrong fashion?
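(For reference: one layout sometimes used, and only an assumption here rather than official guidance, is to run the API/UI replicas without a scheduler and keep a single instance responsible for scheduling.)

    prefect orion start --no-scheduler          # run on each UI/API replica
    prefect orion start --scheduler --no-ui     # run on a single scheduler-only instance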
Daniel Sääf
08/24/2022, 1:19 PM

Tim-Oliver
08/24/2022, 1:53 PM

Jonathan Pou
08/24/2022, 2:00 PM

Chris L.
08/24/2022, 2:06 PM

I was getting a PrefectHTTPStatusError. I found out that a team member recently changed a few of our k8s secrets and replaced our k8s secret PREFECT_API_KEY with an expired key. Changing the API key to an active key solved the issue. Going to open a GitHub issue, as I believe the 404 error sent me on a wild goose chase...
The image is now prefecthq/prefect:2.2.0-python3.9 instead of prefecthq/prefect:2.1.1-python3.9. Nevertheless, I also tested using image: prefecthq/prefect:2.1.1-python3.9, also getting the same error (see full traceback in thread):
prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url '<https://api.prefect.cloud/api/accounts/REDACTED/workspaces/REDACTED/block_types/>'
Response: {'detail': 'Not Found'}
For more information check: <https://httpstatuses.com/404>
An exception occurred.
Karan
08/24/2022, 2:17 PM

Claire Herdeman
08/24/2022, 2:42 PM

Aleksander
08/24/2022, 3:19 PM

Lucas Brum
08/24/2022, 3:41 PM

Sam Garvis
08/24/2022, 4:46 PM

Is there an equivalent of upstream_tasks=[result] in 2.0? I used this in 1.0 when using the Dask Executor to force a task to wait on another task, but I believe it was taken away in 2.0.
Neil Natarajan
08/24/2022, 4:49 PM

wait_for
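(For reference: in Prefect 2 this ordering is expressed with the wait_for argument when calling or submitting a task; a minimal sketch with made-up task names.)

    from prefect import flow, task

    @task
    def first():
        return "done"

    @task
    def second():
        return "ran after first"

    @flow
    def my_flow():
        a = first.submit()
        # second() does not consume first()'s result, but wait_for makes it wait anyway,
        # the 2.0 counterpart of upstream_tasks=[...] from 1.0.
        b = second.submit(wait_for=[a])
        return b.result()

    if __name__ == "__main__":
        my_flow()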