Mark Li
10/17/2022, 1:43 PMJustin Trautmann
10/17/2022, 2:35 PMprefect concurrency-limit inspect
command but is in fact in status completed. this effectively lowers the concurrency-limit for all subsequent tasks as these pseudo-active tasks keep occupying a slot for the tag. are we at all supposed to adapt the concurrency limit while tagged tasks are being executed or is this considered bad practice?
Please find in the thread a self-contained example flow that can be used to reproduce the behavior. Any advice i much appreciated. Thank you.redsquare
10/17/2022, 2:57 PM/usr/local/bin/python: No module named prefect.engine.__main__; 'prefect.engine' is a package and cannot be directly executed
Evan Curtin
10/17/2022, 3:17 PMKelvin DeCosta
10/17/2022, 5:03 PMECSTask
infrastructure.
I'm programmatically creating a Deployment
for a simple flow and it seems to have registered on our Prefect Cloud.
However, the deployment run fails without log messages in Prefect Cloud.
The agent logs botocore.exceptions.ParamValidationError
, which makes me think this might be out of my reach.
I'm currently in what seems to be the exact same situation as @Anthony Desmier (link to thread) is in.
Not really sure where to go next.
Any help is appreciated a lot, thank you for your time!Ben Ayers-Glassey
10/17/2022, 5:15 PMprefect deployment apply
command has a typo in its output:
Successfully loaded 'sampleflow_dev'
Successfully uploaded 4 files to <gcs://dev-prefect-work/flow_storage/>
Deployment 'sampleflow/sampleflow_dev' successfully created with id 'c3b605e4-ca96-45eb-b51a-b048ca6d2e08'.
View Deployment in UI: <https://app.prefect.cloud/account/a9c0f124-ca06-4646-a501-57a405ebf3c7/workspace/43f1b8a7-ed9c-46d2-a88a-55eef95b8ef7/deployment/c3b605e4-ca96-45eb-b51a-b048ca6d2e08>
To execute flow runs from this deployment, start an agent that pulls work from the 'kubernetes' work queue:
$ prefect agent start -q 'kubernetes'
...that link it shows is incorrect:
https://app.prefect.cloud/account/a9c0f124-ca06-4646-a501-57a405ebf3c7/workspace/43f1[…]55eef95b8ef7/deployment/c3b605e4-ca96-45eb-b51a-b048ca6d2e08
...it should be:
https://app.prefect.cloud/account/a9c0f124-ca06-4646-a501-57a405ebf3c7/workspace/43f1[…]/deployments/deployment/c3b605e4-ca96-45eb-b51a-b048ca6d2e08
...in other words, it's missing a "/deployments"Kevin Wang
10/17/2022, 5:32 PMSean Davis
10/17/2022, 5:56 PMasyncio.Semaphore
or asyncio.Queue
and the like? Thanks for any suggestions or pointers.Xavier Lesperance
10/17/2022, 6:34 PMJarvis Stubblefield
10/17/2022, 8:52 PM10 failed, 4486 passed, 39 skipped, 3 xfailed, 11 errors in 1173.85s (0:19:33)
I wasn’t expecting to have failures… I’ve added my code and am now running the 20 minutes of tests to ensure it’s working as expected… I’m assuming so long as I don’t get to 11 failed
in the summary I should be okay right?Cameron Chafetz
10/17/2022, 10:21 PMkent
10/18/2022, 1:11 AMfrom prefect import flow,get_run_logger
@flow
def my_favorite_function():
logger = get_run_logger()
<http://logger.info|logger.info>("aaaaaaaaaaaaaaaaaaaaaaaaaa")
print("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
return 42
prefect deployment build flows/tutorial.py:my_favorite_function -n etl --cron "*/1 * * * *" -q etl --apply
prefect agent start -q 'etl
Is there a problem?John Ramey
10/18/2022, 2:03 AMTask.map
can do something similar to generate tasks, but I’d prefer subflows for my project. Is this possible? If so, any docs or resources y’all can point me to? Thanks!Shane Hu
10/18/2022, 2:13 AM[tool.poetry.dependencies]
python = "^3.10"
prefect = "^2.6.1"
playwright = "^1.27.1"
import prefect
from playwright.sync_api import Page, sync_playwright
@prefect.task
def go_to_a_web(page: Page):
page.goto("<https://formulae.brew.sh/cask/slack>")
@prefect.flow
def main_flow():
with sync_playwright() as pw:
browser = pw.chromium.launch(headless=False)
page = browser.new_page()
go_to_a_web(page)
main_flow()
I got
greenlet.error: cannot switch to a different thread
any help would be much appreciatedShrikkanth
10/18/2022, 5:27 AMdef post_to_slack_on_success(task, old_state, new_state):
if new_state.is_successful():
flow_run = FlowRunView.from_flow_run_id(prefect.context.get("flow_run_id"))
task_run = flow_run.get_task_run(task_run_id=prefect.context.get("task_run_id"))
msg = task_run.get_result()
SlackTask(message=msg).run()
flow.state_handlers.append(slack)
Second flow function try:
def post_to_slack_on_success(task, old_state, new_state):
if new_state.is_successful():
print(new_state)
print(old_state)
print("Result : ", new_state.result, new_state.message)
Screenshot of the alert:Shruti Hande
10/18/2022, 5:58 AMAndreas Nigg
10/18/2022, 6:04 AMAngelika Tarnawa
10/18/2022, 8:30 AMprefect.exceptions.ClientError: [{'path': ['get_or_create_task_run_info'], 'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 101}], 'path': None}}}]
I have tried to re-register flow and I'm sure that flow run and registered flow are the sameSlackbot
10/18/2022, 10:21 AMAntti Tupamäki
10/18/2022, 10:53 AMTim-Oliver
10/18/2022, 1:21 PMprefect block ls
times out. If I switch to a network without proxy everything works. Any suggestions?Iuliia Volkova
10/18/2022, 2:02 PMKyle McChesney
10/18/2022, 2:29 PM@resource_manager
class LocalFileManager:
...
def setup(self) -> 'LocalFileManager':
self.temp_dir = Path(mkdtemp())
self.logger = prefect.context['logger']
<http://self.logger.info|self.logger.info>('Set up LocalFileManager @ %s', self.temp_dir)
self._lock = RLock()
self._cache = {}
return self
def cleanup(self, _) -> None:
with self._lock:
<http://self.logger.info|self.logger.info>('Cleaning up: %s', self.temp_dir)
shutil.rmtree(self.temp_dir)
I had a flow fail, the setup task is the root failure. Here are the logs
17 October 2022,06:09:30 prefect.CloudTaskRunner INFO Task 'LocalFileManager.setup': Starting task run...
17 October 2022,06:09:30 prefect.LocalFileManager.setup INFO Set up LocalFileManager @ /tmp/tmp1b3_t4xs
17 October 2022,06:09:31 prefect.CloudTaskRunner INFO Task 'LocalFileManager.setup': Finished task run for task with final state: 'Success'
17 October 2022,08:55:01 prefect.CloudTaskRunner INFO Task 'LocalFileManager.setup': Starting task run...
17 October 2022,08:55:01 prefect.CloudTaskRunner ERROR Task 'LocalFileManager.setup': Exception encountered during task execution!
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 884, in get_task_run_state
logger=self.logger,
File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/usr/local/lib/python3.7/site-packages/prefect/tasks/core/resource_manager.py", line 25, in run
return mgr.setup()
AttributeError: 'NoneType' object has no attribute 'setup'
17 October 2022,08:55:01 prefect.CloudTaskRunner INFO Task 'LocalFileManager.setup': Finished task run for task with final state: 'Failed'
It seems like the task ran twice and fails for the second run. This exact flow has run multiple times without issue, and this is a common resource manager used in multiple other flows.Mansour Zayer
10/18/2022, 2:50 PMparameters{...}
I get this error Field parameters must not have a selection since type "jsonb" has no subfields.
Any ideas on how I can access the subfields in parameters and use them in a where condition?
Thank youSam Garvis
10/18/2022, 3:12 PMNace Plesko
10/18/2022, 3:33 PMiñigo
10/18/2022, 3:57 PMJason Thomas
10/18/2022, 4:09 PM#prefect-2-orion
Hi all, question. Anything we can do in a @task
can also be done in a @flow
, but tasks have a limitation: they can’t call other tasks.
In a complex/nested job, for every function except the top-level flow I have to think about whether it is atomic and can be a task, or whether I will want to split it out further into multiple tasks, in which case it needs to be a flow.
But I realized I could just make everything a flow so I don’t have to worry about it. If I decide to split it out I can. If I don’t, no problem. Given the power and flexibility of flows, and that one arbitrary limitation of tasks, why use tasks at all?
So, what am I missing here? Are tasks obsolete? What’s their unique use case?Tim Enders
10/18/2022, 4:21 PM11:20:34.559 | ERROR | Task run 'Get-Items-d8ed86f1-2199' - Crash detected! Request to <https://api.prefect.cloud/api/accounts/cafe3a79-624b-468d-87a7-97fde3358a01/workspaces/5d09d677-90b8-4ef8-a9be-9760b422937a/task_runs/a13a62a5-5e54-4b1d-a976-98635a6913c6> failed: Traceback (most recent call last):
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/streams/tls.py", line 130, in _call_sslobject_method
result = func(*args)
File "/usr/lib/python3.10/ssl.py", line 975, in do_handshake
self._sslobj.do_handshake()
ssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:997)
Christopher
10/18/2022, 4:35 PMECSTask(...).save("dev-trial", overwrite=True)
on my dev machine. It shows up in Prefect Cloud.
2. I have created a deployment with prefect deployment build -n dev-trial -q dev -ib ecs-task/dev-trial -a flows/healthcheck.py:healcheck
again from my dev machine
3. I have started a local agent with prefect agent start -q dev
and triggered a job. All works.
4. I have started an agent inside a container and triggered the job, but now it fails with the error KeyError: "No class found for dispatch key 'ecs-task' in registry for type 'Block'."
It seems like in the container, it's not able to resolve the block reference. But isn't that embedded inside the yaml downloaded from Prefect Cloud?