Cole Murray
11/03/2022, 11:36 PMBen Muller
11/04/2022, 2:40 AMget_run_logger()
.
In order to test that function I am using .fn()
but I can not test it because get_run_logger()
failed every time because E prefect.exceptions.MissingContextError: There is no active flow or task run context.
Besides possibly using an optional kwarg in every single task that I have, how can I get around this to test my tasks logic properly?Slackbot
11/04/2022, 3:37 AMIkkyu Choi
11/04/2022, 6:01 AMDeepanshu Aggarwal
11/04/2022, 6:41 AMAttributeError: 'coroutine' object has no attribute 'type'
and the flow run crashes. any idea why this happened.
adding detailed logs in the commentBen Muller
11/04/2022, 8:13 AMMathijs Carlu
11/04/2022, 9:25 AMShruti Hande
11/04/2022, 9:45 AMNic
11/04/2022, 10:55 AMTony Yun
11/04/2022, 11:51 AMVadym Dytyniak
11/04/2022, 1:26 PMMiremad Aghili
11/04/2022, 1:34 PMGiuliano Mega
11/04/2022, 4:22 PMimport time
from prefect import flow, task
from lib.prefect import get_run_logger
@task
def get_inputs():
return range(1, 600)
@task(tags=['sequential-scraper'])
def process_input(item):
logger = get_run_logger()
<http://logger.info|logger.info>(f'Processing item {item}')
time.sleep(1)
<http://logger.info|logger.info>(f'Processed item {item}')
@flow(description='Example of a very simple, API scraping-like Pipeline. '
'Feel free to run it as it does not do anything harmful.')
def sample_pipeline():
new_tokens = get_inputs()
process_input.map(new_tokens)
And what I see essentially is my task runner (Job) taking a very long time until it gets wiped out without running any tasks at all, and the UI (Prefect cloud) leaves the flow in an eternal "running" state.
Enabling debugging shows a lot of:
16:18:26.175 | DEBUG | prefect.engine - Received wait instruction for 30s: Concurrency limit for the sequential-scraper tag has been reached
16:18:26.868 | DEBUG | prefect.engine - Received wait instruction for 30s: Concurrency limit for the sequential-scraper tag has been reached
16:18:27.375 | DEBUG | prefect.engine - Received wait instruction for 30s: Concurrency limit for the sequential-scraper tag has been reached
16:18:28.344 | DEBUG | prefect.engine - Received wait instruction for 30s: Concurrency limit for the sequential-scraper tag has been reached
Am I misusing this in any way?Amogh Kulkarni
11/04/2022, 5:19 PMCrash detected! Request to <http://prefect-url:4200/api/task_runs/e26b6472-281f-4af1-933c-82516953f4a7/set_state> failed.
Slackbot
11/04/2022, 5:26 PMJP
11/04/2022, 6:21 PMNov 04 18:03:30 ip-172-31-30-84 prefect[242862]: 18:03:30.621 | ERROR | prefect.agent - Invalid input ConnectionInputs.RECV_PING in state ConnectionState.CLOSED
Nov 04 18:03:30 ip-172-31-30-84 prefect[242862]: Traceback (most recent call last):
Nov 04 18:03:30 ip-172-31-30-84 prefect[242862]: File "/home/prefect/insights-prefect-flows/venv/lib/python3.10/site-packages/h2/connection.py", line 224, in process_input
Nov 04 18:03:30 ip-172-31-30-84 prefect[242862]: func, target_state = self._transitions[(self.state, input_)]
Nov 04 18:03:30 ip-172-31-30-84 prefect[242862]: KeyError: (<ConnectionState.CLOSED: 3>, <ConnectionInputs.RECV_PING: 14>)
Full trace in attachment
This started to happen as soon as we upgraded from prefect 2.4.0 to 2.6.5
We are using Prefect Cloud.
Is this a known issue? 😕Sowmiya Anand
11/04/2022, 7:51 PMJames Vaughan
11/04/2022, 7:59 PMVersion group 92f0a1ae-7793-41bd-a662-d4cabbe103bf has no unarchived flows
This same error crops up when I use the Flow ID itself.
I'm trying to do this with CURL just to play with the UI and build some code out as I can't just use the python lib due to some restrictions.
Can someone check out the code in the thread and let me know if you have any thoughts here?Alexandru Anghel
11/04/2022, 8:34 PMdef get_schedule(schedule: str):
return Schedule(clocks=[DatesClock([pendulum.now().add(seconds=10)])]) if schedule == "one time" else CronSchedule(cron=schedule)
Thanks!benson
11/05/2022, 1:38 AMSlackbot
11/05/2022, 5:53 AMchicago-joe
11/06/2022, 12:24 AMAn exception occurred.
➜ flows prefect blocks type delete listblock
Cannot delete Block Type 'listblock'!
I can verify the block type slug is correctFaheem Khan
11/06/2022, 5:59 AMTim Galvin
11/06/2022, 6:29 AMdef main(
sbid,
my_args,
cluster,
):
dask_runner = get_dask_runner(cluster)
# Define flow
@flow(
name=f"Processing holography -- {sbid}",
task_runner=dask_runner,
)
def my_flow():
logger = get_run_logger()
task_super_awesome_worker()
my_flow()
This is a simplified example - but in short I am creating my flow function encapsulated in another function. I am going this as some attributed of the flow's task runner need to be defined at runtime depending on the SLURM cluster it is being executed on, account running the flow, compute resources for the slurm job etc (specified via the dask_jobqeue.SLURMCluster
module/class).
On the CLI my attempts to set the entrypoint
prefect deployment build /path/to/my/flow_script.py:my_flow
results in an error that amounts to my_flow
not being in flow_script.py
. If I try to replace my_flow
with main
, I get an error about main
not being a Flow
. In the docs I do not see an example of how to do this.
So, I am wondering how does one do something like this?YD
11/06/2022, 6:00 PM.submit()
, .wait()
, `.result()`… ?)
In Prefect 1.0 the “task” had “timeout” and “retry_delay”, so that I was able to limit the run time of a task, but wait some time before trying to run it again. (this is useful since sometimes a task get stack due to some cluster resource issue, and I want to let some time pass before rerunning it)
(https://docs-v1.prefect.io/api/latest/core/task.html)
How would you do such a thing with Prefect 2.0 ?
(I see that t he flow decorator has those options, but not the task)Rich Tata
11/06/2022, 9:42 PMCarlo
11/06/2022, 10:28 PMFaheem Khan
11/06/2022, 11:34 PM2j
11/07/2022, 3:18 AMBen Muller
11/07/2022, 3:56 AMBen Muller
11/07/2022, 3:56 AMKhuyen Tran
11/07/2022, 4:44 PMtest
and delete all flows with the test
tag
• recreate the blocks in a different backend databasesBen Muller
11/07/2022, 6:17 PMJeff Hale
11/07/2022, 8:35 PMBen Muller
11/07/2022, 8:42 PMJeff Hale
11/07/2022, 8:46 PMMichael Adkins
11/07/2022, 8:54 PMBen Muller
11/07/2022, 9:13 PM