Hedgar
08/21/2022, 2:03 PM
Brad
08/21/2022, 10:18 PM
10 15 * * 1-5
and kicked off on a Saturday). How can I debug this? More details inside
Ben Muller
08/22/2022, 2:41 AM
try except
over my entire flow (many tasks included) so that I can log this to a third party?
I know I can send Slack notifications through the automations, but I am looking for something more granular where I can share metadata about the flow and not have to query the GraphQL API. I would like this to be part of the flow itself.
Priyank
08/22/2022, 6:12 AM
requests.exceptions.ReadTimeout: HTTPConnectionPool(host='localhost', port=4200): Read timed out. (read timeout=15)
How can I change or configure this timeout? Also, we're running this query against our locally hosted Prefect (Prefect 1.0).
import prefect
def readDatabase(query: dict) -> None:
    client = prefect.Client()
    data = client.graphql(query)
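One possible way to raise the limit, sketched under assumptions: Prefect 1.x is understood to read the client timeout from the cloud.request_timeout config key (the 15 seconds in the error above would be its default) and to accept overrides through PREFECT__SECTION__KEY environment variables. The key name, the helper set_request_timeout, and the value 60 are all assumptions to verify against the installed version.

```python
import os

# Assumption: Prefect 1.x maps PREFECT__CLOUD__REQUEST_TIMEOUT onto the
# cloud.request_timeout config key used by prefect.Client for HTTP requests.
TIMEOUT_ENV_VAR = "PREFECT__CLOUD__REQUEST_TIMEOUT"


def set_request_timeout(seconds: int) -> str:
    """Export the timeout override; call this before `import prefect`."""
    if seconds <= 0:
        raise ValueError("timeout must be a positive number of seconds")
    os.environ[TIMEOUT_ENV_VAR] = str(seconds)
    return os.environ[TIMEOUT_ENV_VAR]


set_request_timeout(60)  # 60 is an illustrative value, not a recommendation
```

The same variable could instead be exported in the shell that starts the process, which avoids depending on import order.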
Malavika S Menon
08/22/2022, 10:38 AM
Suresh R
08/22/2022, 11:44 AM
Oscar Björhn
08/22/2022, 11:47 AM
José Duarte
08/22/2022, 2:02 PM
Each work queue can optionally restrict concurrent runs of matching flows.
Does this mean that if I have Flow_A and Flow_B running, they can run concurrently for any concurrency limit value?
Pim Claessens
08/22/2022, 2:06 PM
José Duarte
08/22/2022, 2:28 PM
from prefect import flow
@flow
def test():
    print("hello world")
And it will fail retroactively from time to time:
15:18:12.953 | INFO | prefect.agent - Submitting flow run '8b6a74ba-6dbe-49bd-b303-d5e9c863a087'
15:18:13.097 | INFO | prefect.infrastructure.process - Opening process 'dangerous-ostrich'...
15:18:13.104 | INFO | prefect.agent - Completed submission of flow run '8b6a74ba-6dbe-49bd-b303-d5e9c863a087'
15:18:30.922 | ERROR | Flow run 'dangerous-ostrich' - Crash detected! Execution was interrupted by an unexpected exception.
15:18:30.954 | INFO | prefect.engine - Engine execution of flow run '8b6a74ba-6dbe-49bd-b303-d5e9c863a087' aborted by orchestrator: This run has already terminated.
hello world
15:18:35.274 | INFO | prefect.infrastructure.process - Process 'dangerous-ostrich' exited cleanly.
As you can see above, it completed successfully but at the same time it didn’t. Furthermore, the code clearly ran (hello world was printed), but it was still marked as an error.
Shouldn’t the flow just be marked as completed?
Justin Stanley
08/22/2022, 2:32 PM
Andrew Richards
08/22/2022, 3:03 PM
Sam Garvis
08/22/2022, 3:56 PM
michael.urrutia
08/22/2022, 4:25 PM
Balázs Aszódi
08/22/2022, 4:26 PM
prefect deployment build mssql_dbx_flow.py:dbx_trigger_sync_flow -n airbyte-prefect-dbx-elt -t testing
I have already created an agent and I’m using Prefect 2.1.1. I’ve attached my Orion settings as well.
Could you advise me on what I am missing?
Slackbot
08/22/2022, 5:46 PM
Nova Westlake
08/22/2022, 5:55 PM
Jack Chang
08/22/2022, 6:01 PM
TypeError: __init__() got an unexpected keyword argument 'add_default_labels'
My configuration looks like this:
# Configure a custom image
f.run_config = DockerRun(
    labels=["grls"],
    add_default_labels=False,
)
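The error indicates that the constructor being called does not define add_default_labels. A quick way to confirm which keyword arguments a class accepts before passing them, sketched with the standard library only: accepts_keyword is a hypothetical helper and FakeRunConfig is a stand-in, not Prefect's DockerRun, so swap in the real class in your own environment.

```python
import inspect


def accepts_keyword(callable_obj, name: str) -> bool:
    """Return True if callable_obj can be called with the keyword `name`."""
    params = inspect.signature(callable_obj).parameters
    if name in params and params[name].kind is not inspect.Parameter.POSITIONAL_ONLY:
        return True
    # A **kwargs parameter accepts any keyword.
    return any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())


# Hypothetical stand-in for a run-config class; not Prefect's real DockerRun.
class FakeRunConfig:
    def __init__(self, image=None, labels=None):
        self.image = image
        self.labels = labels


print(accepts_keyword(FakeRunConfig, "labels"))
print(accepts_keyword(FakeRunConfig, "add_default_labels"))
```

Running the same check against the real class shows whether the argument belongs there or on a different object.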
Anna Geller
08/22/2022, 6:36 PM
Aaron Goebel
08/22/2022, 7:00 PM
Michael Z
08/22/2022, 7:24 PM
trigger=any_failed
in my task. But I really only want to do something if the previous task has failed, and not consider all upstream tasks. Can someone point me in the right direction?
Wei Mei
08/22/2022, 7:48 PM
- name_input: field required
I see that in the "do not edit below this line" part of the deployment.yaml, name_input is set to required.
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=1))
def hello_task(name_input):
    # Doing some work
    print(f"Saying hello {name_input}")
    return "hello " + name_input
@flow
def hello_flow(name_input):
    hello_task(name_input)
if __name__ == "__main__":
    hello_flow("Marvin")
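The "field required" message is consistent with name_input having no default, so a run created from the deployment has to supply it. A minimal sketch, assuming the deployment derives its parameter schema from the flow signature, of how a default changes what must be supplied. It uses plain functions as stand-ins for the flow; required_parameters and hello_flow_with_default are hypothetical names.

```python
import inspect


def required_parameters(fn) -> list:
    """Names of parameters that have no default and must be supplied."""
    skip = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
    return [
        name
        for name, p in inspect.signature(fn).parameters.items()
        if p.default is inspect.Parameter.empty and p.kind not in skip
    ]


# Plain functions standing in for the flow above (no @flow decorator here).
def hello_flow(name_input):
    return "hello " + name_input


def hello_flow_with_default(name_input="Marvin"):
    return "hello " + name_input


print(required_parameters(hello_flow))
print(required_parameters(hello_flow_with_default))
```

Either giving the flow parameter a default or setting a value for it in the deployment's parameters would be the two obvious options to try.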
Hedgar
08/22/2022, 8:04 PM
Josh Paulin
08/22/2022, 8:10 PM
Nathaniel Russell
08/22/2022, 8:19 PM
from prefect import task, Parameter, Flow
from prefect.storage import S3
import prefect
@task
def main_task():
    print("foobar")
    logger = prefect.context.get("logger")
    logger.info("foobar")
storage = S3(bucket="prefect-task-behaviour", stored_as_script=True, local_script_path="main.py")
def build_flow():
    with Flow("s3-to-prefect-flow", storage=storage) as building_flow:
        main_task()
    return building_flow
print(build_flow())
I have an S3 bucket with a copy of this code (named main.py) in it, yet I am not seeing it print out foobar. Why is that?
Neil Natarajan
08/22/2022, 8:21 PM
schedule:
  interval: 75
  timezone:
However, the flow doesn't actually run, and I see a bunch of Scheduled flow runs that eventually become Late.
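Assuming the interval value is interpreted as seconds (an assumption worth checking against the Prefect 2 schedule documentation), interval: 75 would create a run every 75 seconds. A small sketch of the expected spacing, with a hypothetical next_runs helper and an arbitrary anchor time:

```python
from datetime import datetime, timedelta, timezone


def next_runs(anchor: datetime, interval_seconds: float, count: int) -> list:
    """Return the first `count` run times after `anchor` for a fixed interval."""
    step = timedelta(seconds=interval_seconds)
    return [anchor + step * i for i in range(1, count + 1)]


# Illustrative anchor time; a real schedule has its own anchor date.
anchor = datetime(2022, 8, 22, 20, 0, 0, tzinfo=timezone.utc)
for run_time in next_runs(anchor, 75, 3):
    print(run_time.isoformat())
```

Runs that sit in Scheduled and then turn Late have generally not been picked up by anything, so checking that an agent is running and polling the matching work queue is a reasonable next step.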
Paco Ibañez
08/22/2022, 9:41 PM
Matt Melgard
08/22/2022, 10:39 PM
Jai P
08/22/2022, 11:51 PM
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
Details in the thread!
Rio McMahon
08/23/2022, 4:44 AM
import prefect
pc = prefect.get_client()
await pc.read_flow_runs()
it returns a cloudpickle error TypeError: __init__() missing 1 required positional argument: 'path' (see comments for full stack trace). Previously I was using Python 3.7 and was getting another cloudpickle error about version 5 not working. Other Prefect client functions work (including read_flow_run and read_flow_run_states).
version stuff:
$ prefect version
Version: 2.1.1
API version: 0.8.0
Python version: 3.9.12
Git commit: dc2ba222
Built: Thu, Aug 18, 2022 10:18 AM
OS/Arch: darwin/x86_64
Profile: goodkiwi
Server type: hosted
$ python --version
Python 3.9.12
Any thoughts on how to fix?
TypeError Traceback (most recent call last)
File ~/opt/anaconda3/envs/goodkiwi_env/lib/python3.9/site-packages/IPython/core/formatters.py:707, in PlainTextFormatter.__call__(self, obj)
700 stream = StringIO()
701 printer = pretty.RepresentationPrinter(stream, self.verbose,
702 self.max_width, self.newline,
703 max_seq_length=self.max_seq_length,
704 singleton_pprinters=self.singleton_printers,
705 type_pprinters=self.type_printers,
706 deferred_pprinters=self.deferred_printers)
--> 707 printer.pretty(obj)
708 printer.flush()
709 return stream.getvalue()
File ~/opt/anaconda3/envs/goodkiwi_env/lib/python3.9/site-packages/IPython/lib/pretty.py:393, in RepresentationPrinter.pretty(self, obj)
390 for cls in _get_mro(obj_class):
391 if cls in self.type_pprinters:
392 # printer registered in self.type_pprinters
--> 393 return self.type_pprinters[cls](obj, self, cycle)
394 else:
395 # deferred printer
396 printer = self._in_deferred_types(cls)
File ~/opt/anaconda3/envs/goodkiwi_env/lib/python3.9/site-packages/IPython/lib/pretty.py:640, in _seq_pprinter_factory.<locals>.inner(obj, p, cycle)
638 p.text(',')
...
63 @staticmethod
64 def loads(blob: bytes) -> Any:
---> 65 return cloudpickle.loads(base64.decodebytes(blob))
TypeError: __init__() missing 1 required positional argument: 'path'
import requests
import prefect
pc = prefect.get_client()
response = requests.post(
    url=str(pc.api_url),
    json={'name': {"any_": ["flow_name"]}},
    headers={"Authorization": "Bearer <token>"},
)
Joao Moniz
08/24/2022, 12:36 PM
Rio McMahon
08/24/2022, 1:12 PM