Seth Coussens
10/10/2022, 9:47 PMDeepanshu Aggarwal
10/10/2022, 10:29 PMHomesh Wathsalya
10/10/2022, 11:28 PMThar Htet San
10/11/2022, 4:49 AMZac Hooper
10/11/2022, 5:11 AMdescription = """
This is an example description
\n\n\n
Schedule: Every 3 minutes
"""
How it looks in the UI:Vadym Dytyniak
10/11/2022, 7:49 AMAndreas Nigg
10/11/2022, 8:45 AMmax
10/11/2022, 9:27 AMHieu Tran
10/11/2022, 9:53 AMimport asyncio
from prefect.client import get_client
from prefect.orion.schemas.states import StateType
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType
async def main():
client = get_client()
state_type_filter = FlowRunFilterStateType(any_=[StateType.RUNNING])
state_filter = FlowRunFilterState(state_type_filter)
flow_filter = FlowRunFilter(state_filter)
flow_runs = await client.read_flow_runs(flow_filter)
print(flow_runs)
if __name__ == '__main__':
asyncio.run(main())
Traceback (most recent call last):
File "get_client_context.py", line 21, in <module>
asyncio.run(main())
File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "get_client_context.py", line 9, in main
state_filter = FlowRunFilterState(state_type_filter)
File "pydantic/main.py", line 333, in pydantic.main.BaseModel.__init__
TypeError: __init__() takes exactly 1 positional argument (2 given)
Stephen Lloyd
10/11/2022, 11:02 AMAdam Eury
10/11/2022, 1:45 PMDaniel Tashman
10/11/2022, 2:23 PMJohn Mizerany
10/11/2022, 2:30 PMKelvin DeCosta
10/11/2022, 3:54 PMprefect
cli help message over and over again.
ECS console shows a list of STOPPED
tasks, which I'm assuming are various attempts to start the service and keep it running.Nathaniel Russell
10/11/2022, 4:18 PM/usr/local/lib/python3.9/site-packages/prefect/logging/handlers.py:76: UserWarning: Failed to create the Prefect home directory at /home/sbx_user1051/.prefect
It then runs the flow code correctly, but after the flow is done it crashes, and gives this error:
[in thread]
All of my flows perform their intended code but all end with this error and say crashed. How do I fix this?Jason Bertman
10/11/2022, 4:40 PMUnhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::begin_task_run() (pid=77, ip=10.80.9.219)
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1191, in orchestrate_task_run
state = await propose_state(
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1496, in propose_state
raise prefect.exceptions.Abort(response.details.reason)
prefect.exceptions.Abort: This run cannot transition to the RUNNING state from the RUNNING state.
During handling of the above exception, another exception occurred:
ray::begin_task_run() (pid=77, ip=10.80.9.219)
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 212, in wrapper
return run_async_in_new_loop(async_fn, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 141, in run_async_in_new_loop
return anyio.run(partial(__fn, *args, **kwargs))
File "/usr/local/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/usr/local/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1121, in begin_task_run
task_run.state.data._cache_data(await _retrieve_result(task_run.state))
AttributeError: 'NoneType' object has no attribute '_cache_data'
It seems like the engine is mistaking a task run for not running yet?Walter Cavinaw
10/11/2022, 5:14 PMImran Qureshi
10/11/2022, 7:26 PMCarlo
10/11/2022, 8:28 PMDavid Cupp
10/11/2022, 9:44 PMRRuleSchedule
only takes a "rrule string". It seems easy to convert a single rrule to a string, but as far as I can tell there is no standard implementation to convert an rrule set into an rrule string [3]. Any ideas?
[1] https://docs.prefect.io/api-ref/orion/schemas/schedules/#prefect.orion.schemas.schedules.RRuleSchedule
[2] https://github.com/PrefectHQ/prefect/blob/main/src/prefect/orion/schemas/schedules.py#L322
[3] https://github.com/dateutil/dateutil/issues/856Nace Plesko
10/11/2022, 10:47 PMcommand
for ShellTask
. But the problem is that I'm not using the Parameter
directly and it's not showing up in the UI when I register the flow. Is there any way to get around it?
Thank you in advance!Nico Neumann
10/11/2022, 11:29 PMprefect_aws
to upload/list/download files to s3 and also for some shared AWS Secrets. For some functionality I rely on boto3
, e.g. boto3.client("s3", ...).generate_presigned_url(…)
. Prefect 2.5.0 is running on EKS and the flows are deployed to S3 which requires s3fs
.
To use it in a deployment: prefect deployment […] -sb s3/dev
You need to install s3fs to use this block.
https://docs.prefect.io/concepts/filesystems/
My problem is that prefect_aws
and s3fs
have dependency conflicts. I am using pip-tools to set my requirements and get the following error:
# simplified <http://requirements.in|requirements.in> (removed the package versions to might easier find matches)
prefect
prefect_aws
s3fs
$ pip-compile <http://requirements.in|requirements.in>
Could not find a version that matches botocore<1.27.60,<1.28.0,>=1.27.53,>=1.27.59,>=1.27.89 (from prefect_aws==0.1.4->-r <http://requirements.in|requirements.in> (line 2))
Tried: 0.4.1, 0.4.2, 0.5.0, 0.5.1, 0.5.2, 0.5.3, 0.5.4, 0.6.0, 0.7.0, 0.8.0, 0.8.1, 0.8.2, 0.8.3, 0.9.0 ... [lists all versions here]
1.27.87, 1.27.88, 1.27.88, 1.27.89, 1.27.89
Skipped pre-versions: 1.0.0a1, 1.0.0a2, 1.0.0a3, 1.0.0b1, 1.0.0b2, 1.0.0b3, 1.0.0rc1, 1.0.0rc1
There are incompatible versions in the resolved dependencies:
botocore<1.28.0,>=1.27.89 (from boto3==1.24.89->prefect_aws==0.1.4->-r <http://requirements.in|requirements.in> (line 2))
botocore>=1.27.53 (from prefect_aws==0.1.4->-r <http://requirements.in|requirements.in> (line 2))
botocore<1.27.60,>=1.27.59 (from aiobotocore==2.4.0->s3fs==2022.8.2->-r <http://requirements.in|requirements.in> (line 3))
I have found this issue: https://github.com/fsspec/s3fs/issues/615#issuecomment-1094791081 but not a real solution to fix it.
How can I use prefect_aws
and also deploy flows to S3? Does anyone else have the same problem and found a solution?Adam Green
10/12/2022, 12:50 AMfrom prefect.deployments import Deployment
from prefect.filesystems import S3
s3_block = S3(
aws_access_key_id=aws_key,
aws_secret_access_key=aws_secret,
bucket_path=context["prefect_flows_bucket"],
)
s3_block.save("s3", overwrite=True)
Deployment.build_from_flow(
name="alpha",
work_queue_name="alpha",
flow=healthcheck,
storage=S3.load("s3"),
infrastructure=Process(),
apply=True,
)
When we run mypy on this code, it complains about things not being typed as async. Is it possible to type this code without converting to async?Nace Plesko
10/12/2022, 1:14 AMStartFlowRun
and create_flow_run
, like it's in the docs and for some reason I'm running in a bunch of errors when executing the flow, but none when registering it. Right now I'm getting
Failed to load and execute flow run: ValueError('No flows found in file.')
and previously I was getting a bunch of
Failed to load and execute flow run: KeyError("'__name__' not in globals")`
I feel like I am missing something extremely obvious about executing a flow from within a flow that it's not even documented?Nace Plesko
10/12/2022, 2:35 AMSteph Clacksman
10/12/2022, 7:21 AMimport pandas as pd
from prefect import flow
@flow
def test_flow() -> None:
df = pd.DataFrame({"ID": ["123456789", "223456789"]})
test_subflow(df)
@flow(validate_parameters=False)
def test_subflow(df: pd.DataFrame) -> None:
print(df)
if __name__ == "__main__":
test_flow()
Nic
10/12/2022, 8:12 AMChern Hong Poh
10/12/2022, 9:55 AMShellTask
that returns Command failed with exit code 2
when I registered and quick run the prefect flow. I registered the flow using this command prefect register flow --file testing.py --project staging
. Appreciated if someone can help. This has been bugging me since morning.
## print2.py
print("hello")
## testing.py
import os
import datetime
from datetime import timedelta
import pendulum
import prefect
from prefect import case
from prefect import Flow
from prefect import Parameter
from prefect import task
from prefect.environments.storage import S3
from prefect.schedules import filters
from prefect.schedules.clocks import IntervalClock
from prefect.schedules.schedules import Schedule
from prefect.tasks.control_flow import merge
from prefect.tasks.dbt import DbtShellTask
from prefect.tasks.shell import ShellTask
import subprocess
@task(name="Logging")
def logging_result(stuff):
logger = prefect.context.get("logger")
return <http://logger.info|logger.info>(stuff)
@task(name="Run Python Script", log_stdout=True)
def run_script():
return ShellTask(command=f"python3 print2.py").run()
with Flow(name="DBT Python daily run") as flow:
python_run = run_script()
final = logging_result(python_run)
#flow_state = flow.run()
#shell_output = flow_state.result[python_run].result
#print(shell_output)
Robert Hales
10/12/2022, 11:25 AMTodd de Quincey
10/12/2022, 11:31 AMTodd de Quincey
10/12/2022, 11:31 AMBianca Hoch
10/12/2022, 3:45 PMTodd de Quincey
10/12/2022, 3:47 PMBianca Hoch
10/12/2022, 5:59 PMTodd de Quincey
10/14/2022, 7:03 AMAnna Geller
10/14/2022, 10:03 AM