Varun Joshi
02/23/2021, 12:14 PM

Matheus Calvelli
02/23/2021, 12:46 PM

Tobias Heintz
02/23/2021, 12:50 PM

Braun Reyes
02/23/2021, 1:32 PM
from time import sleep

from prefect import Flow, apply_map, task
from prefect.executors import DaskExecutor


@task()
def test_1(x):
    sleep(2)
    print(f"test_1 with {x}")
    return x


@task()
def test_2(x):
    sleep(2)
    print(f"test_2 with {x}")


def micro_flow(x):
    test_1_task = test_1(x)
    test_2(test_1_task)


with Flow(
    "example",
    executor=DaskExecutor(cluster_kwargs={"n_workers": 1, "threads_per_worker": 2}),
) as flow:
    apply_map(micro_flow, range(10))


if __name__ == "__main__":
    flow.run()
    # flow.visualize()
rafaqat ali
02/23/2021, 1:48 PM

Daniel Black
02/23/2021, 3:19 PM

Pedro Machado
02/23/2021, 3:55 PM
One thing I noticed with the KubernetesPodOperator in Airflow is that you can't see the logs until the task ends. How does logging work in Prefect when you use Kubernetes? Are logs streamed to Prefect Cloud in real time?

Asif Imran
02/23/2021, 5:19 PM
I'm looking for something like Sensors from Airflow. I see this helpful link [1] from Jeremiah -- essentially do the poke-sleep-poke yourself, IIUC. Any changes since his post? Recently Airflow introduced the notion of SmartSensors [2], which removes a fair bit of work duplication (it's fairly typical for me to have several workflows all polling on the same S3 success file). I have similar worries that such polling will hog resources (e.g. going over the resources in my ECS cluster).
[1] https://prefect-community.slack.com/archives/CL09KU1K7/p1602088074097600?thread_ts=1602087747.097500&cid=CL09KU1K7
[2] https://airflow.apache.org/docs/apache-airflow/stable/smart-sensor.html
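For what it's worth, a minimal sketch of that poke-sleep-poke pattern inside a single Prefect task, assuming boto3 and a hypothetical bucket/key (the timeout and poll interval are only illustrative):

import time

import boto3
from botocore.exceptions import ClientError
from prefect import task


@task(timeout=3600)  # give up after an hour so the poller can't hang forever
def wait_for_s3_success_file(bucket: str, key: str, poll_seconds: int = 60) -> str:
    """Poll S3 until the given object exists, then return its key."""
    s3 = boto3.client("s3")
    while True:
        try:
            s3.head_object(Bucket=bucket, Key=key)
            return key  # found it; downstream tasks can run now
        except ClientError as exc:
            if exc.response["Error"]["Code"] != "404":
                raise  # a real error, not just "not there yet"
        time.sleep(poll_seconds)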
Ajith Kumara Beragala Acharige Lal
02/23/2021, 5:50 PM
I'm trying to load a flow from a GitLab repo with my prefect-server. Can someone help me figure out the mistake in my code? The error: Failed to load and execute Flow's environment: GitlabGetError('404 Project Not Found')
Diego Alonso Roque Montoya
02/23/2021, 6:20 PM

S K
02/23/2021, 7:03 PM
Need help here. I'm trying to do this in Python as below, to check the flow state and cancel it if the flow is in a running state. How do I pass the value for "$flowRunId: UUID!"?
import prefect
from prefect import Client
from prefect import task, Flow
@task()
def check_runs():
    c = Client()

    query = """
    query RunningFlowsName {
      flow(where: {name: {_eq: "flowstatechecktest"}}) {
        id
      }
    }
    """
    print('======')
    print(c.graphql(query=query))

    query2 = """
    query RunningFlowsState {
      flow_run(where: {state: {_eq: "Running"}}) {
        state
      }
    }
    """
    print('======')
    print(c.graphql(query=query2))

    query3 = """
    mutation CancelFlowRun($flowRunId: UUID!) {
      cancel_flow_run(input: {flow_run_id: $flowRunId}) {
        state
      }
    }
    """
    c.graphql(query=query3)


with Flow("flowstatechecktest") as flow:
    check_runs()

flow.run()
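If I'm reading the question right, the missing piece is the variables argument of Client.graphql. A self-contained sketch under that assumption (the query also selects id so we know which run to cancel; names are adapted from the snippet above):

from prefect import Client

client = Client()

running_query = """
query RunningFlowRuns {
  flow_run(where: {state: {_eq: "Running"}}) {
    id
    state
  }
}
"""

cancel_mutation = """
mutation CancelFlowRun($flowRunId: UUID!) {
  cancel_flow_run(input: {flow_run_id: $flowRunId}) {
    state
  }
}
"""

result = client.graphql(query=running_query)
for run in result["data"]["flow_run"]:
    # `variables` binds the Python value to $flowRunId in the mutation.
    client.graphql(query=cancel_mutation, variables={"flowRunId": run["id"]})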
matta
02/23/2021, 7:04 PM
I'm using @task(max_retries=3, retry_delay=timedelta(minutes=30)), but apparently the Zombie Killer doesn't like that? Looking through the logs, I see
No heartbeat detected from the remote task; marking the run as failed.
then
Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
then
Heartbeat process died with exit code -9
then
Failed to set task state with error: ClientError([{'message': 'State update failed for task run ID 43f52f19-fffb-4d16-8223-da4ffc5668b2: provided a running state but associated flow run 8c8fc810-eb3d-447c-ab70-76dd1dc2acaa is not in a running state.', 'locations': [{'line': 2, 'column': 5}], 'path': ['set_task_run_states'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'State update failed for task run ID 43f52f19-fffb-4d16-8223-da4ffc5668b2: provided a running state but associated flow run 8c8fc810-eb3d-447c-ab70-76dd1dc2acaa is not in a running state.'}}}],)
matta
02/23/2021, 7:04 PM

Alex Papanicolaou
02/23/2021, 8:15 PM

user
02/24/2021, 12:17 AM

Danny Vilela
02/24/2021, 12:57 AM
I'm writing an HdfsDataFrameResult for checkpointing PySpark DataFrames. Imagine a flow with just two tasks A -> B, where A produces a PySpark DataFrame and B uses that DataFrame. I'd like to make use of caching in case B fails, so that it can quickly read a checkpointed A from disk (here, HDFS).
To make use of caching, am I correct that I'd need to:
1. Set the environment variable PREFECT__FLOWS__CHECKPOINTING=true.
2. Implement an HdfsDataFrameResult (with interface read, write, exists).
3. Have Task A's run explicitly return a PySpark DataFrame.
4. Initialize Task A within the Flow context manager as TaskA(checkpoint=True, result=HdfsDataFrameResult(...)).
Is that it? I guess I'm not 100% understanding the separation between the serializer and the result, and whether I need an HdfsDataFrameSerializer. It seems like the serializer is too low-level for PySpark DataFrames, but I'm happy to be proven wrong 🙂
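For step 2, a rough, untested sketch of what such a Result subclass might look like, assuming the Prefect ~0.14 Result interface (the copy/format helpers) and parquet as the on-HDFS format:

from typing import Any

from prefect.engine.result import Result


class HdfsDataFrameResult(Result):
    """Sketch: persist PySpark DataFrames as parquet on HDFS."""

    def __init__(self, spark, **kwargs: Any) -> None:
        self.spark = spark  # active SparkSession
        super().__init__(**kwargs)

    def read(self, location: str) -> Result:
        new = self.copy()
        new.location = location
        new.value = self.spark.read.parquet(location)
        return new

    def write(self, value_: Any, **kwargs: Any) -> Result:
        new = self.format(**kwargs)  # render the templated location from context
        new.value = value_
        value_.write.mode("overwrite").parquet(new.location)
        return new

    def exists(self, location: str, **kwargs: Any) -> bool:
        # Crude existence check: try reading the parquet metadata.
        try:
            self.spark.read.parquet(location.format(**kwargs))
            return True
        except Exception:
            return False

As far as I can tell, with a Result like this the serializer never comes into play, since read/write go straight through Spark rather than through bytes.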
Alfie
02/24/2021, 6:29 AM

Maria
02/24/2021, 6:49 AM
from prefect.tasks.azure.blobstorage import (
    BlobStorageDownload
)

storage = BlobStorageDownload(
    azure_credentials_secret="AZURE_DEMO",
    container="mycontainer",
)
storage.run(blob_name="demo")

I'm getting
ValueError: Local Secret "AZURE_DEMO" was not found.
I tried setting AZURE_DEMO in config.toml but am getting the same result. What am I doing wrong?
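A sketch of how local secrets are usually wired up (assuming Prefect 1.x local secrets, and assuming the Azure tasks want the secret to resolve to a JSON object of account credentials -- the exact keys below are my guess):

import os

# Equivalent to putting the secret under [context.secrets] in ~/.prefect/config.toml:
#   [context.secrets]
#   AZURE_DEMO = '{"ACCOUNT_NAME": "myaccount", "ACCOUNT_KEY": "mykey"}'
# The environment variable must be set before Prefect is imported.
os.environ["PREFECT__CONTEXT__SECRETS__AZURE_DEMO"] = (
    '{"ACCOUNT_NAME": "myaccount", "ACCOUNT_KEY": "mykey"}'
)

from prefect.client import Secret

# Should no longer raise 'Local Secret "AZURE_DEMO" was not found.'
print(Secret("AZURE_DEMO").get())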
vish
02/24/2021, 10:13 AM
Are Results objects meant to be instantiated within a task? Looks like it works either way.

Carl
02/24/2021, 12:20 PM
I'm trying to use the PostgresExecuteMany task but keep running into a "ValueError('Could not infer an active Flow context.')" error. Any ideas what I'm doing wrong here? Thx
from prefect import task, Flow
import prefect.tasks.postgres.postgres as pg

DB_NAME = 'blah'
DB_USER = 'blah'
DB_HOST = 'localhost'

sql_insert = pg.PostgresExecuteMany(db_name=DB_NAME, user=DB_USER, host=DB_HOST)


@task()
def extract_data():
    vals = ['aaa', 'bbb']
    return vals


@task()
def load(data):
    insert_stmt = """INSERT INTO "table_a" ("COL_A", "COL_B") VALUES (%s, %s)"""
    ret = sql_insert(query=insert_stmt, data=data, commit=True)
    return ret


def build_flow():
    with Flow('Test ETL') as f:
        data = extract_data()
        load(data)
    return f


flow = build_flow()
flow.run()
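If I'm reading this right, the error comes from calling the sql_insert task instance inside load's body, where no Flow context is active. Two common ways out: call sql_insert.run(...) inside the task, or compose the task inside the Flow block. A sketch of the latter, keeping the hypothetical table and credentials from the snippet above:

from prefect import task, Flow
import prefect.tasks.postgres.postgres as pg

sql_insert = pg.PostgresExecuteMany(db_name='blah', user='blah', host='localhost')


@task()
def extract_data():
    # Each element is one parameter tuple for the executemany call.
    return [('aaa', 'bbb')]


with Flow('Test ETL') as f:
    data = extract_data()
    # Calling the task instance here, inside the Flow context, registers it as
    # a downstream task of extract_data instead of raising at runtime.
    sql_insert(
        query="""INSERT INTO "table_a" ("COL_A", "COL_B") VALUES (%s, %s)""",
        data=data,
        commit=True,
    )

f.run()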
Ajith Kumara Beragala Acharige Lal
02/24/2021, 3:14 PM

S K
02/24/2021, 7:21 PM

Richard Hughes
02/24/2021, 8:59 PM

Richard Hughes
02/24/2021, 9:05 PM

Danny Vilela
02/24/2021, 9:15 PM
I'm seeing [2021-02-24 12:14:55-0800] WARNING - prefect.TaskRunner | Task 'MyTask': Can't use cache because it is now invalid.
I am using a custom Result subclass I wrote for caching PySpark DataFrames, which could be the issue. Here's how I initialize and run the task:
my_task: MyTask = MyTask(
    checkpoint=True,
    cache_for=dt.timedelta(hours=12),
    result=HdfsDataFrameResult(spark=spark, location=hdfs_path_for_this_task),
)
By my understanding, this should cache the output of my_task(…) for 12 hours. So even if I restart the Python process (say, if I'm developing a flow within a notebook) I can restart the kernel as much as I'd like and still have that task access the cache… right? Am I missing something? Do I need a cache_key here to share the same cache (here, HDFS) between different flows?
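As far as I know, cache_for-based caching for local flow.run() calls lives in memory (prefect.context.caches) or in the backend's state history, so it won't survive a kernel restart on its own; the file-backed route is usually a target plus a Result. A self-contained sketch of that pattern, using LocalResult as a stand-in for the HDFS result:

import os

# Checkpointing must be on for results/targets to be written when running locally.
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"

from prefect import Flow, task
from prefect.engine.results import LocalResult


@task(
    checkpoint=True,
    # The target template is rendered from prefect.context; if the rendered
    # location already exists (Result.exists), the task is marked Cached and
    # skipped -- even in a brand-new Python process or notebook kernel.
    target="demo-cache/{task_name}-{today}",
    result=LocalResult(dir="/tmp"),
)
def expensive_computation() -> int:
    print("actually running")
    return 42


with Flow("target-cache-demo") as flow:
    expensive_computation()

flow.run()  # first run executes the task and writes /tmp/demo-cache/...
flow.run()  # later runs find the target and reuse the stored result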
Belal Aboabdo
02/24/2021, 10:24 PM
I'm using S3Results but am getting this error when mapping the function:
[2021-02-24 14:11:49-0800] ERROR - prefect.TaskRunner | Task 'plot_map_counts_data[0]': Unexpected error while running task: TypeError("exists() got multiple values for argument 'location'",)
The flow runs successfully without checkpointing. Here's an example of the task:
@task(
    checkpoint=True,
    target="{flow_name}/{today}/{task_name}_{map_index}.png",
    result=prefect.engine.results.S3Result(bucket=results_s3_bucket),
)
def plot_map_counts_data(df, location, resource):
    # some task
    return image


with Flow("example") as flow:
    plot_map_counts_data.map(
        df,
        locations,
        unmapped("test_resource"),
    )
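My read of that TypeError: when a target is set, Prefect renders it by passing the task's inputs (plus context) as keyword arguments into Result.exists(location, **kwargs), so a task argument literally named location collides with the positional location parameter. A sketch of the workaround, reusing the snippet's names, is simply to rename that argument:

@task(
    checkpoint=True,
    target="{flow_name}/{today}/{task_name}_{map_index}.png",
    result=prefect.engine.results.S3Result(bucket=results_s3_bucket),
)
def plot_map_counts_data(df, map_location, resource):  # renamed from `location`
    # same body as before
    return image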
Robert Bastian
02/24/2021, 11:11 PM
@task
def get_url(run_id, hook):
    logger = prefect.context.get("logger")
    logger.info("RUN ID: %s", run_id)
    url = hook.get_run_page_url(run_id=run_id)
    return url


SubmitRun = DatabricksSubmitRun()

with Flow("test_databricks", storage=STORAGE, run_config=RUN_CONFIG) as flow:
    conn = PrefectSecret('DATABRICKS_CONNECTION_STRING')
    json = get_job_config()
    run_id = SubmitRun(json=json, databricks_conn_secret=conn)
    hook = SubmitRun.get_hook()
    url = get_url(run_id, hook)
Here is the exception:
ERROR:prefect.TaskRunner:Unexpected error: TypeError("argument of type 'NoneType' is not iterable")
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 863, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/rbastian/enverus/RAI/prefect/flows/test-databricks.py", line 32, in get_url
    url = hook.get_run_page_url(run_id=run_id)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/prefect/tasks/databricks/databricks_hook.py", line 248, in get_run_page_url
    response = self._do_api_call(GET_RUN_ENDPOINT, json)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/prefect/tasks/databricks/databricks_hook.py", line 148, in _do_api_call
    if "token" in self.databricks_conn:
Thanks in advance!
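One guess at the cause: SubmitRun.get_hook() runs at flow-definition time, before the PrefectSecret has a value, so the hook's connection is still None (hence the "argument of type 'NoneType' is not iterable" check on self.databricks_conn). A sketch of one way around it -- build the hook at runtime inside the task, assuming DatabricksHook accepts the same connection dict that DatabricksSubmitRun does:

from prefect import task
from prefect.tasks.databricks.databricks_hook import DatabricksHook


@task
def get_url(run_id, databricks_conn):
    # Assumption: DatabricksHook can be constructed from the resolved secret here.
    hook = DatabricksHook(databricks_conn_secret=databricks_conn)
    return hook.get_run_page_url(run_id=run_id)


# and inside the Flow block:
#   url = get_url(run_id, conn)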
Dhiraj Golhar
02/25/2021, 6:05 AM

Joël Luijmes
02/25/2021, 8:33 AM
I'm intermittently getting this error back when creating Kubernetes objects from my flow:
{
  "kind": "Status",
  "apiVersion": "v1",
  "metadata": {},
  "status": "Failure",
  "message": "Operation cannot be fulfilled on resourcequotas \"gke-resource-quotas\": the object has been modified; please apply your changes to the latest version and try again",
  "reason": "Conflict",
  "details": {
    "name": "gke-resource-quotas",
    "kind": "resourcequotas"
  },
  "code": 409
}
Googling didn't yield much, but it seems like an internal Kubernetes issue.
So to fix this, I think there are a few ways:
1. Use the Prefect retry mechanism
2. Modify the Prefect tasks to retry on this error (willing to contribute myself)
3. Modify my code to retry
My question: what would be the best approach here?
With 1) it still might fail, because on retry the same burst of Kubernetes object creation exists (or can I add a random delay?); also, in a resource manager I may create multiple resources -> retry does not exist there (AFAIK), and if it did, how would I track which resources already exist?
With 2) I don't know if this is the right methodology; I can imagine retrying inside the task libs is an anti-pattern.
With 3) no downsides, except I have to implement this everywhere.
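For option 3, a minimal sketch of a jittered-backoff wrapper around a Kubernetes API call, retrying only on the 409 Conflict (the helper name and the example create call are illustrative):

import random
import time

from kubernetes.client.rest import ApiException


def create_with_retry(create_fn, *args, max_attempts=5, **kwargs):
    for attempt in range(1, max_attempts + 1):
        try:
            return create_fn(*args, **kwargs)
        except ApiException as exc:
            if exc.status != 409 or attempt == max_attempts:
                raise
            # Exponential backoff with jitter to break up the burst of creates.
            time.sleep((2 ** attempt) + random.uniform(0, 1))


# usage, e.g.:
#   create_with_retry(batch_v1.create_namespaced_job, namespace="default", body=job_spec)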
Michael Hadorn
02/25/2021, 9:40 AM