Daniel Tashman
10/11/2022, 2:23 PM
John Mizerany
10/11/2022, 2:30 PM
Kelvin DeCosta
10/11/2022, 3:54 PM
prefect cli help message over and over again.
ECS console shows a list of STOPPED tasks, which I'm assuming are various attempts to start the service and keep it running.
Nathaniel Russell
10/11/2022, 4:18 PM
/usr/local/lib/python3.9/site-packages/prefect/logging/handlers.py:76: UserWarning: Failed to create the Prefect home directory at /home/sbx_user1051/.prefect
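The `sbx_user1051` home directory suggests this is running on AWS Lambda, where, as far as I know, only `/tmp` is writable. A minimal sketch of one workaround for this warning, assuming Prefect 2 reads its `PREFECT_HOME` setting (default `~/.prefect`) from the environment when it is imported: choose a writable location before `import prefect`. The helper `choose_prefect_home` is hypothetical, and this targets the home-directory warning, not necessarily the crash at the end of the run.

```python
import os
import tempfile


def choose_prefect_home() -> str:
    """Return a PREFECT_HOME path that should be writable (hypothetical helper)."""
    explicit = os.environ.get("PREFECT_HOME")
    if explicit:
        return explicit  # respect a setting that is already in place
    home = os.path.expanduser("~")
    if os.path.isdir(home) and os.access(home, os.W_OK):
        return os.path.join(home, ".prefect")  # Prefect's default location
    # Missing or read-only home (e.g. a Lambda sandbox user): use the temp dir instead.
    return os.path.join(tempfile.gettempdir(), ".prefect")


# Must run before `import prefect` so the setting is picked up at import time.
os.environ["PREFECT_HOME"] = choose_prefect_home()
```

On Lambda the same effect can presumably be had by setting `PREFECT_HOME=/tmp/.prefect` in the function's environment variables instead of in code.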
It then runs the flow code correctly, but after the flow is done it crashes, and gives this error:
[in thread]
All of my flows run their intended code, but they all end with this error and are marked as crashed. How do I fix this?
Jason Bertman
10/11/2022, 4:40 PM
Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::begin_task_run() (pid=77, ip=10.80.9.219)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1191, in orchestrate_task_run
    state = await propose_state(
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1496, in propose_state
    raise prefect.exceptions.Abort(response.details.reason)
prefect.exceptions.Abort: This run cannot transition to the RUNNING state from the RUNNING state.

During handling of the above exception, another exception occurred:

ray::begin_task_run() (pid=77, ip=10.80.9.219)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 212, in wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 141, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "/usr/local/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/local/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1121, in begin_task_run
    task_run.state.data._cache_data(await _retrieve_result(task_run.state))
AttributeError: 'NoneType' object has no attribute '_cache_data'

It seems like the engine thinks the task run hasn't started yet?
Walter Cavinaw
10/11/2022, 5:14 PM
Imran Qureshi
10/11/2022, 7:26 PM
Carlo
10/11/2022, 8:28 PM
David Cupp
10/11/2022, 9:44 PM
RRuleSchedule only takes an "rrule string". It seems easy to convert a single rrule to a string, but as far as I can tell there is no standard implementation to convert an rrule set into an rrule string [3]. Any ideas?
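A minimal sketch of one way to do this with dateutil alone, under stated assumptions: it reads rruleset's private attributes (`_rrule`, `_exrule`, `_rdate`, `_exdate`), handles only naive datetimes, and requires every rule to share one DTSTART. It writes an RFC 5545-style multi-line string that `dateutil.rrule.rrulestr(..., forceset=True)` parses back into an equivalent set. The helper names are hypothetical, and whether `RRuleSchedule` accepts a multi-line string like this is an assumption I have not checked against [2].

```python
from datetime import datetime
from typing import List

from dateutil.rrule import DAILY, rrule, rruleset, rrulestr

_FMT = "%Y%m%dT%H%M%S"  # iCalendar basic date-time format (naive, no TZID)


def _rule_body(rule: rrule) -> str:
    """Return the 'FREQ=...' part of str(rule), dropping the rule's own DTSTART line."""
    for line in str(rule).splitlines():
        if line.startswith("RRULE:"):
            return line[len("RRULE:"):]
    raise ValueError(f"unexpected rrule string: {rule!s}")


def rruleset_to_str(rset: rruleset) -> str:
    """Serialize an rruleset into a string that rrulestr(..., forceset=True) can parse.

    Relies on rruleset's private attributes, so it may break across dateutil versions.
    """
    rules: List[rrule] = [*rset._rrule, *rset._exrule]
    starts = {r._dtstart for r in rules}
    if len(starts) > 1:
        raise ValueError("all rules must share one DTSTART to round-trip")
    dates = [*starts, *rset._rdate, *rset._exdate]
    if any(d.tzinfo is not None for d in dates):
        raise ValueError("this sketch only handles naive datetimes")

    lines: List[str] = []
    if starts:
        lines.append("DTSTART:" + next(iter(starts)).strftime(_FMT))
    lines += ["RRULE:" + _rule_body(r) for r in rset._rrule]
    lines += ["EXRULE:" + _rule_body(r) for r in rset._exrule]
    if rset._rdate:
        lines.append("RDATE:" + ",".join(d.strftime(_FMT) for d in rset._rdate))
    if rset._exdate:
        lines.append("EXDATE:" + ",".join(d.strftime(_FMT) for d in rset._exdate))
    return "\n".join(lines)


# Usage: daily at 09:00 for five days, skipping Oct 3 and adding Oct 10.
rset = rruleset()
rset.rrule(rrule(DAILY, count=5, dtstart=datetime(2022, 10, 1, 9, 0)))
rset.exdate(datetime(2022, 10, 3, 9, 0))
rset.rdate(datetime(2022, 10, 10, 9, 0))

text = rruleset_to_str(rset)
print(text)
assert list(rrulestr(text, forceset=True)) == list(rset)  # the string round-trips
```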
[1] https://docs.prefect.io/api-ref/orion/schemas/schedules/#prefect.orion.schemas.schedules.RRuleSchedule
[2] https://github.com/PrefectHQ/prefect/blob/main/src/prefect/orion/schemas/schedules.py#L322
[3] https://github.com/dateutil/dateutil/issues/856
Nace Plesko
10/11/2022, 10:47 PM
command for ShellTask. But the problem is that I'm not using the Parameter directly, and it's not showing up in the UI when I register the flow. Is there any way to get around it?
Thank you in advance!
Nico Neumann
10/11/2022, 11:29 PM
prefect_aws to upload/list/download files to S3 and also for some shared AWS Secrets. For some functionality I rely on boto3, e.g. boto3.client("s3", ...).generate_presigned_url(…). Prefect 2.5.0 is running on EKS and the flows are deployed to S3, which requires s3fs.
To use it in a deployment: prefect deployment […] -sb s3/dev
You need to install s3fs to use this block.
https://docs.prefect.io/concepts/filesystems/
My problem is that prefect_aws and s3fs have dependency conflicts. I am using pip-tools to set my requirements and get the following error:
# simplified requirements.in (removed the package versions to make matches easier to find)
prefect
prefect_aws
s3fs
$ pip-compile requirements.in
Could not find a version that matches botocore<1.27.60,<1.28.0,>=1.27.53,>=1.27.59,>=1.27.89 (from prefect_aws==0.1.4->-r requirements.in (line 2))
Tried: 0.4.1, 0.4.2, 0.5.0, 0.5.1, 0.5.2, 0.5.3, 0.5.4, 0.6.0, 0.7.0, 0.8.0, 0.8.1, 0.8.2, 0.8.3, 0.9.0 ... [lists all versions here]
1.27.87, 1.27.88, 1.27.88, 1.27.89, 1.27.89
Skipped pre-versions: 1.0.0a1, 1.0.0a2, 1.0.0a3, 1.0.0b1, 1.0.0b2, 1.0.0b3, 1.0.0rc1, 1.0.0rc1
There are incompatible versions in the resolved dependencies:
botocore<1.28.0,>=1.27.89 (from boto3==1.24.89->prefect_aws==0.1.4->-r requirements.in (line 2))
botocore>=1.27.53 (from prefect_aws==0.1.4->-r requirements.in (line 2))
botocore<1.27.60,>=1.27.59 (from aiobotocore==2.4.0->s3fs==2022.8.2->-r requirements.in (line 3))
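Reading the three botocore constraints in that output, the conflict seems to come from the resolver picking the newest boto3 (1.24.89). A minimal sketch that intersects those ranges, assuming (from the `boto3==1.24.89` to `botocore>=1.27.89,<1.28.0` line) that boto3 1.24.N requires botocore>=1.27.N,<1.28.0; `intersect` and `botocore_range_for_boto3` are hypothetical helpers, not pip-tools API.

```python
from typing import Optional, Tuple

Version = Tuple[int, ...]
# (inclusive lower bound, exclusive upper bound); None means unbounded.
Range = Tuple[Optional[Version], Optional[Version]]


def v(text: str) -> Version:
    """Parse a plain dotted release such as '1.27.59' (no pre-release tags)."""
    return tuple(int(part) for part in text.split("."))


def intersect(*ranges: Range) -> Optional[Range]:
    """Intersect half-open version ranges; return None if no version satisfies all."""
    lows = [lo for lo, _ in ranges if lo is not None]
    highs = [hi for _, hi in ranges if hi is not None]
    lo = max(lows) if lows else None
    hi = min(highs) if highs else None
    if lo is not None and hi is not None and lo >= hi:
        return None
    return lo, hi


def botocore_range_for_boto3(boto3_version: str) -> Range:
    """ASSUMPTION: boto3 1.24.N requires botocore>=1.27.N,<1.28.0."""
    major, minor, patch = v(boto3_version)
    if (major, minor) != (1, 24):
        raise ValueError("this sketch only models the boto3 1.24.x line")
    return v(f"1.27.{patch}"), v("1.28.0")


# Constraints copied from the pip-compile output above.
PREFECT_AWS_014: Range = (v("1.27.53"), None)          # botocore>=1.27.53
AIOBOTOCORE_240: Range = (v("1.27.59"), v("1.27.60"))  # botocore>=1.27.59,<1.27.60

for boto3_pin in ("1.24.89", "1.24.59"):
    combined = intersect(PREFECT_AWS_014, AIOBOTOCORE_240, botocore_range_for_boto3(boto3_pin))
    print(boto3_pin, "->", combined)
# 1.24.89 -> None
# 1.24.59 -> ((1, 27, 59), (1, 27, 60))
```

If that assumption holds, adding `boto3==1.24.59` to requirements.in should give the resolver a solution (botocore 1.27.59). I believe s3fs also offers a `boto3` extra that pins a matching boto3 through aiobotocore, but that is worth checking against the s3fs version in use.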
I have found this issue: https://github.com/fsspec/s3fs/issues/615#issuecomment-1094791081 but not a real solution to fix it.
How can I use prefect_aws and also deploy flows to S3? Does anyone else have the same problem and found a solution?
Adam Green
10/12/2022, 12:50 AM
from prefect.deployments import Deployment
from prefect.filesystems import S3
from prefect.infrastructure import Process

# aws_key, aws_secret, context and healthcheck are defined elsewhere in our code.
s3_block = S3(
    aws_access_key_id=aws_key,
    aws_secret_access_key=aws_secret,
    bucket_path=context["prefect_flows_bucket"],
)
s3_block.save("s3", overwrite=True)

Deployment.build_from_flow(
    name="alpha",
    work_queue_name="alpha",
    flow=healthcheck,
    storage=S3.load("s3"),
    infrastructure=Process(),
    apply=True,
)
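If the mypy errors concern the `save`, `load` and `build_from_flow` calls above: as far as I know, in Prefect 2 these are async functions wrapped in a `sync_compatible` decorator that runs them to completion when called outside an event loop, so a type checker that sees the async signature infers a coroutine at each call site. The sketch below is a simplified stand-in for that mechanism (`sync_compatible_sketch` and `save_block` are hypothetical, not Prefect's code), showing one way to keep the calls synchronous while satisfying mypy: `typing.cast` to the value actually returned at runtime, or a targeted `# type: ignore`.

```python
import asyncio
import functools
from typing import Any, Callable, TypeVar, cast

F = TypeVar("F", bound=Callable[..., Any])


def sync_compatible_sketch(async_fn: F) -> F:
    """Simplified stand-in for a 'sync-compatible' decorator.

    Outside an event loop the coroutine is run to completion and its result returned;
    inside a loop the coroutine itself is returned so callers can await it.
    The F -> F annotation means type checkers still see the original async signature.
    """

    @functools.wraps(async_fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        coro = async_fn(*args, **kwargs)
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return asyncio.run(coro)  # no loop running: behave synchronously
        return coro  # loop running: hand back the awaitable

    return cast(F, wrapper)


@sync_compatible_sketch
async def save_block(name: str) -> str:
    await asyncio.sleep(0)
    return f"saved {name}"


# mypy infers Coroutine[Any, Any, str] here; cast records what happens at runtime.
result = cast(str, save_block("s3"))
print(result)  # saved s3
```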
When we run mypy on this code, it complains about things not being typed as async. Is it possible to type this code without converting to async?
Nace Plesko
10/12/2022, 1:14 AM
StartFlowRun and create_flow_run, like it's in the docs, and for some reason I'm running into a bunch of errors when executing the flow, but none when registering it. Right now I'm getting
Failed to load and execute flow run: ValueError('No flows found in file.')
and previously I was getting a bunch of
Failed to load and execute flow run: KeyError("'__name__' not in globals")
I feel like I am missing something so obvious about executing a flow from within a flow that it's not even documented?
Nace Plesko
10/12/2022, 2:35 AM
Steph Clacksman
10/12/2022, 7:21 AM
import pandas as pd
from prefect import flow

@flow
def test_flow() -> None:
    df = pd.DataFrame({"ID": ["123456789", "223456789"]})
    test_subflow(df)

@flow(validate_parameters=False)
def test_subflow(df: pd.DataFrame) -> None:
    print(df)

if __name__ == "__main__":
    test_flow()
Nic
10/12/2022, 8:12 AM
Chern Hong Poh
10/12/2022, 9:55 AM
ShellTask that returns Command failed with exit code 2 when I register and quick-run the Prefect flow. I registered the flow using this command: prefect register flow --file testing.py --project staging
I'd appreciate it if someone can help. This has been bugging me since this morning.
## print2.py
print("hello")
## testing.py
import os
import datetime
from datetime import timedelta
import pendulum
import prefect
from prefect import case
from prefect import Flow
from prefect import Parameter
from prefect import task
from prefect.environments.storage import S3
from prefect.schedules import filters
from prefect.schedules.clocks import IntervalClock
from prefect.schedules.schedules import Schedule
from prefect.tasks.control_flow import merge
from prefect.tasks.dbt import DbtShellTask
from prefect.tasks.shell import ShellTask
import subprocess
@task(name="Logging")
def logging_result(stuff):
    logger = prefect.context.get("logger")
    return logger.info(stuff)

@task(name="Run Python Script", log_stdout=True)
def run_script():
    return ShellTask(command=f"python3 print2.py").run()

with Flow(name="DBT Python daily run") as flow:
    python_run = run_script()
    final = logging_result(python_run)

#flow_state = flow.run()
#shell_output = flow_state.result[python_run].result
#print(shell_output)
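Exit code 2 is what CPython itself returns when it cannot open the script file, so one plausible cause (an assumption, since the agent's setup isn't shown) is that `python3 print2.py` runs in a working directory without print2.py: the agent's current directory, not the folder testing.py was registered from. A minimal standard-library sketch of relative versus absolute script paths; `run_python_script` is a hypothetical helper that uses subprocess directly rather than ShellTask.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def run_python_script(script: Path) -> "subprocess.CompletedProcess[str]":
    """Run a script with the current interpreter, capturing stdout/stderr as text."""
    return subprocess.run(
        [sys.executable, str(script)], capture_output=True, text=True, check=False
    )


# A relative path is resolved against the current working directory; if the file
# is not there, CPython prints "can't open file ..." to stderr and exits with status 2.
missing = run_python_script(Path("print2_missing.py"))
print(missing.returncode)  # 2

# An absolute path works regardless of where the process was started.
with tempfile.TemporaryDirectory() as tmp:
    script = Path(tmp) / "print2.py"
    script.write_text('print("hello")\n')
    found = run_python_script(script)
print(found.returncode, found.stdout.strip())  # 0 hello
```

If this is the cause, building the ShellTask command from an absolute path to print2.py on the machine where the agent runs would be the corresponding change.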
Robert Hales
10/12/2022, 11:25 AM
Todd de Quincey
10/12/2022, 11:31 AM
Q
10/12/2022, 12:40 PM
Roles/RoleBindings were getting generated at some point in time, but I guess something changed.
Himanshu
10/12/2022, 1:28 PM
Patrick Tan
10/12/2022, 1:43 PM
Emma Rizzi
10/12/2022, 3:17 PM
José Duarte
10/12/2022, 3:25 PM
Oluremi Akinwale
10/12/2022, 3:58 PM
Oluremi Akinwale
10/12/2022, 4:00 PM
José Duarte
10/12/2022, 4:45 PM
flow_run_notification_queue has both flow_run_state_id and flow_run_notification_policy_id but doesn't connect with either table. Example:
sqlite> .schema flow_run_notification_queue
CREATE TABLE IF NOT EXISTS "flow_run_notification_queue" (
id CHAR(36) DEFAULT ((
lower(hex(randomblob(4)))
|| '-'
|| lower(hex(randomblob(2)))
|| '-4'
|| substr(lower(hex(randomblob(2))),2)
|| '-'
|| substr('89ab',abs(random()) % 4 + 1, 1)
|| substr(lower(hex(randomblob(2))),2)
|| '-'
|| lower(hex(randomblob(6)))
)) NOT NULL,
created DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%f000', 'now')) NOT NULL,
updated DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%f000', 'now')) NOT NULL,
flow_run_notification_policy_id CHAR(36) NOT NULL,
flow_run_state_id CHAR(36) NOT NULL,
CONSTRAINT pk_flow_run_notification_queue PRIMARY KEY (id)
);
CREATE INDEX ix_flow_run_notification_queue__updated ON flow_run_notification_queue (updated);
Along with CONSTRAINT pk_flow_run_notification_queue PRIMARY KEY (id), there should be something like CONSTRAINT fk_flow_run_notification_queue__flow_run_state_id__flow_run_state FOREIGN KEY(flow_run_state_id) REFERENCES flow_run_state (id) ON DELETE cascade, as there is on other tables.
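A minimal sketch with Python's built-in sqlite3 of what such a constraint would change, using a hypothetical, simplified schema (only `id` and `flow_run_state_id`; no notification-policy column or defaults). SQLite only enforces foreign keys when `PRAGMA foreign_keys = ON` is set on the connection, and it cannot add a constraint to an existing table via ALTER TABLE, so a migration would have to rebuild the table.

```python
import sqlite3


def orphaned_queue_rows(with_foreign_key: bool) -> int:
    """Delete a referenced state and count queue rows left pointing at nothing."""
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")  # FKs are ignored unless enabled per connection
    conn.execute("CREATE TABLE flow_run_state (id CHAR(36) PRIMARY KEY)")
    columns = [
        "id CHAR(36) PRIMARY KEY",
        "flow_run_state_id CHAR(36) NOT NULL",
    ]
    if with_foreign_key:
        columns.append(
            "CONSTRAINT fk_flow_run_notification_queue__flow_run_state_id__flow_run_state "
            "FOREIGN KEY (flow_run_state_id) REFERENCES flow_run_state (id) ON DELETE CASCADE"
        )
    conn.execute(f"CREATE TABLE flow_run_notification_queue ({', '.join(columns)})")
    conn.execute("INSERT INTO flow_run_state (id) VALUES ('state-1')")
    conn.execute(
        "INSERT INTO flow_run_notification_queue (id, flow_run_state_id) "
        "VALUES ('queue-1', 'state-1')"
    )
    conn.execute("DELETE FROM flow_run_state WHERE id = 'state-1'")
    (count,) = conn.execute(
        "SELECT COUNT(*) FROM flow_run_notification_queue q "
        "LEFT JOIN flow_run_state s ON s.id = q.flow_run_state_id "
        "WHERE s.id IS NULL"
    ).fetchone()
    conn.close()
    return count


print(orphaned_queue_rows(False))  # 1: the queue row outlives its state
print(orphaned_queue_rows(True))   # 0: ON DELETE CASCADE removed it
```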
Although I am probably missing something, could anyone from the Prefect team take a look?
José Duarte
10/12/2022, 4:52 PM
Erik Amundson
10/12/2022, 5:23 PM
YZ
10/12/2022, 5:28 PM