Tim Galvin
06/22/2023, 11:47 AM
A @task decorated function blocks when calling it as is, e.g.
from prefect import flow, task

@task
def square_num(a):
    return a**2

@flow
def main_flow():
    squares = [square_num(i) for i in range(4)]
Here, each time square_num is called in the loop, it is not a PrefectFuture that is stored, it is the evaluated number. To get behaviour similar to a @dask.delayed decorated function, the .submit method should be used.
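For concreteness, a minimal sketch of the .submit variant (assuming Prefect 2.x, where Task.submit returns a PrefectFuture):

from prefect import flow, task

@task
def square_num(a):
    return a**2

@flow
def main_flow():
    # .submit schedules the task run and returns a PrefectFuture immediately
    futures = [square_num.submit(i) for i in range(4)]
    # .result() blocks until each future resolves to the evaluated number
    squares = [f.result() for f in futures]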
I ask this question after mocking up a port of a dask workflow over to prefect. If the __call__ of a Task instance behaved similarly to its submit method, the conversion would basically be as simple as replacing @delayed with @task and calling it a day. I mocked up an extra Task keyword argument in my own prefect fork called block_on_call, which let me tweak this __call__ vs .submit() behaviour on a per-@task basis (sketched below). Maybe it is because of my previous dask exposure, but this kind of feels more natural to me, and it makes this toy porting problem pretty effortless.
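A hypothetical sketch of the idea (block_on_call only exists in the fork described above, not in upstream prefect):

@task(block_on_call=False)  # hypothetical keyword from the fork, not upstream
def square_num(a):
    return a**2

@flow
def main_flow():
    # with block_on_call=False, __call__ would return PrefectFutures,
    # mirroring the @dask.delayed semantics
    futures = [square_num(i) for i in range(4)]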
Is it the case that the current Task.__call__ behaviour is there to support a DAG-less type of use case/style? Would there be much interest in a pull request, or would the above make sense/be useful, or am I missing the larger intent here? Does my rambling make sense?
Tanishq Hooda
06/22/2023, 12:27 PM
prefect agent start -p my-pool -p my-pool-2 -p my-pool-3
Gintautas Jankus
06/22/2023, 12:32 PM
Robert Kowalski
06/22/2023, 1:22 PM
import asyncio
from typing import List

from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
async def test_task(numbers: List[int]):
    print(numbers)

@flow(task_runner=DaskTaskRunner('tcp://10.10.10.200:8786'))
async def my_parent():
    numbers = [i for i in range(2)]
    coros = test_task.map(numbers)
    await asyncio.gather(*[coros])

if __name__ == "__main__":
    asyncio.run(my_parent())
Can somebody tell me what I'm doing wrong? Without mapping, test_task works, and mapping + local dask works too, but if I want to use a remote dask cluster with mapping I receive this error:
15:19:09.646 | INFO | distributed.batched - Batched Comm Closed <TCP (closed) Client->Scheduler local=tcp://10.61.14.2:60674 remote=tcp://10.10.10.200:8786>
Traceback (most recent call last):
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/distributed/batched.py", line 115, in _background_send
nbytes = yield coro
^^^^^^^^^^
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/tornado/gen.py", line 767, in run
value = future.result()
^^^^^^^^^^^^^^^
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/distributed/comm/tcp.py", line 269, in write
raise CommClosedError()
distributed.comm.core.CommClosedError
15:19:09.659 | INFO | Task run 'test_task-0' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: test_task-0-4fd7fc89242e4471877c70e0b40362e9-1
15:19:09.688 | INFO | Task run 'test_task-1' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: test_task-1-1249f0856b0a4a138e0ca263f104cdea-1
15:19:09.744 | ERROR | Flow run 'brass-worm' - Finished in state Failed('2/2 states failed.')
Traceback (most recent call last):
File "/home/robin/.config/JetBrains/PyCharmCE2023.1/scratches/tmp_dask.py", line 21, in <module>
asyncio.run(my_parent())
File "/usr/lib/python3.11/asyncio/runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 182, in wait_for_call_in_loop_thread
return call.result()
^^^^^^^^^^^^^
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
result = await coro
^^^^^^^^^^
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect/engine.py", line 375, in create_then_begin_flow_run
return await state.result(fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect_dask/task_runners.py", line 298, in wait
return await future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/distributed/client.py", line 305, in _result
raise exception
concurrent.futures._base.CancelledError: test_task-0-4fd7fc89242e4471877c70e0b40362e9-1
Process finished with exit code 1
Any tip will be appreciated.
Rohan Chutke
06/22/2023, 1:23 PM
@task
def extract():
    logger = get_run_logger()
    result = sf_operator.set_stage(url='S3://bucket/folder/working/',
                                   stage_location='EXTERNAL_TABLES.BACKUP_TABLE')
    [logger.info(log_message) for log_message in result]
    result = sf_operator.set_external_table(external_table='EXTERNAL_TABLES.BACKUP',
                                            stage_location='EXTERNAL_TABLES.BACKUP_TABLE')
    [logger.info(log_message) for log_message in result]
Here in sf_operator.set_stage, I am using from prefect_snowflake.database import SnowflakeConnector, snowflake_query to run a query that creates a stage in Snowflake. But I am getting:
RuntimeError: Tasks cannot be run from within tasks. Did you mean to call this task in a flow?
prefect.flow_runs
Finished in state Failed('Flow run encountered an exception. RuntimeError: Tasks cannot be run from within tasks. Did you mean to call this task in a flow?')
Can anyone please explain where I am using a task inside a task, and is there a possible workaround? This worked fine in Prefect 1. Any help will be appreciated.
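One possible workaround, sketched under the assumption that snowflake_query is itself a Prefect @task (which would explain the nesting error when it runs inside extract): call the undecorated function via Task.fn, or hoist the query call up into the flow.

from prefect import task, get_run_logger
from prefect_snowflake.database import snowflake_query

@task
def extract():
    logger = get_run_logger()
    # Task.fn is the plain Python function behind the decorator, so no
    # nested task run is created (arguments elided; if snowflake_query is
    # async, the call would also need to be awaited)
    result = snowflake_query.fn(...)
    for log_message in result:
        logger.info(log_message)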
Vadym Dytyniak
06/22/2023, 1:45 PM
Iosu Santurtun
06/22/2023, 1:58 PM
Samuel Hinton
06/22/2023, 4:25 PM
Trent Shapiro
06/22/2023, 4:58 PM
Parsa-SRA
06/22/2023, 7:35 PM
I'm calling prefect_aws.secrets_manager.read_secret() and am stuck with a coroutine object.
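A minimal sketch of one way to resolve that, assuming the call is made from within a flow: make the flow async and await the coroutine (arguments elided):

from prefect import flow
from prefect_aws.secrets_manager import read_secret

@flow
async def get_secret_flow():
    # awaiting the coroutine yields the actual secret value
    secret = await read_secret(...)
    return secret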
Jordan Davie
06/22/2023, 7:36 PM
[Link text Here](https://link-url-here.org), however the link is still not clickable. I believe I need to set PREFECT_LOGGING_MARKUP=True — any recommendation on the best place to configure this?
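For what it's worth, one common place to set it is the active profile via the Prefect CLI (a sketch; exporting it as an environment variable in the execution environment should behave the same):

prefect config set PREFECT_LOGGING_MARKUP=True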
Matt Alhonte
06/22/2023, 8:00 PM
root?
Luke Dolan
06/22/2023, 11:34 PM
Nicholas Thompson
06/23/2023, 5:01 AM
Ian Smith
06/23/2023, 3:05 PM
The PREFECT_LOGGING_SETTINGS_PATH env var doesn't seem sufficient. What's the best way to prevent prefect from making additional changes to the logging configuration when loaded?
Emiliano
06/23/2023, 3:14 PM
kiran
06/23/2023, 4:10 PM
You can call typer.run directly and it'll work for both, i.e.:
if __name__ == "__main__":
    typer.run(process_file_flow)
Harry
06/23/2023, 8:11 PM
Failed
Propagate Affinity merges / deletions is now in a Failed state
Message
Now the best I can get is:
AssertionError('Some items failed to merge')
<flow> in state Failed
Flow run <flow>/fiery-hyrax observed in state Failed at 2023-06-23T19:04:01.492402+00:00.
Flow run URL: https://app.prefect.cloud/.../
State message: 1/1 states failed.
From the config:
# Subject
{{ flow.name }} in state `{{ flow_run.state.name }}`
# Body
Flow run {{ flow.name }}/{{ flow_run.name }} observed in state `{{ flow_run.state.name }}` at {{ flow_run.state.timestamp }}.
Flow run URL: {{ flow_run|ui_url }}
State message: {{ flow_run.state.message }}
Is it possible to improve upon the "State message" part, as this is not very informative? Or is this not available in the variables that can be accessed from this context? If that's the case, how would we include this information if we built something to send a notification on failure ourselves?
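If the Automation templates don't expose enough detail, one possibility is a flow-level failure hook (a sketch, assuming Prefect 2.10+, where @flow accepts on_failure hooks with a (flow, flow_run, state) signature; send_alert is a hypothetical placeholder):

from prefect import flow

def notify_on_failure(flow, flow_run, state):
    # the underlying exception usually carries more detail than state.message
    try:
        state.result()  # re-raises the original exception
    except Exception as exc:
        send_alert(f"{flow.name}/{flow_run.name} failed: {exc!r}")  # hypothetical helper

@flow(on_failure=[notify_on_failure])
def propagate_affinity_merges():
    ...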
Michael Z
06/25/2023, 5:21 PM
"level": {
    "ge_": 20,
    "le_": 50
},
Michael Z
06/25/2023, 9:02 PM
Tim Galvin
06/26/2023, 1:53 AM
Roy Ben Dov
06/26/2023, 7:37 AM
07:35:21.231 | ERROR | prefect.server.services.cancellationcleanup - Unexpected error in: TimeoutError()
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/server/services/loop_service.py", line 78, in start
await self.run_once()
File "/usr/local/lib/python3.10/site-packages/prefect/server/database/dependencies.py", line 119, in async_wrapper
return await fn(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect/server/services/cancellation_cleanup.py", line 48, in run_once
await self.clean_up_cancelled_subflow_runs(db)
File "/usr/local/lib/python3.10/site-packages/prefect/server/services/cancellation_cleanup.py", line 96, in clean_up_cancelled_subflow_runs
subflow_run_result = await session.execute(subflow_query)
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 436, in execute
result = await greenlet_spawn(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 190, in greenlet_spawn
result = context.throw(*sys.exc_info())
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2225, in execute
return self._execute_internal(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2120, in _execute_internal
result: Result[Any] = compile_state_cls.orm_execute_statement(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/context.py", line 293, in orm_execute_statement
result = conn.execute(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1412, in execute
return meth(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 483, in _execute_on_connection
return connection._execute_clauseelement(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1635, in _execute_clauseelement
ret = self._execute_context(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1844, in _execute_context
return self._exec_single_context(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1984, in _exec_single_context
self._handle_dbapi_exception( raise exc_info[1].with_traceback(exc_info[2])
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1965, in _exec_single_context
self.dialect.do_execute(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 921, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 561, in execute
self._adapt_connection.await_(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 125, in await_only
return current.driver.switch(awaitable) # type: ignore[no-any-return]
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 185, in greenlet_spawn
value = await result
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 540, in _prepare_and_execute
self._handle_exception(error)
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 491, in _handle_exception
self._adapt_connection._handle_exception(error)
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 780, in _handle_exception
raise error
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 528, in _prepare_and_execute
self._rows = await prepared_stmt.fetch(*parameters)
File "/usr/local/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 176, in fetch
data = await self.__bind_execute(args, 0, timeout)
File "/usr/local/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 241, in __bind_execute
data, status, _ = await self.__do_execute(
File "/usr/local/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 230, in __do_execute
return await executor(protocol)
File "asyncpg/protocol/protocol.pyx", line 201, in bind_execute
asyncio.exceptions.TimeoutError
Spiden
06/26/2023, 7:58 AM
sbrabez
06/26/2023, 8:50 AM
I'm using the RemoteFileSystem from prefect.filesystems, which uses fsspec, to pass the configuration of my S3 bucket via the keyword argument settings=, like below:
from prefect import task, flow
from prefect.context import get_run_context
from prefect.filesystems import RemoteFileSystem

bucket_endpoint = "..."
access_key = "..."
secret_key = "..."
bucket_name = "s3-prefect-results-bucket"

s3 = RemoteFileSystem(basepath=f"s3://{bucket_name}",
                      settings={"client_kwargs": {"endpoint_url": bucket_endpoint},
                                "key": access_key,
                                "secret": secret_key})

@task(persist_result=True)
def task_persisted(name):
    ctx = get_run_context()
    task_id = ctx.task_run.id
    ret = f"Task id persisted {task_id} by {name}"
    return ret

@flow(name="persist_flow", result_storage=s3)
def persist_flow(server_id: int):
    result = task_persisted("sbrabez")
    print(f"Result persisted: {result}")
    return 0
But when I execute it, it reports the following errors, i.e. it seems it cannot PutObject (S3 API) and the problem seems to be located on the aiobotocore client side. Any ideas what's wrong here or what I'm doing wrong? My S3 credentials are OK here.
OSError: [Errno 22] Invalid Argument.
| flow_name:persist_flow | flow_run_name:important-dragonfly | flow_run_id:211feee3-4ab1-4b81-86ea-8e76856ed500 | level_name:ERROR | message:Finished in state Failed('Flow run encountered an exception. Traceback (most recent call last):
File "/opt/prefect/lib/python3.8/site-packages/s3fs/core.py", line 113, in _error_wrapper
return await func(*args, **kwargs)
File "/opt/prefect/lib/python3.8/site-packages/aiobotocore/client.py", line 371, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (InvalidArgument) when calling the PutObject operation: Invalid Argument.
The above exception was the direct cause of the following exception:
OSError: [Errno 22] Invalid Argument.\n')
Traceback (most recent call last):
File "/opt/prefect/lib/python3.8/site-packages/s3fs/core.py", line 113, in _error_wrapper
return await func(*args, **kwargs)
File "/opt/prefect/lib/python3.8/site-packages/aiobotocore/client.py", line 371, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (InvalidArgument) when calling the PutObject operation: Invalid Argument.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "persist.py", line 32, in <module>
persist_flow(server_id=19959)
File "/opt/prefect/lib/python3.8/site-packages/prefect/flows.py", line 468, in __call__
return enter_flow_run_engine_from_flow_call(
File "/opt/prefect/lib/python3.8/site-packages/prefect/engine.py", line 182, in enter_flow_run_engine_from_flow_call
retval = from_sync.wait_for_call_in_loop_thread(
File "/opt/prefect/lib/python3.8/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
return call.result()
File "/opt/prefect/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
FYI, I'm using Prefect version 2.9.0:
Version: 2.9.0
API version: 0.8.4
Python version: 3.8.10
Git commit: 69f57bd5
Built: Thu, Mar 30, 2023 1:08 PM
OS/Arch: linux/x86_64
Profile: objectStorage
Server type: server
I'd appreciate it if you could point me to any example or resource for configuring this persistent result feature via S3. Thanks for your assistance, cheers! 🤝
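As a debugging aid (not a fix), the block can be exercised outside of a flow run; write_path goes through the same PutObject call as result persistence, so it may reproduce the InvalidArgument error with less machinery in the way (a sketch, assuming the sync-compatible block methods of Prefect 2.x):

# reusing the `s3` RemoteFileSystem block defined above
s3.write_path("connectivity-test.txt", b"hello")  # same PutObject code path
print(s3.read_path("connectivity-test.txt"))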
Mike
06/26/2023, 10:19 AM
Ori Klipstein
06/26/2023, 11:02 AM
I've tried concurrent.futures.ProcessPoolExecutor and multiprocessing.Pool, but get stuck on the ECS agent.
You could think of it as if get_pokemon_info were a CPU-bound command that takes 1 minute to run, and therefore it makes sense to run these in parallel (unlike the current get_pokemon_info, which is I/O-bound and a pretty light API call, and therefore makes sense to run concurrently using async).
So my question is: how can I enable my own parallelism? (There's a great comment about concurrency versus parallelism in the Prefect docs.)
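For what it's worth, a minimal sketch of one way to get process-based parallelism inside a flow, assuming prefect-dask is installed (worker counts are illustrative):

from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def get_pokemon_info(name: str):
    ...  # stand-in for the CPU-bound work described above

# single-threaded worker processes give true parallelism for CPU-bound tasks
@flow(task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 4, "threads_per_worker": 1}))
def parallel_flow(names: list[str]):
    futures = [get_pokemon_info.submit(n) for n in names]
    return [f.result() for f in futures]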
Benjamin Cabalona
06/26/2023, 1:10 PM
Soami Charan
06/26/2023, 1:39 PM
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 528, in _prepare_and_execute
self._rows = await prepared_stmt.fetch(*parameters)
File "/usr/local/lib/python3.9/site-packages/asyncpg/prepared_stmt.py", line 176, in fetch
data = await self.__bind_execute(args, 0, timeout)
File "/usr/local/lib/python3.9/site-packages/asyncpg/prepared_stmt.py", line 241, in __bind_execute
data, status, _ = await self.__do_execute(
File "/usr/local/lib/python3.9/site-packages/asyncpg/prepared_stmt.py", line 230, in __do_execute
return await executor(protocol)
File "asyncpg/protocol/protocol.pyx", line 201, in bind_execute
asyncpg.exceptions.ActiveSQLTransactionError: ALTER TYPE ... ADD cannot run inside a transaction block
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1965, in _exec_single_context
self.dialect.do_execute(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 921, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 561, in execute
self._adapt_connection.await_(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 125, in await_only
return current.driver.switch(awaitable) # type: ignore[no-any-return]
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 185, in greenlet_spawn
value = await result
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 540, in _prepare_and_execute
self._handle_exception(error)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 491, in _handle_exception
self._adapt_connection._handle_exception(error)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 778, in _handle_exception
raise translated_error from error
sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_dbapi.Error: <class 'asyncpg.exceptions.ActiveSQLTransactionError'>: ALTER TYPE ... ADD cannot run inside a transaction block
I get this error when deploying the Prefect server.
I am using PostgreSQL 13.3 and Prefect 2.10.
Nils
06/26/2023, 1:48 PM
I'm using a prefect.yaml file for easier deployments. Both of my deployments use the same Docker image, so the image only needs to be built once. However, with my current implementation, the Docker image is built for both deployments. Is it possible to build the Docker image only once while still deploying two flows?
Justin Trautmann
06/26/2023, 1:55 PM