Michael Eggert
08/19/2021, 4:12 AMAiden Price
08/19/2021, 6:14 AMresource_manager
class, the flow times out and errors like this appear;
Error during execution of task: AttributeError("'NoneType' object has no attribute 'setup'")
Matias Godoy
08/19/2021, 10:57 AMquery{
flow_run{
name,
created,
state,
flow_id,
duration: <here goes the flow_run execution duration>
}
}
Ideally I could do something like duration = end_time - start_time
, but I think GraphQL does not allow operations like these?
I also checked the flow_run_aggregate
query, but it does not contain such information.
The real problem is that I want to consume this data from Grafana, so I have nowhere to run these calculations and the data should come already calculated in the response.
Any ideas on how could I do this?
Thanks a lot!Italo Barros
08/19/2021, 11:43 AMrequests.exceptions.SSLError: HTTPSConnectionPool(host='<http://api.prefect.io|api.prefect.io>', port=443): Max retries exceeded with url: / (Caused by SSLError(SSLEOFError(8, 'EOF occurred in violation of protocol (_ssl.c:852)'),))
Pierre Monico
08/19/2021, 12:11 PM[git]
extra to be installed for the Git Storage to work, which is not mentioned in the doc)Krapi Shah
08/19/2021, 12:40 PMAbuzar Shaikh
08/19/2021, 1:22 PMFailed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n TypeError("int() argument must be a string, a bytes-like object or a number, not \'NaTType\'")')
Any idea why?Marwan Sarieddine
08/19/2021, 1:22 PMCharles Leung
08/19/2021, 2:00 PMJack Sundberg
08/19/2021, 2:56 PMYD
08/19/2021, 4:00 PMKathryn Klarich
08/19/2021, 4:51 PMmapped
argument in Task.set_upstream
? is this just another way of applying mapping, or should we always be setting mapped=True
if the upstream task is mapped? (we usually use Task.map()
when mapping tasks)Jean Da Rolt
08/19/2021, 5:19 PM@task
def f1():
...
@task
def f2():
f1()
...
An Hoang
08/19/2021, 5:25 PMprefect.context
to be used in all subsequent tasks? I have a stateful variable that many tasks will use and would love to use it inside many tasks without passing in as parameterJoe Schmid
08/19/2021, 8:30 PMJohn Ramirez
08/19/2021, 8:42 PMMichael Welsh
08/19/2021, 8:46 PMAric Huang
08/19/2021, 9:39 PM# Running the flow locally
aric@aric-dev:~/src/conductor$ prefect run -p conductor/samples/hello_world_flow.py --param foo=bar
Retrieving local flow... Done
Configured local flow run
└── Parameters: {'foo': 'bar'}
Running flow locally...
Flow runner encountered an exception!
Traceback (most recent call last):
File "/home/aric/.local/lib/python3.8/site-packages/prefect/cli/run.py", line 590, in run
result_state = flow.run(parameters=params_dict)
File "/home/aric/.local/lib/python3.8/site-packages/prefect/core/flow.py", line 1250, in run
raise ValueError(
ValueError: Flow.run received the following unexpected parameters: foo
Flow run failed!
# Running the same flow registered on Prefect Cloud
aric@aric-dev:~/src/conductor$ prefect run -n "Hello World" --param foo=bar
Looking up flow metadata... Done
Creating run for flow 'Hello World'... Done
└── Name: monumental-bat
└── UUID: 39c7896b-0bee-4015-be99-648d90da201b
└── Labels: []
└── Parameters: {'foo': 'bar'}
└── Context: {}
└── URL: <https://cloud.prefect.io/><URL>
Sepehr Sadighpour
08/20/2021, 1:10 AMAiden Price
08/20/2021, 5:57 AMasyncio.run(client.send_batch(event_data_batch))
but I get errors like RuntimeError: Event loop is closed
Any advice?Maria
08/20/2021, 8:26 AMPedro Machado
08/20/2021, 12:43 PMcreate_flow_run
and get_task_run_result
.
I am using these two tasks inside of another task so this can be retried as a unit (see code in thread).
Last night, the child flow was triggered three times (triggered by the retries in the parent flow) and all three times the flow succeeded so it looks like there was an issue where the parent flow misread the child flow's status. I am attaching the logs in the thread.
Am I misusing these tasks? Any ideas as to why this could be happening? This has been running every night for a couple of weeks and it's the first time this problem happens. Thanks!Bastian Röhrig
08/20/2021, 1:05 PMDmitry Kuleshov
08/20/2021, 1:48 PMDavid McGuire
08/20/2021, 3:03 PMCronClock
schedule skipping Flow
runs, entirely?Constantino Schillebeeckx
08/20/2021, 4:15 PMWilson Bilkovich
08/20/2021, 4:38 PMKevin Kho
08/20/2021, 6:21 PMconda update certifi
Follows the overflow post where I found a similar problem with this solution: https://stackoverflow.com/questions/41691327/ssl-sslerror-ssl-certificate-verify-failed-certificate-verify-failed-ssl-cKyle Hansen
08/20/2021, 6:40 PM.map
within prefect. Is it guaranteed that the order is preserved across multiple maps, like a python multiprocessing.pool.map
? Or is it not a guarantee that the order is preserved, like multiprocessing.pool.imap_unordered
. I couldn't find whether this was guaranteed or not in the prefect docs
.Jean Da Rolt
08/21/2021, 12:56 AM[core]
logging_conf_file = logging.cfg
then
[loggers]
keys=root,app
[handlers]
keys=consoleHandler,StackDriveHandler
...
Is there anything similar in Prefect? I do not want to add additional logs inside my tasks, but make prefect log using additional loggers.Jean Da Rolt
08/21/2021, 12:56 AM[core]
logging_conf_file = logging.cfg
then
[loggers]
keys=root,app
[handlers]
keys=consoleHandler,StackDriveHandler
...
Is there anything similar in Prefect? I do not want to add additional logs inside my tasks, but make prefect log using additional loggers.Kevin Kho
08/21/2021, 1:24 AMJean Da Rolt
08/21/2021, 2:32 PMimport logging
from logging import StreamHandler
class BlablaHandler(StreamHandler):
def __init__(self):
StreamHandler.__init__(self)
def emit(self, record):
msg = self.format(record)
print("Printing inside handler")
print(msg)
logger = logging.getLogger()
logger.addHandler(BlablaHandler())
however I would prefer to have that in the prefect config file. Is that possible?Michael Adkins
08/21/2021, 3:07 PMget_logger
function that, on logger retrieval, adds the extra handlers you wantJean Da Rolt
08/21/2021, 6:22 PM