Prashob Nair
11/24/2021, 6:23 PM
create_flow_run to start a new flow 2 minutes after the previous task. I have set the parameter as follows, but the flow run starts immediately instead. Please let me know where I'm going wrong. Thanks!
scheduled_start_time=pendulum.now().add(minutes=2),
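One plausible cause, assuming this call sits in a Prefect 1.x flow built with `with Flow(...)` (the message doesn't confirm this): `pendulum.now().add(minutes=2)` is an ordinary Python expression, so it is evaluated once when the flow is built or registered, not when the task runs. If the flow runs later, that timestamp is already in the past, and a run scheduled in the past would presumably start right away. The stdlib-only sketch below illustrates the difference; `start_time_at_runtime` is a hypothetical helper, and the fixed clock values stand in for build time and run time.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable


def utc_now() -> datetime:
    return datetime.now(timezone.utc)


def start_time_at_runtime(
    delay: timedelta = timedelta(minutes=2),
    now: Callable[[], datetime] = utc_now,
) -> datetime:
    """Hypothetical helper: compute the start time when it is *called*."""
    return now() + delay


# Pitfall: the offset is computed once, at definition time (like building a flow).
build_time = datetime(2021, 11, 24, 18, 0, tzinfo=timezone.utc)
start_fixed_at_build = build_time + timedelta(minutes=2)

# Suppose the flow actually runs an hour later.
run_time = datetime(2021, 11, 24, 19, 0, tzinfo=timezone.utc)
start_computed_at_run = start_time_at_runtime(now=lambda: run_time)

# The build-time value is already in the past when the run happens.
print(start_fixed_at_build < run_time)
# The run-time value is two minutes after the moment it was computed.
print(start_computed_at_run - run_time)
```

In a real flow, the analogue would be computing the timestamp inside a task and passing that task's result as `scheduled_start_time`; treat that as an untested suggestion rather than confirmed Prefect behaviour.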
Pedro Machado
11/24/2021, 8:33 PM
requests
session to make the API requests. This class also implements rate limiting.
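Whether a class like this limits requests across all mapped tasks depends on sharing: threads in one process share memory, so a single limiter instance whose state is guarded by a lock throttles every thread holding a reference to it. A process-based executor would give each worker its own copy, and the limit would then apply per process. Below is a minimal stdlib sketch, assuming mapped tasks run as threads and receive the same object by reference; `RateLimiter` and `fake_api_call` are hypothetical names, and `ThreadPoolExecutor` stands in for a threaded `LocalDaskExecutor`.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class RateLimiter:
    """Hypothetical limiter: at most one call start per `min_interval` seconds,
    shared by every thread that holds a reference to this instance."""

    def __init__(self, min_interval: float) -> None:
        self.min_interval = min_interval
        self._lock = threading.Lock()
        self._next_slot = 0.0  # earliest monotonic time the next call may start

    def wait(self) -> None:
        # Reserve a time slot under the lock, then sleep outside it so other
        # threads can reserve their own (later) slots without blocking here.
        with self._lock:
            slot = max(time.monotonic(), self._next_slot)
            self._next_slot = slot + self.min_interval
        delay = slot - time.monotonic()
        if delay > 0:
            time.sleep(delay)


# One instance, shared by every "mapped" call running on the thread pool.
limiter = RateLimiter(min_interval=0.05)
call_times = []
call_times_lock = threading.Lock()


def fake_api_call(i: int) -> int:
    limiter.wait()  # stand-in for throttling before a real HTTP request
    with call_times_lock:
        call_times.append(time.monotonic())
    return i


start = time.monotonic()
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(fake_api_call, range(6)))

# Six calls with 0.05 s slots: the last one cannot start before ~0.25 s.
elapsed = max(call_times) - start
print(results, round(elapsed, 2))
```

Reserving a slot under the lock and sleeping outside it keeps the lock held only briefly, while the reserved slots stay `min_interval` apart, so no call begins before its slot.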
I'd like to confirm that if I use the `LocalDaskExecutor` with threads, I can pass a single instance of the class to a mapped task and it will effectively rate limit across all mapped tasks. Also, is there a benefit to using a resource manager task to instantiate the class that queries the API?
haf
11/24/2021, 11:34 PM
aaron
11/25/2021, 1:15 AM
Aaron Ash
11/25/2021, 1:30 AM
project name of the currently executing flow from the context or somewhere else?
Aaron Ash
11/25/2021, 1:35 AM
Madhavi
11/25/2021, 3:31 AM
夏文思
11/25/2021, 6:44 AM
Yong Tian
11/25/2021, 8:03 AM
André Petersen
11/25/2021, 10:08 AM
Daniil Ponizov
11/25/2021, 10:39 AM
haf
11/25/2021, 1:28 PM
KeyError: 'data'. Have you seen this error message before?
haf
11/25/2021, 1:30 PM
Vince Bob
11/25/2021, 2:17 PM
validation_task(
    context_root_dir=root_dir,
    checkpoint_name=expectation_checkpoint_name
)
When I run the command with GE (great_expectations --V3-api checkpoint run my_checkpoint), it works, but in a Prefect task I get an exception.
With the GE V3 API:
.....
for batch in ge_checkpoint["batches"]:
TypeError: 'Checkpoint' object is not subscriptable
The same happens with the GE V2 API:
...
for batch in ge_checkpoint["batches"]:
TypeError: 'LegacyCheckpoint' object is not subscriptable
great_expectations=0.13.43 (also tried with version 0.12.10)
prefect=0.15.9
Has anyone experienced this problem?
Elijah Roussos
11/25/2021, 3:38 PM
~/.prefect/config.toml locally. From the docs it seems like you can only set strings in the TOML file, but I need JSON. I’ve tried setting it as a JSON string and also in TOML syntax, to no avail. Is there any way to set a local JSON secret?
Adam Everington
11/25/2021, 3:48 PM
Anh Nguyen
11/26/2021, 2:36 AM
Bruno Murino
11/26/2021, 11:17 AM
John Shearer
11/26/2021, 12:42 PM
date in Prefect context. The docs say "an actual datetime object representing the current time".
The datetime value appears to be the same across all tasks within a flow, so I assume this is actually the start time of the flow? This behaviour is what I want, but I want to confirm my assumption.
Giovanni Giacco
11/26/2021, 1:41 PM
Aleksandr Liadov
11/26/2021, 2:44 PM
Prasanth Kothuri
11/26/2021, 3:41 PM
Jinho Chung
11/26/2021, 4:46 PM
Erick House
11/26/2021, 11:48 PM
itay livni
11/27/2021, 3:48 AM
haf
11/27/2021, 12:05 PM
KilledWorker error, which seemingly fails the whole flow. Despite this, the workers are alive and fine (more in thread).
Jake Watson
11/27/2021, 3:52 PM
Erick House
11/27/2021, 4:29 PM
from prefect import flow

@flow
def my_favorite_function():
    print("This function doesn't do much")
    return 42

print(my_favorite_function())
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: flow
[SQL: INSERT INTO flow (id, created, updated, name, tags) VALUES (?, ?, ?, ?, ?) ON CONFLICT (name) DO NOTHING]
[parameters: ('f4971b70-0675-41c8-af7b-efcf8e3c2254', '2021-11-27 16:24:21.653684', '2021-11-27 16:24:21.653699', 'my-favorite-function', '[]')]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
❯ prefect version
2.0a5
❯ sqlite3 version
SQLite version 3.36.0 2021-06-18 18:58:49
Lana Dann
11/27/2021, 4:34 PM
ECSRun to take the most recent (or only) revision of a task_definition_arn? Otherwise we’d have to update and redeploy the flow every time we update a task definition, which is not ideal.
itay livni
11/27/2021, 9:37 PM
itay livni
11/27/2021, 9:37 PM
@task
def get_oup_deinitions(node_input_dict):
    oupDefs = oup.OUPDefinitions("en-us", creds.OUP)
    oup_df = oupDefs.run(node_input=node_input_dict)
    return oup_df

@flow
def get_node_resources(node_input_dict):
    oup_df = get_oup_deinitions(node_input_dict=node_input_dict)
    return oup_df
This is the traceback:
16:03:19.983 | Flow run 'pumpkin-pony' encountered exception:
Traceback (most recent call last):
File "py39lmap/lib/python3.9/site-packages/prefect/engine.py", line 377, in orchestrate_flow_run
result = await run_sync_in_worker_thread(flow_call)
File "py39lmap/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 48, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(context.run, call, cancellable=True)
File "py39lmap/lib/python3.9/site-packages/anyio/to_thread.py", line 28, in run_sync
return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
File "py39lmap/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 805, in run_sync_in_worker_thread
return await future
File "py39lmap/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 743, in run
result = func(*args)
File "/egm_make_lmap/extract_resources/get_resources.py", line 83, in get_node_resources
oup_df = get_oup_deinitions(
File "py39lmap/lib/python3.9/site-packages/prefect/tasks.py", line 271, in __call__
return enter_task_run_engine(
File "py39lmap/lib/python3.9/site-packages/prefect/engine.py", line 450, in enter_task_run_engine
return run_async_from_worker_thread(begin_run)
File "py39lmap/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 59, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "py39lmap/lib/python3.9/site-packages/anyio/from_thread.py", line 35, in run
return asynclib.run_async_from_thread(func, *args)
File "py39lmap/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 830, in run_async_from_thread
return f.result()
File "py39lmap/lib/python3.9/concurrent/futures/_base.py", line 445, in result
return self.__get_result()
File "py39lmap/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
raise self._exception
File "py39lmap/lib/python3.9/site-packages/prefect/engine.py", line 510, in create_and_submit_task_run
future = await flow_run_context.executor.submit(
File "py39lmap/lib/python3.9/site-packages/prefect/executors.py", line 180, in submit
self._results[task_run.id] = await run_fn(**run_kwargs)
File "py39lmap/lib/python3.9/site-packages/prefect/client.py", line 59, in wrapper
return await fn(*args, **kwargs)
File "py39lmap/lib/python3.9/site-packages/prefect/engine.py", line 620, in orchestrate_task_run
terminal_state = await user_return_value_to_state(
File "py39lmap/lib/python3.9/site-packages/prefect/engine.py", line 734, in user_return_value_to_state
if is_state(result) or is_state_iterable(result):
File "py39lmap/lib/python3.9/site-packages/prefect/orion/states.py", line 17, in is_state_iterable
if isinstance(obj, IterableABC) and obj:
File "py39lmap/lib/python3.9/site-packages/pandas/core/generic.py", line 1537, in __nonzero__
raise ValueError(
ValueError: The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
16:03:19.989 | Shutting down executor `SequentialExecutor`...
16:03:20.066 | Flow run 'pumpkin-pony' finished in state Failed(message='Flow run encountered an exception.', type=FAILED)
Failed(message='Flow run encountered an exception.', type=FAILED)
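The bottom frames of the traceback show the mechanism: `is_state_iterable` runs `if isinstance(obj, IterableABC) and obj:`, and because the `and` evaluates to the DataFrame itself, the `if` calls `bool()` on it, which pandas refuses. The stdlib sketch below reproduces that pattern without pandas; `AmbiguousTruthFrame` is a hypothetical stand-in for a DataFrame and `naive_check` mirrors the line from the traceback. A plain `dict` passes the same check, which is consistent with the `df.to_dict()` workaround suggested later in the thread.

```python
from collections.abc import Iterable


class AmbiguousTruthFrame:
    """Hypothetical stand-in for a pandas DataFrame: iterable, but bool() raises."""

    def __init__(self, rows):
        self.rows = list(rows)

    def __iter__(self):
        return iter(self.rows)

    def __len__(self):
        return len(self.rows)

    def __bool__(self):
        raise ValueError("The truth value of a DataFrame is ambiguous.")


def naive_check(obj) -> bool:
    # Same shape as the check in the traceback: the `if` calls bool(obj)
    # whenever obj is iterable, which raises for DataFrame-like objects.
    if isinstance(obj, Iterable) and obj:
        return True
    return False


for candidate in (AmbiguousTruthFrame([{"A": 1}]), {"A": [1, 2]}):
    try:
        print(type(candidate).__name__, naive_check(candidate))
    except ValueError as exc:
        print(type(candidate).__name__, "raised ValueError:", exc)
```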
Anna Geller
11/28/2021, 10:49 AM
Anurag Bajpai
11/28/2021, 11:45 AM
File "py39lmap/lib/python3.9/site-packages/prefect/orion/states.py", line 17, in is_state_iterable
if isinstance(obj, IterableABC) and obj:
Casting obj, which in this case is a pandas DataFrame, to bool causes that error.
Anna Geller
11/28/2021, 1:06 PM
itay livni
11/28/2021, 2:52 PM
from prefect import flow, task
import numpy as np
import pandas as pd

@task
def pandas_task(config: dict):
    df = pd.DataFrame(np.random.randint(0, 100, size=(100, 4)), columns=list('ABCD'))
    return df

@flow
def main_flow():
    # do some things
    # then call another flow function
    df = pandas_task(config={})
    # do more things

# run the flow
flow_state = main_flow()
Anna Geller
11/28/2021, 3:01 PM
df.to_dict(); this should work.
itay livni
11/28/2021, 3:05 PM
Anna Geller
11/28/2021, 3:08 PM
itay livni
11/28/2021, 3:54 PM
Anna Geller
11/28/2021, 4:18 PM
from prefect import flow, task

class SomeDataProcessor:
    def __init__(self):
        pass

    def run_etl(self):
        pass

@task
def some_task():
    SomeDataProcessor().run_etl()

@flow
def main():
    some_task()

if __name__ == "__main__":
    main()
itay livni
11/28/2021, 4:22 PM
@task. I was hoping to get rid of those.
Anna Geller
11/28/2021, 4:25 PM
@flow
def main():
    SomeDataProcessor().run_etl()

The trade-off here is that the more tasks you have, the more fine-grained visibility you have when something fails.
Michael Adkins
11/28/2021, 5:51 PM
itay livni
11/28/2021, 11:34 PM
SomeClass(Task) without making a helper function (I could be missing something).
2. I enjoy not having to register a flow to get to a UI.
3. Overall less boilerplate code, e.g. flow.visualize, flow.register ...
4. Radar is cool. I can't wait to see how the subflows and the recursive nature of my flow are represented.
5. The new graph showing flow runs is less intuitive than the current simple bar chart, but I imagine that will change as I get used to it.
Anna Geller
11/29/2021, 12:08 AM
nicholas
11/29/2021, 8:44 PM
itay livni
11/30/2021, 12:10 AM
nicholas
11/30/2021, 12:12 AM