Miremad Aghili
11/04/2022, 1:34 PM
Giuliano Mega
11/04/2022, 4:22 PM
```python
import time

from prefect import flow, get_run_logger, task  # get_run_logger ships with prefect


@task
def get_inputs():
    return range(1, 600)


@task(tags=['sequential-scraper'])
def process_input(item):
    logger = get_run_logger()
    logger.info(f'Processing item {item}')
    time.sleep(1)
    logger.info(f'Processed item {item}')


@flow(description='Example of a very simple, API scraping-like Pipeline. '
                  'Feel free to run it as it does not do anything harmful.')
def sample_pipeline():
    new_tokens = get_inputs()
    process_input.map(new_tokens)
```
What I see is my task runner (Job) taking a very long time until it gets wiped out without running any tasks at all, while the UI (Prefect Cloud) leaves the flow in a perpetual "running" state.
Enabling debug logging shows many lines like:
```
16:18:26.175 | DEBUG | prefect.engine - Received wait instruction for 30s: Concurrency limit for the sequential-scraper tag has been reached
16:18:26.868 | DEBUG | prefect.engine - Received wait instruction for 30s: Concurrency limit for the sequential-scraper tag has been reached
16:18:27.375 | DEBUG | prefect.engine - Received wait instruction for 30s: Concurrency limit for the sequential-scraper tag has been reached
16:18:28.344 | DEBUG | prefect.engine - Received wait instruction for 30s: Concurrency limit for the sequential-scraper tag has been reached
```
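The debug lines show each blocked task being told to come back after 30s. Here is a rough model of what that means for throughput. It is hypothetical and is not Prefect's actual scheduler. Assume every mapped task asks for a slot on the `sequential-scraper` tag. If it gets one, it holds it for 1s; if it does not, it retries after exactly 30s. `simulate_tag_limit` is an illustrative helper, not a Prefect API.

```python
import heapq


def simulate_tag_limit(num_tasks, limit, work_s=1, wait_s=30):
    """Toy model of a tag concurrency limit with fixed 'wait' instructions.

    Every task asks for a slot at t=0. If fewer than `limit` slots are busy,
    it runs for `work_s` seconds; otherwise it retries after `wait_s` seconds.
    Returns the time at which the last task finishes.
    """
    if limit < 1:
        # With no slots no task could ever run, and the loop would never end.
        raise ValueError("limit must be at least 1")
    pending = [(0, task_id) for task_id in range(num_tasks)]  # (ask_time, task)
    heapq.heapify(pending)
    busy_until = []  # heap of release times for occupied slots
    last_finish = 0
    while pending:
        now, task_id = heapq.heappop(pending)
        # Free every slot whose work has completed by `now`.
        while busy_until and busy_until[0] <= now:
            heapq.heappop(busy_until)
        if len(busy_until) < limit:
            heapq.heappush(busy_until, now + work_s)
            last_finish = max(last_finish, now + work_s)
        else:
            heapq.heappush(pending, (now + wait_s, task_id))
    return last_finish


if __name__ == "__main__":
    for limit in (1, 10):
        print(limit, simulate_tag_limit(599, limit))
```

Under these assumptions, a limit of 1 stretches the 599 one-second tasks to 17,941 s (about five hours), because only one task gets in per 30 s retry round. A limit of 10 finishes at 1,771 s. The model does not explain the case above where no task ran at all. For that, it may be worth checking the tag's configured limit and occupied slots, for example with `prefect concurrency-limit inspect sequential-scraper` in Prefect 2.x.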
Am I misusing this in any way?
Amogh Kulkarni
11/04/2022, 5:19 PM
Crash detected! Request to `http://prefect-url:4200/api/task_runs/e26b6472-281f-4af1-933c-82516953f4a7/set_state` failed.
Slackbot
11/04/2022, 5:26 PM
jpuris
11/04/2022, 6:21 PM
```
Nov 04 18:03:30 ip-172-31-30-84 prefect[242862]: 18:03:30.621 | ERROR | prefect.agent - Invalid input ConnectionInputs.RECV_PING in state ConnectionState.CLOSED
Nov 04 18:03:30 ip-172-31-30-84 prefect[242862]: Traceback (most recent call last):
Nov 04 18:03:30 ip-172-31-30-84 prefect[242862]: File "/home/prefect/insights-prefect-flows/venv/lib/python3.10/site-packages/h2/connection.py", line 224, in process_input
Nov 04 18:03:30 ip-172-31-30-84 prefect[242862]: func, target_state = self._transitions[(self.state, input_)]
Nov 04 18:03:30 ip-172-31-30-84 prefect[242862]: KeyError: (<ConnectionState.CLOSED: 3>, <ConnectionInputs.RECV_PING: 14>)
```
Full trace in the attachment.
This started to happen as soon as we upgraded from Prefect 2.4.0 to 2.6.5.
We are using Prefect Cloud.
Is this a known issue? 😕
Sowmiya Anand
11/04/2022, 7:51 PM
James Vaughan
11/04/2022, 7:59 PM
Version group 92f0a1ae-7793-41bd-a662-d4cabbe103bf has no unarchived flows
This same error crops up when I use the Flow ID itself.
I'm trying to do this with curl, just to play with the UI and build some code out, because I can't use the Python library due to some restrictions.
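The error text says the version group has no unarchived flows. Read literally, that suggests every flow version in that group is archived. For the curl route, the request below shows roughly what gets POSTed. Several details are assumptions based on Prefect 1 Cloud and should be checked against its GraphQL schema: the endpoint `https://api.prefect.io`, the `Bearer` authorization header, and the `create_flow_run(input: { version_group_id: ... })` mutation. The sketch only builds the request with the standard library and does not send it. `build_create_flow_run_request` is a hypothetical helper.

```python
import json
import urllib.request

# Assumption: Prefect 1 Cloud GraphQL endpoint; verify for your setup.
PREFECT_1_API = "https://api.prefect.io"


def build_create_flow_run_request(version_group_id, api_key):
    """Build (but do not send) a GraphQL POST that creates a run from a version group."""
    # json.dumps produces a quoted, escaped string that is also a valid GraphQL literal.
    query = (
        "mutation { create_flow_run(input: { version_group_id: %s }) { id } }"
        % json.dumps(version_group_id)
    )
    return urllib.request.Request(
        PREFECT_1_API,
        data=json.dumps({"query": query}).encode("utf-8"),
        headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    req = build_create_flow_run_request("92f0a1ae-7793-41bd-a662-d4cabbe103bf", "<API_KEY>")
    print(req.full_url, req.data.decode())
```

The printed JSON body can be saved to a file and sent with something like `curl -X POST https://api.prefect.io -H "Authorization: Bearer $API_KEY" -H "Content-Type: application/json" -d @body.json`. `API_KEY` and `body.json` are placeholder names.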
Can someone check out the code in the thread and let me know if you have any thoughts here?
Alexandru Anghel
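11/04/2022, 8:34 PM
```python
import pendulum
from prefect.schedules import CronSchedule, Schedule, clocks  # Prefect 1.x schedule API
def get_schedule(schedule: str):
    return Schedule(clocks=[clocks.DatesClock([pendulum.now().add(seconds=10)])]) if schedule == "one time" else CronSchedule(cron=schedule)
```
Thanks!
benson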
11/05/2022, 1:38 AM
Slackbot
11/05/2022, 5:53 AM
chicago-joe
11/06/2022, 12:24 AM
An exception occurred.
```
➜ flows prefect blocks type delete listblock
Cannot delete Block Type 'listblock'!
```
I can verify that the block type slug is correct.
Faheem Khan
11/06/2022, 5:59 AM
Tim Galvin
11/06/2022, 6:29 AMdef main(
sbid,
my_args,
cluster,
):
dask_runner = get_dask_runner(cluster)
# Define flow
@flow(
name=f"Processing holography -- {sbid}",
task_runner=dask_runner,
)
def my_flow():
logger = get_run_logger()
task_super_awesome_worker()
my_flow()
This is a simplified example, but in short I am creating my flow function inside another function. I am doing this because some attributes of the flow's task runner need to be defined at runtime. They depend on the SLURM cluster it is executed on, the account running the flow, the compute resources for the SLURM job, etc. (specified via the `dask_jobqueue.SLURMCluster` module/class).
On the CLI, my attempt to set the entrypoint with `prefect deployment build /path/to/my/flow_script.py:my_flow` results in an error that amounts to `my_flow` not being in `flow_script.py`. If I replace `my_flow` with `main`, I get an error about `main` not being a Flow. I do not see an example of how to do this in the docs.
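A `path:name` entrypoint names an object defined at the top level of the script. The sketch below does that kind of lookup in plain Python. It is an illustration, not Prefect's own loader. It shows why a flow defined inside `main()` cannot be found: a nested function exists only while `main()` runs and is never an attribute of the module. `SCRIPT`, `resolve_entrypoint`, and `my_module_level_flow` are hypothetical names for this example.

```python
import importlib.util
import pathlib
import tempfile
import textwrap

# Hypothetical script mirroring the question: one function nested inside
# main(), one function defined at module level.
SCRIPT = textwrap.dedent('''
    def main(cluster):
        def my_flow():  # nested: exists only while main() is running
            return "running on " + cluster
        return my_flow()

    def my_module_level_flow():  # module level: an attribute of the module
        return "ok"
''')


def resolve_entrypoint(entrypoint):
    """Load 'path/to/script.py:name' and return the named module attribute."""
    path, _, name = entrypoint.rpartition(":")
    spec = importlib.util.spec_from_file_location("flow_script", path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs only the script's top-level code
    return getattr(module, name)  # AttributeError for anything not top-level


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        script = pathlib.Path(tmp) / "flow_script.py"
        script.write_text(SCRIPT)
        main = resolve_entrypoint(f"{script}:main")
        top = resolve_entrypoint(f"{script}:my_module_level_flow")
        try:
            resolve_entrypoint(f"{script}:my_flow")
            nested_found = True
        except AttributeError:
            nested_found = False
        return main("slurm"), top(), nested_found


if __name__ == "__main__":
    print(demo())
```

This points toward keeping the object named in the entrypoint at module level and applying the runtime-dependent task runner some other way. Whether `Flow.with_options(task_runner=...)` covers that in your Prefect 2 version is worth checking in its API reference.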
So, I am wondering: how does one do something like this?
YD
11/06/2022, 6:00 PM
`.submit()`, `.wait()`, `.result()`… ?)
In Prefect 1.0, the “task” had “timeout” and “retry_delay”, so I was able to limit the run time of a task and wait some time before trying to run it again. (This is useful because sometimes a task gets stuck due to a cluster resource issue, and I want to let some time pass before rerunning it.)
(https://docs-v1.prefect.io/api/latest/core/task.html)
How would you do such a thing with Prefect 2.0?
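Prefect 2's `@task` accepts `retries` and `retry_delay_seconds`. Whether it also accepts a per-task timeout depends on the 2.x release, so check the task API reference for your version. The behaviour can also be sketched in plain Python, independent of Prefect: run each attempt in a worker thread, stop waiting after `timeout_s`, and pause `retry_delay_s` before the next attempt. `run_with_timeout_and_retries` is a hypothetical helper. One limitation matters: a timed-out attempt is abandoned, not killed.

```python
import concurrent.futures
import time


def run_with_timeout_and_retries(fn, *, timeout_s, retries, retry_delay_s, sleep=time.sleep):
    """Call fn() with a per-attempt timeout, pausing retry_delay_s between attempts.

    A timed-out attempt is abandoned, not killed: its thread keeps running in
    the background until fn() returns on its own.
    """
    last_exc = None
    for attempt in range(retries + 1):
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        future = pool.submit(fn)
        try:
            return future.result(timeout=timeout_s)
        except Exception as exc:  # a timeout, or whatever fn() raised
            last_exc = exc
        finally:
            pool.shutdown(wait=False)  # do not block on a stuck attempt
        if attempt < retries:
            sleep(retry_delay_s)  # injectable so tests need not really wait
    raise last_exc


if __name__ == "__main__":
    print(run_with_timeout_and_retries(lambda: "ok", timeout_s=5, retries=2, retry_delay_s=60))
```

Passing `sleep` as a parameter keeps the delay testable. Because Python threads cannot be forcibly stopped, a process-based executor would be the safer choice when a stuck attempt must actually be terminated.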
(I see that the flow decorator has those options, but not the task.)
Rich Tata
11/06/2022, 9:42 PM
Carlo
11/06/2022, 10:28 PM
Faheem Khan
11/06/2022, 11:34 PM
2j
11/07/2022, 3:18 AM
Ben Muller
11/07/2022, 3:56 AM
Tim Galvin
11/07/2022, 9:12 AM
`--storage-block` is unset to copy.
I thought I could set up a 'dummy' local file system block in the Orion UI (on my own managed server, not Prefect Cloud). However, the `prefect deployment build` command lists `github`, `s3`, `gcs`, `azure`, and `smb` as the supported types.
TL;DR: I need to set a `--storage-block` in my deployment, but I am reasonably certain that in my situation I do not want to copy anything to or from different file systems and blocks. I have a common underlying filesystem at the HPC center, and my data are large enough that I cannot reasonably expect copying to and from the disk to be feasible.
Sunjay
11/07/2022, 10:19 AM
Rabea Yousof
11/07/2022, 11:20 AM
Thom
11/07/2022, 11:34 AM
Stéphan Taljaard
11/07/2022, 1:12 PM
`Flow(...).validate()`, right?
vholmer
11/07/2022, 1:18 PM
Mark Li
11/07/2022, 1:58 PM
Blake Stefansen
11/07/2022, 2:06 PM
`RUNNING`
```
Engine execution of flow run '148d81ea-3cfe-4db1-a0d4-3f3f17748fb0' aborted by orchestrator: This run cannot transition to the RUNNING state from the RUNNING state.
```
Will add more details in the thread.
Mohamed Alaa
11/07/2022, 2:22 PM
JV
11/07/2022, 3:15 PM
"errorMessage": "HTTP Error 404: Not Found"
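For Prefect 2 Cloud, API calls are scoped to a workspace. The API URL has the form `https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>`, whereas `https://api.prefect.io` is the Prefect 1 Cloud endpoint. Whether that explains this particular 404 is not confirmed in the thread. Below is a small sketch that assembles the Prefect 2 URL from the two IDs. `workspace_api_url` is a hypothetical helper, and the IDs would come from your own workspace.

```python
import uuid

PREFECT_CLOUD_API = "https://api.prefect.cloud/api"


def workspace_api_url(account_id: str, workspace_id: str) -> str:
    """Build a Prefect 2 Cloud workspace API URL from an account and workspace ID."""
    # uuid.UUID raises ValueError for malformed IDs and normalizes their format.
    account = uuid.UUID(account_id)
    workspace = uuid.UUID(workspace_id)
    return f"{PREFECT_CLOUD_API}/accounts/{account}/workspaces/{workspace}"


if __name__ == "__main__":
    print(workspace_api_url("00000000-0000-0000-0000-000000000001",
                            "00000000-0000-0000-0000-000000000002"))
```

The resulting string is the kind of value Prefect 2 expects in `PREFECT_API_URL`.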
I am passing the API URL `https://api.prefect.io`. I understand that this documentation is not the latest, and I also tried the URL `https://api.prefect.cloud/` and got the same error. I would appreciate your input on this error in Prefect 2.0.
Bradley Hurley
11/07/2022, 4:23 PM