Ken Nguyen
11/10/2021, 4:37 PM
I have model = Parameter('model_name'). I want to announce the user input in Slack via my state handler, but when I do it displays as <Parameter: model_name> rather than the actual user input. Does anyone have suggestions on how to display the user input?
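(A minimal sketch of one way to do this, assuming Prefect 1.x: inside a state handler, read the resolved value from prefect.context rather than from the Parameter task object, which only renders as <Parameter: model_name>. The Slack call itself is left as a placeholder logger line.)

import prefect

def announce_model(task, old_state, new_state):
    # During a flow run, prefect.context carries the resolved parameter values.
    model_name = prefect.context.get("parameters", {}).get("model_name")
    if new_state.is_running():
        # Swap this logger call for whatever Slack notification you use.
        prefect.context.get("logger").info(f"Starting run with model: {model_name}")
    return new_state
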
Jason Boorn
11/10/2021, 5:08 PM
val = my_task()
where val is some JSON-like thing. I also want to set that task's downstream dependencies, which I had been doing with val.set_downstream(othertask) (although I was always a little confused as to why that worked). Well, now it's stopped working. What's the right way to do both of these things?
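(A minimal sketch of the usual Prefect 1.x pattern, with hypothetical task names: declare dependencies inside the Flow context, either by passing the result as data or via upstream_tasks for a state-only dependency.)

from prefect import task, Flow

@task
def my_task():
    return {"key": "value"}

@task
def othertask(data=None):
    return data

with Flow("example") as flow:
    val = my_task()                          # val is a Task reference, not the JSON itself
    fed = othertask(data=val)                # data dependency: the result is passed at runtime
    gated = othertask(upstream_tasks=[val])  # state-only dependency, no data passed
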
Tim Enders
11/10/2021, 6:24 PM
Theo Platt
11/10/2021, 7:46 PM
We use AWSClientWait to monitor the status of each of the AWS Batch jobs we launch from a mapped task. Here's the code for that mapped task:
@task
def wait_batch(job_id, delay, max_attempts):
    logger = prefect.context.get('logger')
    logger.info(f"Waiting for job to complete: {job_id}")
    waiter = AWSClientWait(
        client='batch',
        waiter_name='JobComplete',
    )
    # Blocks until the waiter reports the job has reached a terminal state
    waiter.run(
        waiter_kwargs={
            'jobs': [job_id],
            'WaiterConfig': {
                'Delay': delay,
                'MaxAttempts': max_attempts,
            },
        },
    )
    logger.info(f"Job complete: {job_id}")
    return job_id
But what we sometimes see is one or more batch jobs failing, which then somehow stops the other jobs from responding to this AWSClientWait call... and so the mapped task keeps running even though all the jobs have either failed or completed. Any ideas?
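(One possible mitigation, sketched below under the assumption that Prefect 1.x and boto3 are in use; this is not the original code: poll the Batch API directly and fail the mapped child as soon as its job reaches FAILED, rather than relying only on the waiter.)

import time

import boto3
import prefect
from prefect import task
from prefect.engine.signals import FAIL

@task
def wait_batch(job_id, delay, max_attempts):
    logger = prefect.context.get("logger")
    client = boto3.client("batch")
    for _ in range(max_attempts):
        status = client.describe_jobs(jobs=[job_id])["jobs"][0]["status"]
        if status == "SUCCEEDED":
            logger.info(f"Job complete: {job_id}")
            return job_id
        if status == "FAILED":
            # Fail this mapped child immediately instead of waiting forever
            raise FAIL(f"Batch job failed: {job_id}")
        time.sleep(delay)
    raise FAIL(f"Timed out waiting for batch job: {job_id}")
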
Tao Bian
11/10/2021, 9:24 PM
Henry Harrison
11/10/2021, 9:42 PM
Say I have a list [1, 2, 3, 4]. At different points I want to do overlapping and non-overlapping windows: three mapped tasks with inputs [(1, 2), (2, 3), (3, 4)], and then (separately) another group of two mapped tasks with inputs [(1, 2), (3, 4)]. Is this possible?
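(A minimal sketch of one way to do this in Prefect 1.x, with hypothetical task names: build each list of window tuples in its own task and map a processing task over the result.)

from prefect import task, Flow

@task
def sliding_pairs(xs):
    # overlapping windows: [1, 2, 3, 4] -> [(1, 2), (2, 3), (3, 4)]
    return list(zip(xs, xs[1:]))

@task
def disjoint_pairs(xs):
    # non-overlapping windows: [1, 2, 3, 4] -> [(1, 2), (3, 4)]
    return [tuple(xs[i:i + 2]) for i in range(0, len(xs), 2)]

@task
def process(pair):
    return sum(pair)

with Flow("windows") as flow:
    data = [1, 2, 3, 4]
    overlapping = process.map(sliding_pairs(data))
    disjoint = process.map(disjoint_pairs(data))
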
An Hoang
11/10/2021, 10:04 PM
Kevin Kho
11/10/2021, 11:51 PM
김응진
11/11/2021, 1:29 AM
Adam Everington
11/11/2021, 7:44 AM
Piyush Bassi
11/11/2021, 11:31 AM
Adam Everington
11/11/2021, 1:03 PM
flow.schedule = Schedule(
    clocks=[IntervalClock(timedelta(days=1))],
    filters=[at_time(datetime.time(20)), is_weekday],
)
flow.schedule.next(10) #<- I have also added this line in but it's done nothing
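(A possible explanation and fix, sketched under the assumption that the clock's anchor time is what blocks the runs: an IntervalClock with a one-day interval emits candidate runs at whatever time its start_date falls on, and the at_time(20:00) filter then rejects all of them unless that anchor is exactly 20:00. Anchoring the clock at 20:00, or expressing the rule as a cron clock, sidesteps this; the timezone below is a placeholder.)

from datetime import timedelta

import pendulum
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock, IntervalClock
from prefect.schedules.filters import is_weekday

# Option 1: anchor the daily interval at 20:00 so candidate runs can pass the filters
flow.schedule = Schedule(
    clocks=[IntervalClock(
        interval=timedelta(days=1),
        start_date=pendulum.datetime(2021, 11, 11, 20, 0, tz="UTC"),  # placeholder timezone
    )],
    filters=[is_weekday],
)

# Option 2: drop the filters and say "weekdays at 20:00" directly
flow.schedule = Schedule(clocks=[CronClock("0 20 * * 1-5")])
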
krishna chaitanya
11/11/2021, 1:17 PM
Daniel Katz
11/11/2021, 2:28 PM
Daniel Katz
11/11/2021, 2:29 PM
Doug Balog
11/11/2021, 2:32 PM
Pedro Machado
11/11/2021, 3:05 PM
I am running a child flow from a parent flow using the create_flow_run task. The child flow takes over 3 hours and completed successfully. However, the parent flow's task failed with "No heartbeat detected from the remote task; marking the run as failed."
I am using Prefect Cloud.
Please see the thread for the code I am using to run the child flow. Thanks!
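(A minimal sketch of the parent-flow pattern plus one commonly suggested mitigation, assuming Prefect 1.x and hypothetical flow/project names: run the heartbeat in a thread so a long wait_for_flow_run does not trip the heartbeat check.)

from prefect import Flow
from prefect.run_configs import UniversalRun
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("parent") as flow:
    child_run_id = create_flow_run(flow_name="child-flow", project_name="my-project")
    # Blocks until the child flow run reaches a finished state
    wait_for_flow_run(child_run_id, stream_logs=True)

# Heartbeats default to a subprocess; thread mode is often suggested when
# long-blocking tasks trigger "No heartbeat detected" failures.
flow.run_config = UniversalRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})
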
Tim Enders
11/11/2021, 3:21 PM
google extras?
Daniel Katz
11/11/2021, 3:34 PM
John T
11/11/2021, 4:47 PM
The delay between submitted and running is 30-45 seconds. I suspect there's some polling interval, but I don't know where this polling-interval constant lives in the Prefect code so I can lower it. Any help on this would be great!
Thanks
Austen Bouza
11/11/2021, 6:25 PM
Cooper Marcus
11/11/2021, 8:00 PM
Jack Sundberg
11/11/2021, 8:21 PM
I'm using the prefect.tasks.prefect.flow_run module to create and wait for flow runs, and have a question about how this is handled. Say I have Workflow A which (within it) submits and then waits for Workflow B, so the waiting step is a specific task within Workflow A. Is that waiting task sitting idle in the executor (e.g. is it using up a Dask worker)?
Santiago Gonzalez
11/11/2021, 8:40 PM
I need to use a different PrefectSecret depending on the value of a parameter. For example: if the param env is 'dev' I need to take the db_dev_url PrefectSecret, otherwise the db_prod_url PrefectSecret. Does Prefect help me do that? Or what do you think is the best way to get it? Thanks
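(A minimal sketch of one approach, assuming Prefect 1.x: resolve the secret name from the parameter inside a task at runtime, using the Secret client rather than the PrefectSecret task.)

from prefect import Flow, Parameter, task
from prefect.client import Secret

@task
def get_db_url(env):
    # Pick the secret name from the parameter value, then resolve it at runtime
    secret_name = "db_dev_url" if env == "dev" else "db_prod_url"
    return Secret(secret_name).get()

with Flow("conditional-secret") as flow:
    env = Parameter("env", default="dev")
    db_url = get_db_url(env)
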
Isaac Brodsky
11/11/2021, 9:01 PM
Ken Nguyen
11/11/2021, 9:06 PM
Frank Oplinger
11/11/2021, 10:18 PM
Fina Silva-Santisteban
11/11/2021, 10:36 PM
How do I use the .map() function when using the imperative Prefect API? (The docs only show examples with the functional API.)
I've tried using the mapped argument, but that doesn't seem to be right 🤔:
flow.set_dependencies(
    upstream_tasks=task_that_returns_a_dictionary,
    task=task_that_should_do_something_for_a_single_element,
    mapped=True,
    keyword_tasks=dict(single_element=task_that_returns_a_dictionary)
)
This is the error message:
TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
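(A hedged guess at the cause, with a sketch of a corrected call: upstream_tasks expects a collection of tasks, and passing a bare Task makes Prefect try to iterate it, which raises exactly this "Task is not iterable" error. The keyword_tasks entry already creates the dependency, so the upstream_tasks argument can be wrapped in a list or dropped.)

flow.set_dependencies(
    task=task_that_should_do_something_for_a_single_element,
    keyword_tasks=dict(single_element=task_that_returns_a_dictionary),
    mapped=True,  # maps the task over each element of the upstream result
)
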
Josh
11/11/2021, 10:44 PM
Kathryn Klarich
11/11/2021, 11:58 PM
Kathryn Klarich
11/11/2021, 11:58 PM
Kevin Kho
11/12/2021, 12:09 AM
I looked to see if you could set email_from to something else, but I don't think so after reading through it.
Kathryn Klarich
11/12/2021, 3:55 PM