Seth Coussens
08/26/2022, 8:55 PM
Brandon T. Kowalski
08/26/2022, 9:19 PM
20:53:32.166 | INFO | prefect.agent - Submitting flow run 'adf8ae6e-675d-40dd-8a32-078aecbc248b'
20:53:32.359 | ERROR | prefect.agent - Failed to submit flow run 'adf8ae6e-675d-40dd-8a32-078aecbc248b' to infrastructure.
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/agent.py", line 200, in submit_run
    await self.task_group.start(submit_flow_run, flow_run, infrastructure)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 807, in start
    return await future
  File "/usr/local/lib/python3.9/site-packages/prefect/infrastructure/submission.py", line 47, in submit_flow_run
    infrastructure = _prepare_infrastructure(flow_run, infrastructure)
  File "/usr/local/lib/python3.9/site-packages/prefect/infrastructure/submission.py", line 35, in _prepare_infrastructure
    "env": {**base_flow_run_environment(flow_run), **infrastructure.env},
AttributeError: 'S3' object has no attribute 'env'
Any pointers?
Chris Gunderson
08/26/2022, 9:36 PM
from prefect.tasks.notifications import SlackTask
class Alert:
    def alert_on_special_failure(task, old_state, new_state):
        if new_state.is_failed():
            if getattr(new_state.result, "flag", False) is True:
                errMsg = '--- LOADER ERROR ---'
                msg = "{}\nTask: `{}` FAILED.\nThe loader process failed: `{}`".format(
                    errMsg, task.name, new_state.result.value)
                SlackTask().run(message = msg, webhook_secret = "SLACK_PREFECT_DEV")
        return new_state
This was previously called like this:
@task(name = 'send API request',
      max_retries = 3,
      retry_delay = timedelta(minutes = 5),
      state_handlers = [alert_on_special_failure])
def post_request_process(
If we add this alert_on_special_failure object, will we need to pass in the task, old_state, and new_state? Were these variables global?
Hedgar
08/27/2022, 7:41 AM
Ofir
08/27/2022, 11:14 AM
git clone --depth 1 https://github.com/PrefectHQ/prefect.git
cd prefect/examples/tutorial
Ofir
08/27/2022, 11:14 AM
Ofir
08/27/2022, 11:15 AM
ofir@Ofirs-MacBook-Pro-2 prefect % ls examples/
ls: examples/: No such file or directory
ofir@Ofirs-MacBook-Pro-2 prefect %
Ofir
08/27/2022, 11:28 AM
ofir@Ofirs-MacBook-Pro-2 prefect % git checkout 1.x
Branch '1.x' set up to track remote branch '1.x' from 'origin'.
Switched to a new branch '1.x'
ofir@Ofirs-MacBook-Pro-2 prefect % ls examples
conditional.py kafka mapping.py old parameters.py tutorial
ofir@Ofirs-MacBook-Pro-2 prefect % ls examples/tutorial
01_etl.py 03_parameterized_etl_flow.py 05_schedules.py aircraftlib
02_etl_flow.py 04_handle_failures.py 06_parallel_execution.py requirements.txt
ofir@Ofirs-MacBook-Pro-2 prefect %
Ofir
08/27/2022, 11:29 AM
git clone --branch 1.x --depth 1 https://github.com/PrefectHQ/prefect.git
cd prefect/examples/tutorial
Keith Hickey
08/27/2022, 2:15 PM
Venkat Ramakrishnan
08/28/2022, 4:04 AM
Anat Tal Gagnon
08/28/2022, 10:54 AM
Anat Tal Gagnon
08/28/2022, 11:02 AM
Benson Mwangi
08/28/2022, 5:17 PM
ibrahem
08/29/2022, 7:19 AM
Faheem Khan
08/29/2022, 7:50 AM
Andreas Nord
08/29/2022, 8:01 AM
Enrique
08/29/2022, 9:20 AM
Hedgar
08/29/2022, 11:27 AM
to_csv() method. The path was created by pathlib:
file_path = Path("data/fresh-data.csv")
file_path.parent.mkdir(parents=True, exist_ok=True)
Why can't I see the fresh data? Is there something extra I'm missing? I'm just trying things out with Prefect 2 on my local system.
Brandon T. Kowalski
08/29/2022, 12:16 PM
Louis Amon
08/29/2022, 1:13 PM
Johnathan Nguyen
08/29/2022, 1:33 PM
Scott White
08/29/2022, 1:34 PM
prefect<2.0 and the functional API?
I will share the pseudo-code and syntax in a reply.
José Duarte
08/29/2022, 1:47 PM
import prefect
import logging
logger = logging.getLogger(__name__)
@prefect.flow
def f():
    logger.info("test")  # doesn't appear
What am I doing wrong?
José Duarte
08/29/2022, 2:23 PM
Michael Law
08/29/2022, 3:54 PM
Ilya Galperin
08/29/2022, 3:55 PM
from prefect import flow
@flow(name="always-fail")
def entrypoint():
    raise ValueError("my error message")
if __name__ == "__main__":
    state = entrypoint(return_state=True)
    print(state)
    print(state.result)
In this scenario, state.result can give us the detailed error message to pass back into Slack, which seems really useful. However, it would also be great if we could provide a direct link to the flow using the flow's run id. How are other folks implementing this, and is there a way to directly access the flow's run id after calling the flow from main (or otherwise)?
José Duarte
08/29/2022, 4:07 PM
prefect.exceptions.MissingContextError: Logger 'dystematic.pdbt.ftp' attempted to send logs to Orion without a flow run id. The Orion log handler can only send logs within flow run contexts unless the flow run id is manually provided.
This isn't covered in the post or the docs.
The reason I want to use the regular Python logs is that I cannot use the Prefect one when launching from the console, as I get:
RuntimeError: There is no active flow or task run context.
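One possible way to cover both situations in the message above is a small helper that asks Prefect for the run logger and falls back to a standard-library logger when there is no run context. This is only a sketch: `get_logger` is a hypothetical helper name, not a Prefect API, and it assumes that Prefect 2's `get_run_logger` raises `RuntimeError` (or a subclass of it) outside a flow or task run, as the traceback above suggests.

```python
import logging


def get_logger(name: str = __name__):
    """Return Prefect's run logger inside a run, else a stdlib logger.

    Hypothetical helper for illustration; not part of Prefect.
    """
    try:
        # Prefect 2 entry point for run-aware logging.
        from prefect import get_run_logger

        return get_run_logger()
    except (ImportError, RuntimeError):
        # ImportError: Prefect 2 is not installed in this environment.
        # RuntimeError: no active flow or task run context, for example
        # when the code is launched from a plain console session.
        return logging.getLogger(name)


if __name__ == "__main__":
    # Outside a flow run this goes through the standard logging setup.
    logging.basicConfig(level=logging.INFO)
    get_logger("demo").info("test")
```

Inside a flow or task, the helper would hand back Prefect's run logger; from a console it returns an ordinary logger. It does not by itself address the `MissingContextError`, which the message text attributes to the Orion log handler receiving records without a flow run id.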
Tomás Emilio Silva Ebensperger
08/29/2022, 4:39 PM
Nick Coy
08/29/2022, 4:41 PM
UserWarning: Block document has schema checksum sha256:dbeeaf09aa78947a7c576549b11e098c00a25bcbbf90b8d8b70c0c3a3fc8f4a2 which does not match the schema checksum for class 'Process'. This indicates the schema has changed and this block may not load.
When I go to the bucket I see the flow files there and the flow runs fine.