Madhup Sukoon
03/17/2022, 8:06 AM
from prefect.utilities.notifications import slack_notifier
.
.
flow.state_handlers = [slack_notifier()]
I have added the Prefect app and also added the webhook URL to the SLACK_WEBHOOK_URL
secret. Any pointers on why this is happening / how to debug?
Ievgenii Martynenko
03/17/2022, 10:54 AM
logger = logging.getLogger(__name__)
and I want log messages from that module to appear in Prefect.
I registered an extra logger, and log messages defined in the Task (AWSPOC) appear, but not the log messages defined inside 'bear'.
My code for "prefect_flow.py" :
import logging
format_string = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
logger = logging.getLogger('magic_logger')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(format_string)
handler.setFormatter(formatter)
logger.addHandler(handler)
class AWSPOC(Task):
    def __init__(self, name: str, config_file: str):
        self.config_file = config_file
        super().__init__(name=name)

    def run(self):
        configuration = load_file(self.config_file)
        loader = Loader(configuration=configuration)
        logger.info("This message appears in Prefect log output")
        loader.run()
executor = LocalDaskExecutor()
host_config = {...}
storage = S3(...)
env = {
    "PREFECT__LOGGING__EXTRA_LOGGERS": "['magic_logger']"
}
docker_run_config = DockerRun(image=..., host_config=host_config, env=env)
with Flow(name="AWS POC", executor=executor, storage=storage, run_config=docker_run_config) as flow:
    task = AWSPOC(name='...', config_file='...')
    task()
flow.register(project_name='AWS POC')
Where am I wrong? The only idea I have is that 'bear' should have its own named logger.
Vadym Dytyniak
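A minimal stdlib-only sketch of why the 'bear' messages likely go missing in the question above: `logging.getLogger(__name__)` inside 'bear' creates a logger named "bear", which is not a child of "magic_logger", so neither the handler attached to "magic_logger" nor `PREFECT__LOGGING__EXTRA_LOGGERS: "['magic_logger']"` ever sees its records. The logger names below mirror the question; the suggested fixes are assumptions to verify.

```python
import logging

# The flow file's logger (as in the question):
magic = logging.getLogger("magic_logger")

# Stand-in for the 'bear' module, which does logging.getLogger(__name__):
bear = logging.getLogger("bear")

# Log records only propagate UP the dotted-name hierarchy. "bear" hangs off
# the root logger, not off "magic_logger", so a handler on "magic_logger"
# never receives records emitted by "bear".
assert bear.parent is logging.getLogger()      # parent is the root logger

# Two possible fixes:
# 1) register "bear" as an extra logger too, e.g.
#    PREFECT__LOGGING__EXTRA_LOGGERS: "['magic_logger', 'bear']"
# 2) have 'bear' log under the registered namespace:
child = logging.getLogger("magic_logger.bear")
assert child.parent is magic                   # now propagates to magic_logger
```

Either option should make the 'bear' messages flow to a registered logger; option 1 keeps 'bear' unchanged.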
03/17/2022, 11:37 AM
Daniel Nilsen
03/17/2022, 12:00 PM
Vishnu
03/17/2022, 3:03 PM
Marc Lipoff
03/17/2022, 3:58 PM
while has_next_url: ...
Is there a better "prefect" way to do this? One of the downsides of my basic way is that, if there is an error along the way (let's say at record 199,999 of 200,000), I lose what I had.
Kevin Mullins
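One common answer to the concern above (losing everything on a late failure) is to treat each page as its own retryable unit and persist pages as they arrive, rather than holding all records in one long loop. A stdlib-only sketch of the idea; the page source and record shapes are invented for illustration, and in Prefect 1.x the same pattern maps to one task per page (or a task with checkpointing / result storage).

```python
import json
import tempfile
from pathlib import Path

# Hypothetical paginated source: each page yields (records, next_url).
PAGES = {
    "page1": ([1, 2], "page2"),
    "page2": ([3, 4], "page3"),
    "page3": ([5], None),
}

def fetch_page(url):
    return PAGES[url]

def load_all(first_url: str, checkpoint_dir: Path):
    """Fetch page by page, writing each page to disk before moving on,
    so a failure at page N still keeps pages 1..N-1 on disk."""
    url, idx = first_url, 0
    while url:
        records, next_url = fetch_page(url)
        (checkpoint_dir / f"page_{idx:05d}.json").write_text(json.dumps(records))
        url, idx = next_url, idx + 1
    # Re-read everything that was persisted:
    return [r for p in sorted(checkpoint_dir.glob("page_*.json"))
            for r in json.loads(p.read_text())]

out_dir = Path(tempfile.mkdtemp())
assert load_all("page1", out_dir) == [1, 2, 3, 4, 5]
```

On a rerun after a crash, already-written `page_*.json` files could be skipped, which is the property the plain `while has_next_url` loop lacks.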
03/17/2022, 4:43 PM
LatestOnlyOperator or using depends_on_past. In Oozie (I know, not my favorite orchestrator), this could be achieved by setting a concurrency of 1 and execution to FIFO or ONLYLAST.
Are there any plans in Orion to provide similar functionality so users don’t have to implement custom solutions to this common pattern?
Andrew Lane
03/17/2022, 5:01 PM
retry_delay attribute of Task objects. Based on the description of state signals here, I’d expect to be able to raise signals.RETRY() and have the task wait retry_delay before the task is run again. However, the retries appear to be carried out immediately. Can someone explain where I’m going wrong in the wait_until_success task in the trailing code snippet?
Nico Neumann
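If I recall the Prefect 1.x behavior correctly (worth verifying against the docs), retry_delay applies when the retry machinery reacts to a failure, but a manually raised signals.RETRY() schedules an immediate retry unless the signal carries an explicit start_time. A hedged sketch of the pattern, guarded so it runs even without Prefect installed:

```python
from datetime import datetime, timedelta

def retry_start_time(delay: timedelta, now: datetime = None) -> datetime:
    """Compute when the retried run should begin."""
    return (now or datetime.utcnow()) + delay

# Inside the task this would look like the following (the RETRY(start_time=...)
# usage is the point; it is an assumption to check against Prefect 1.x docs):
try:
    from prefect.engine.signals import RETRY

    def wait_until_success(ready: bool):
        if not ready:
            raise RETRY(
                "not ready yet",
                start_time=retry_start_time(timedelta(minutes=5)),
            )
except Exception:
    pass  # Prefect 1.x not installed; sketch is illustrative only
```

With this, the retried run should not be scheduled before the computed start_time, instead of retrying immediately.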
03/17/2022, 5:03 PM
flow.schedule = Schedule(clocks=[CronClock("* * * * *")])
which works fine, and later want to add another schedule or delete the current one. My idea is to use GraphQL with the set_flow_group_schedule and delete_flow_group_schedule mutations.
Chris Reuter
03/17/2022, 5:50 PM
Marc Lipoff
03/17/2022, 6:11 PM
Marvin
03/17/2022, 6:55 PM
Marvin
03/17/2022, 6:56 PM
Stéphan Taljaard
03/17/2022, 6:56 PM
crontab * * * * *
😆
Stéphan Taljaard
03/17/2022, 7:16 PMtask
or flow
(i.e. subflows)?alex
03/17/2022, 7:35 PMx slots
used in the late runs section, even though the flow does not have any tags associated with the concurrency limit (I have some other flows that do). Any ideas on how I can resolve this issue?Stéphan Taljaard
03/17/2022, 7:55 PM
Constantino Schillebeeckx
03/17/2022, 7:56 PM
{parameters: {_contains: {"foo": "bar"}},
but the syntax isn't quite right; I get the error
Syntax Error: Expected Name, found String "date".
Can someone help?
Philip MacMenamin
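A hedged sketch for the GraphQL question above: in Hasura-style GraphQL (which, to my understanding, backs the Prefect 1.x API), object keys inside a query document must be bare names, so a quoted key like "foo" (or "date") produces exactly the 'Expected Name, found String' error. The usual workaround is to pass the JSON filter as a typed variable instead of inlining it. The query shape below is an assumption to adapt:

```python
# Build a standard GraphQL HTTP payload: the filter JSON travels as a typed
# variable ($params: jsonb), so its keys never appear in the query text.
QUERY = """
query ($params: jsonb) {
  flow_run(where: { parameters: { _contains: $params } }) {
    id
    name
  }
}
"""

def build_payload(params: dict) -> dict:
    # Standard GraphQL request body: query text plus a variables object.
    return {"query": QUERY, "variables": {"params": params}}

payload = build_payload({"foo": "bar"})
```

The payload can then be POSTed to the API (or passed to the Prefect client's GraphQL helper); quoted keys are fine inside the variables object because that part is plain JSON, not GraphQL syntax.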
03/17/2022, 7:58 PM
command, which is the return value from a previous Task?
e.g.
with Flow(name='t') as f:
    a = task_a()
    b = shell_task(a)
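As far as I recall the Prefect 1.x API, this works: ShellTask accepts `command` at call time inside the Flow context, so an upstream task's output can be passed straight in. A guarded sketch (the echo command is an invented example), runnable even without Prefect installed:

```python
# Hedged sketch of passing an upstream result as a ShellTask command
# (Prefect 1.x API, as I recall it; verify against the docs).
try:
    from prefect import Flow, task
    from prefect.tasks.shell import ShellTask

    shell_task = ShellTask()

    @task
    def task_a() -> str:
        # Upstream task producing the command string.
        return "echo hello"

    with Flow(name="t") as f:
        a = task_a()
        b = shell_task(command=a)  # upstream result becomes the command
except Exception:
    # Prefect 1.x not installed; the sketch above is illustrative only.
    f = "prefect 1.x not installed"
```

Passing `command=a` positionally (as `shell_task(a)`) should behave the same, since `command` is the first runtime argument.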
Serge Tarkovski
03/17/2022, 8:42 PM
$ python flow1.py
Traceback (most recent call last):
File "/home/tarkovskyi/miniconda3/envs/prefect_exp/lib/python3.10/site-packages/prefect/client.py", line 257, in api_healthcheck
await self._client.get("/health")
File "/home/tarkovskyi/miniconda3/envs/prefect_exp/lib/python3.10/site-packages/prefect/utilities/httpx.py", line 102, in get
return await self.request(
File "/home/tarkovskyi/miniconda3/envs/prefect_exp/lib/python3.10/site-packages/prefect/utilities/httpx.py", line 47, in request
request = self.build_request(
TypeError: BaseClient.build_request() got an unexpected keyword argument 'extensions'
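A hedged reading of the traceback above: the installed httpx appears to predate the `extensions` keyword that Prefect 2's client passes to `build_request()`, i.e. a version mismatch between the two packages; upgrading httpx alongside Prefect (e.g. `pip install --upgrade httpx prefect`) is the usual fix. A small stdlib check of whether a callable accepts that keyword, with an invented old-style signature as the stand-in:

```python
import inspect

def supports_extensions(build_request) -> bool:
    """True if the callable accepts an `extensions` keyword argument."""
    params = inspect.signature(build_request).parameters
    return "extensions" in params or any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )

# Stand-in mimicking an older build_request signature (no `extensions`):
def old_build_request(method, url, params=None, headers=None):
    ...

# Stand-in mimicking a newer signature that does accept it:
def new_build_request(method, url, *, params=None, headers=None, extensions=None):
    ...
```

Running `supports_extensions(httpx_client.build_request)` against the actual installed client would confirm whether the upgrade is needed.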
Jared Robbins
03/17/2022, 9:01 PM
Jared Robbins
03/17/2022, 9:16 PM
Jacqueline Riley Garrahan
03/17/2022, 9:19 PM
Serge Tarkovski
03/17/2022, 10:02 PM
Darshan
03/17/2022, 10:40 PM
Brad
03/18/2022, 3:07 AM
lialzm
03/18/2022, 6:56 AM
Aaron Ash
03/18/2022, 7:56 AM
executor and `run_config`s of the flows somehow?
Aaron Ash
03/18/2022, 7:58 AM
KubernetesRun() environments
Aaron Ash
03/18/2022, 7:59 AM
Aaron Ash
03/18/2022, 7:59 AM
Anna Geller
03/18/2022, 11:33 AM
with Flow(
    FLOW_NAME,
    executor=LocalDaskExecutor(),
    storage=set_storage(FLOW_NAME),
    run_config=set_run_config(local=True),
) as flow:
For the executor, this is more tricky, since as I said before, it's retrieved from storage, but you can try doing something like:
with Flow(
    FLOW_NAME,
    executor=LocalDaskExecutor(),
    storage=set_storage(FLOW_NAME),
    run_config=set_run_config(),
) as flow:
    datasets = ["raw_customers", "raw_orders", "raw_payments"]
    dataframes = extract_and_load.map(datasets)

if __name__ == '__main__':
    # register for prod
    flow.register("prod_project")
    # register for dev
    flow.executor = LocalExecutor()
    flow.run_config = set_run_config(local=True)
    flow.register("dev_project")
But I believe that in the above, the executor won't be respected, since __main__ is not evaluated at flow runtime.
So probably your best bet is to define your main flow in one python file, say: aaron_flow.py
- this defines your flow structure without defining run config or executor:
with Flow(
    "FLOW_NAME",
    storage=S3(),  # just an example
) as flow:
    datasets = ["raw_customers", "raw_orders"]
    dataframes = extract_and_load.map(datasets)
Then, you can have a file called `aaron_flow_dev.py`:
from aaron_flow import flow
flow.executor = LocalExecutor()
flow.run_config = KubernetesRun(image="some_dev_image")
and aaron_flow_prod.py
from aaron_flow import flow
flow.executor = LocalDaskExecutor()
flow.run_config = KubernetesRun(image="some_prod_image")
and then you can register using the CLI without worrying about the run config and executor
03/21/2022, 2:20 AM
The *_dev.py modules approach looks like it's perfect for me