Kien Nguyen
08/13/2021, 3:17 PM
Anze Kravanja
08/13/2021, 3:28 PM
Joshua S
08/13/2021, 4:16 PM
Danny Vilela
08/13/2021, 5:44 PM
max_retries
, retry_delay
, etc), and there’s a task after the ETL task to send a notification to Slack (not quite using the Slack integration, but querying the Slack API directly). That said, is it possible to hook into the retry logic such that whenever that ETL task gets retried, I can execute some logic?
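For reference, Prefect 0.15 state handlers are plain callables with the signature `handler(task, old_state, new_state)`, and state objects expose `is_retrying()`. A minimal sketch of a retry hook follows. `make_retry_notifier`, `FakeState`, and `FakeTask` are hypothetical names introduced for illustration, and the stand-in classes exist only so the sketch runs without Prefect installed.

```python
from typing import Callable, List


def make_retry_notifier(notify: Callable[[str], None]):
    """Build a handler with the (task, old_state, new_state) signature."""

    def on_retry(task, old_state, new_state):
        # Run the extra logic only when the task is moving into a retry state.
        if new_state.is_retrying():
            notify(f"Task {task.name!r} is about to be retried: {new_state.message}")
        # Returning the new state unchanged leaves the run itself untouched.
        return new_state

    return on_retry


# Hypothetical stand-ins (not Prefect classes) so the sketch is self-contained.
class FakeState:
    def __init__(self, retrying: bool, message: str = ""):
        self._retrying = retrying
        self.message = message

    def is_retrying(self) -> bool:
        return self._retrying


class FakeTask:
    name = "etl"


sent: List[str] = []
handler = make_retry_notifier(sent.append)
handler(FakeTask(), FakeState(False), FakeState(True, "boom"))  # triggers notify
handler(FakeTask(), FakeState(False), FakeState(False))         # ignored
print(sent)
```

With Prefect installed, a handler like this would be attached to the specific task, for example `@task(max_retries=3, retry_delay=..., state_handlers=[handler])`. Handlers passed to `Flow(state_handlers=[...])` receive flow-run states rather than task states, so they would likely not see an individual task entering its retry state.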
For example, “whenever this task is going to be put into the RETRY
state, run this code”? Would this be a state handler like the terminal_state_handler
on the Flow
, but for the specific task? Or just a state handler passed to Flow(state_handlers=[send_slack_message_on_retry])
?
Brandon
08/13/2021, 7:17 PM
Anicet Tadonkemwa
08/13/2021, 7:28 PM
Charles Leung
08/13/2021, 8:37 PM
Nivi Mukka
08/13/2021, 9:02 PM
cluster_kwargs
in DaskExecutor: https://github.com/PrefectHQ/prefect/blob/6b59d989dec33aad8c62ea2476fee519c32f5c63/src/prefect/executors/dask.py#L88-L89
https://docs.prefect.io/api/latest/executors.html#daskexecutor
Kathryn Klarich
08/13/2021, 9:33 PM
Parameter('abc', default=[1, 2, 3])
) in my Python code it's fine.
Hugo Shi
08/13/2021, 10:10 PM
Cooper Marcus
08/14/2021, 2:27 AM
LocalRun
and UniversalRun
- I’m configuring runs in the Prefect Cloud interface, and they are running on one of a few agents, one per server. Each server has its own label for its Prefect agent (e.g. Server1 has label server1
) - when I configure the run on Prefect Cloud, I think (but I’m not 110% certain, I’ve been doing this too long today 😉) that when I specify the server1
label, it doesn’t matter whether I select the “local” or “universal” option - huh?
YD
08/15/2021, 2:15 AM
No heartbeat detected from the remote task; retrying the run.This will be retry 1 of 2
I also have a flow that I started manually, but it does not start, even though I have no other flows running.
In general, if I have a flow that I need to ensure runs at an exact time (no more than a few seconds off), is it good to use Prefect for this, or is it better to use a cron job?
Lan Yao
08/16/2021, 4:28 AM
prefect agent kubernetes install -t ${TOKEN} --rbac --label ${CLUSTER_NAME} | kubectl apply -f -
I’m trying to use Prefect Cloud Automations to do health monitoring of the agent. However, I can’t do it at the agent installation phase as per the Prefect Docs. And I can’t manage to add the agent_config_id
argument to a running agent either. I tried GraphQL, but didn’t find a suitable mutation for it.
Is there any way to make Automations work for a Prefect agent running in a Kubernetes cluster in my case? 🙏
Marie
08/16/2021, 10:46 AM
Nacho Rodriguez
08/16/2021, 11:12 AM
Ivan Indjic
08/16/2021, 12:05 PM
Tim Enders
08/16/2021, 2:14 PM
Italo Barros
08/16/2021, 2:39 PM
Robin Norgren
08/16/2021, 4:54 PM
YD
08/16/2021, 6:29 PM
Kathryn Klarich
08/16/2021, 9:33 PM
class FargateECSRun(ECSRun)
) and filling in the parameters like run_task_kwargs
with defaults so that they don't need to be duplicated in every flow. However, when I go to register the flow, I get an error ValueError: Flow could not be deserialized successfully. Error was: TypeError('not all arguments converted during string formatting')
- is what I am trying to do not allowed? I'm not sure how to debug this because when I try to follow these directions, built_storage
is just an empty dictionary.
Michael Warnock
08/16/2021, 9:41 PM
The conflict is caused by:
prefect 0.15.3 depends on click<8.0 and >=7.0
prefect[aws,github,jupyter] 0.15.3 depends on click<8.0 and >=7.0
coiled 0.0.47 depends on click>=7.1
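Taken on their own, the three constraints quoted above overlap, which suggests that something not shown in this excerpt (another pin or requirement) may also be constraining click. A minimal sketch of that check follows. The candidate version list and the helper names `parse`, `prefect_ok`, and `coiled_ok` are assumptions made for illustration, and the plain tuple comparison is a simplification of real version-specifier matching.

```python
def parse(version: str) -> tuple:
    """Turn a dotted version such as '7.1.2' into a comparable tuple of ints."""
    return tuple(int(part) for part in version.split("."))


def prefect_ok(version: str) -> bool:
    # prefect 0.15.3: click<8.0 and >=7.0
    return parse("7.0") <= parse(version) < parse("8.0")


def coiled_ok(version: str) -> bool:
    # coiled 0.0.47: click>=7.1
    return parse(version) >= parse("7.1")


# Illustrative candidates only, not an exhaustive list of click releases.
candidates = ["7.0", "7.1", "7.1.1", "7.1.2", "8.0.0", "8.0.1"]
compatible = [v for v in candidates if prefect_ok(v) and coiled_ok(v)]
print(compatible)
```

If the overlap is non-empty, as it is for these candidates, pinning click to one of the compatible versions is one thing that might be worth trying before looking for other conflicting requirements.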
Tim Chklovski
08/16/2021, 9:45 PM
Jacob Goldberg
08/16/2021, 10:16 PM
start_date
is last Monday and the end_date
is this Monday. However I also want the ability to override the default date inputs (e.g. to backfill data). I thought the DateTimeParameter
would be perfect for this but I am having trouble getting it working. Here is a sample snippet:
import datetime as dt
from prefect import Flow
from prefect.core.parameter import DateTimeParameter
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
schedule = Schedule(clocks=[CronClock("0 9 * * 1")])
with Flow(name="test", schedule=schedule) as test_flow:
    start_date = DateTimeParameter("start_date", required=False) or dt.date.today() - dt.timedelta(days=7)
    end_date = DateTimeParameter("end_date", required=False) or dt.date.today()
    my_process(start_date, end_date)
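A note on the snippet above: the `or` is evaluated once, while the flow is being built, and at that point `DateTimeParameter(...)` is a task object rather than a date, so the left operand is kept and the fallback on the right is never used. One possible workaround, sketched here under assumptions, is to resolve the default at run time inside a task. `resolve_date`, `days_back`, and `now` are hypothetical names, not Prefect API, and the function is shown as plain Python so it runs without Prefect installed.

```python
import datetime as dt
from typing import Optional


def resolve_date(
    value: Optional[dt.datetime],
    days_back: int = 0,
    now: Optional[dt.datetime] = None,
) -> dt.datetime:
    """Return the supplied date, or a default computed when the run executes."""
    if value is not None:
        # A value was supplied (e.g. for a backfill), so use it as-is.
        return value
    # No value supplied: compute the default relative to the current time.
    current = now if now is not None else dt.datetime.now(dt.timezone.utc)
    return current - dt.timedelta(days=days_back)


# Example with a fixed "now" so the result is reproducible.
monday = dt.datetime(2021, 8, 16, 9, 0, tzinfo=dt.timezone.utc)
print(resolve_date(None, days_back=7, now=monday))
print(resolve_date(monday, days_back=7))
```

Inside the flow this might look like `start_date = resolve_date(DateTimeParameter("start_date", required=False), days_back=7)` after decorating `resolve_date` with `@task`, so that the parameter's run-time value (None when not supplied) is what the function receives.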
The issue is that my_process()
requires a datetime object but is receiving NoneType
. It seems the or
operator is not working as I would expect when defining start_date
and end_date
. Although DateTimeParameter
"evaluates" to None
in Prefect, the or
operator sees them as Prefect Parameters, so start_date
and end_date
both get defined as None
. What is the proper way to structure the Flow so that it runs on a schedule with default datetime objects, while I keep the ability to override them via Prefect Cloud?
Jose Daniel Posada Montoya
08/16/2021, 11:35 PM
DockerRun
) in sync with the code/package I have in the repository. One of the approaches I came up with was to develop a Storage (very similar to Git) that would do the following:
1. Clone the repository into a temp folder.
2. Extract the Flow (as it is currently done).
3. Search the repo for a specified Dockerfile
. (the Dockerfile
would do something like an ADD
of the entire repo and a pip install -e .
so that the image has the latest code in it).
4. Build the image with a certain tag (the same tag that DockerRun
would use).
(This is all assuming that the Storage code is executed in the Docker Agent process just before the container is started, which would give time to create the image before the run_config
creates the container and runs the flow inside it.)
I was going to do a test by creating the custom Storage but I ran into things I didn't take into account like serialization
. But before I explore this option further I wanted to ask:
• Is there any way to achieve what I want, that I am not seeing, that doesn't involve a Container Registry or a CI pipeline?
• Does the solution I propose make sense to you? Do you think it would work or am I missing something?
• Would it be useful for more people and would it be worth some contribution?
I would appreciate any guidance, advice or comments on the subject.
Dakota McCarty
08/17/2021, 1:15 AM
YD
08/17/2021, 1:30 AM
flow.register(project_name='example')
we do need to run flow.register
, right?
Samuel Tober
08/17/2021, 8:46 AM
Chhaya Vankhede
08/17/2021, 9:45 AM
Brett Jurman
08/17/2021, 4:50 PM