Zachary Loertscher
05/05/2023, 7:09 PM
task.with_options().map()
For example, I have:
from prefect import flow, task
from prefect_airbyte.connections import AirbyteConnection
from prefect_dask import DaskTaskRunner

# dev_stage, airbyte_connections and airbyte_connections_dict are defined elsewhere (not shown)

@task(
    name=f'{dev_stage} - Airbyte - Task'
)
def run_airbyte_sync(
    connection: AirbyteConnection,
    connection_name: str
):
    print(f"kicking off task for connection name: '{connection_name}'")
    job_run = connection.trigger()
    job_run.wait_for_completion()
    return job_run.fetch_result()

@flow(
    name=f'{dev_stage} - Airbyte',
    log_prints=True,
    task_runner=DaskTaskRunner(  # allows for parallel execution
        cluster_kwargs={
            "processes": False,  # use threads instead of processes
            "n_workers": 8,
            "threads_per_worker": 1  # number of threads per worker
        }
    )
)
def flow_airbyte():
    # run airbyte sync
    airbyte_results = run_airbyte_sync.map(
        airbyte_connections,
        airbyte_connections_dict.values()
    )
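Recent Prefect 2.x releases let a task take a task_run_name, set either in @task(...) or through with_options, and treat it as a template filled in from the task's arguments. Something like run_airbyte_sync.with_options(task_run_name="airbyte-sync-{connection_name}").map(airbyte_connections, airbyte_connections_dict.values()) should therefore give each mapped run the name of its own connection (worth confirming against the docs for the installed version). To stay runnable without Prefect or Airbyte, the sketch below only mimics that per-run formatting step with str.format; the template and the connection names are made up.

```python
from typing import Any, Dict, List

# Template that would be passed to Prefect as task_run_name; the placeholder
# must match a parameter name of run_airbyte_sync.
TASK_RUN_NAME = "airbyte-sync-{connection_name}"


def render_run_names(template: str, mapped_calls: List[Dict[str, Any]]) -> List[str]:
    """Format the template once per mapped call, using that call's own arguments.

    Extra keys (such as the connection block) are simply ignored by str.format.
    """
    return [template.format(**kwargs) for kwargs in mapped_calls]


# Hypothetical connection names standing in for airbyte_connections_dict.values()
mapped_calls = [
    {"connection": object(), "connection_name": "postgres_to_snowflake"},
    {"connection": object(), "connection_name": "stripe_to_snowflake"},
]
print(render_run_names(TASK_RUN_NAME, mapped_calls))
# ['airbyte-sync-postgres_to_snowflake', 'airbyte-sync-stripe_to_snowflake']
```

Because each mapped call is formatted with its own arguments, a failed run in the UI would carry the connection name directly.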
Any help is appreciated!
It would be super nice to do this because I would get observability into the names of the connections I'm running in Airbyte, i.e. if one task fails, I'd quickly know which Airbyte connection it was.
Andy Dyer
05/05/2023, 7:57 PM
Andy Dyer
05/05/2023, 8:03 PM
from prefect import Flow, flatten
files = preprocess_csv.map(flatten(list_of_csvs))
used to be this ^
Manoj
05/05/2023, 8:21 PM
<frozen runpy>:128: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
Tom
05/05/2023, 8:48 PM
Dominic Tarro
05/06/2023, 12:04 PM
Omair Nabiel
05/06/2023, 5:22 PM
CMD prefect server start from my Dockerfile after running pip install prefect within the image. The Docker logs show that Prefect is running fine on 127.0.0.1:4200, and I have mapped port 4200 on my host machine as well, but localhost:4200 fails to load. I have a Selenium container running on port 4444, which runs just fine and is accessible from my host machine. Any ideas? Thank you
Lucas Zago
05/06/2023, 7:55 PM
Kaibo Chen
05/07/2023, 9:02 PM
from prefect import flow
from job_wrapper import job_wrapper  # assumed: the module exposes a job_wrapper callable

job_list = ["job_a", "job_b", ...]
for job in job_list:
    @flow(name=job)
    def run_job():
        job_wrapper(job)
However, because the current deployment setup requires a unique entrypoint function as well as a unique flow name, what I have now is:
from prefect import flow
from job_wrapper import job_wrapper  # assumed: the module exposes a job_wrapper callable

@flow(name="job_a")
def run_job_a():
    job_wrapper("job_a")

@flow(name="job_b")
def run_job_b():
    job_wrapper("job_b")

...
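A less repetitive pattern, sketched under assumptions: build each flow in a small factory function and publish it as a module-level attribute, so every job still gets its own flow name and its own entrypoint (e.g. flows.py:run_job_a, where flows.py is a hypothetical file name). Passing job into the factory also sidesteps the late-binding problem of the loop version, where every run_job body reads the global job at call time and so would see the last job in the list. To keep the sketch runnable on its own, flow and job_wrapper below are hypothetical stand-ins; in real code they would be prefect.flow and the existing job_wrapper. Another option is a single flow that takes job as a parameter, with one deployment per job.

```python
from typing import Callable, List


# Stand-in for prefect.flow so this sketch runs without Prefect installed;
# in a real module use `from prefect import flow` instead.
def flow(name: str) -> Callable[[Callable], Callable]:
    def decorator(fn: Callable) -> Callable:
        fn.flow_name = name
        return fn
    return decorator


# Hypothetical stand-in for the real job_wrapper
def job_wrapper(job: str) -> str:
    return f"ran {job}"


job_list: List[str] = ["job_a", "job_b"]


def make_job_flow(job: str) -> Callable:
    """Build one flow for `job`; the job name is bound per call, not read from a loop later."""
    @flow(name=job)
    def run_job():
        return job_wrapper(job)
    return run_job


# Publish each flow as a module-level attribute (run_job_a, run_job_b, ...)
# so an entrypoint such as "flows.py:run_job_a" can look it up by name.
for _job in job_list:
    globals()[f"run_{_job}"] = make_job_flow(_job)
```

Adding a job then means adding one string to job_list rather than another decorated function.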
Is there a less repetitive way of doing this that I might have missed?
Ofir
05/07/2023, 9:47 PM
Farid
05/07/2023, 11:39 PM
Successfully installed Babel-2.12.1 .... It is recommended to use a virtual environment instead: <https://pip.pypa.io/warnings/venv>
[notice] A new release of pip available: 22.3.1 -> 23.1.2
[notice] To update, run: pip install --upgrade pip
/usr/local/lib/python3.9/runpy.py:127: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
warn(RuntimeWarning(msg))
01:14:11.382 | INFO | Flow run 'spiffy-platypus' - Downloading flow code from storage at 'snowflake_dbt_transformations'
01:14:16.788 | INFO | prefect.engine - Engine execution of flow run 'a8432976-2131-441a-8d98-01740500ba8e' aborted by orchestrator: This run cannot transition to the RUNNING state from the RUNNING state.
Has anyone else encountered a similar issue? Any idea what might be causing this?
I would appreciate any help or insights on this. Thank you in advance!
Giacomo Chiarella
05/08/2023, 11:09 AM
Deceivious
05/08/2023, 1:13 PM
from typing import Dict

from prefect.serializers import JSONSerializer

def get_storage_parameters() -> Dict:
    return {
        "persist_result": True,
        "result_storage": where_to_store(),
        # where_to_store function returns where to store
        # e.g. for dev env, we store the file in temporary local fs
        # for prod maybe s3 buckets
        "result_serializer": JSONSerializer(jsonlib="json"),
    }
I originally intended to use it as:
@task(**get_storage_parameters())
My deployment environment (GitHub workflows) does not actually need the credentials to the S3 buckets, hence the credentials are not available to GitHub.
How would I bind the parameters dynamically, only at run time and not at deployment time? I know that with_options could be used, but I would have to call it on every usage of each and every task, and the code gets really messy with with_options everywhere.
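One way to keep credentials out of deployment time, assuming where_to_store() currently builds or loads the S3 block itself: Prefect 2.x also accepts result_storage as a block slug string ("<block-type-slug>/<block-name>") and result_serializer as a shorthand such as "json", and resolves the slug to a block when the task runs rather than when the module is imported. The sketch keeps the decorator-kwargs approach but returns only strings, so importing the flow module in a GitHub workflow should need no AWS credentials. The DEPLOY_ENV variable and both block names are hypothetical, and the blocks would have to exist on the Prefect server.

```python
import os
from typing import Any, Dict


def where_to_store() -> str:
    """Hypothetical: pick a block slug by environment.

    Only the slug string is returned; no block is loaded here, so no
    credentials are needed when the module is imported at deployment time.
    """
    env = os.environ.get("DEPLOY_ENV", "dev")  # hypothetical variable name
    slugs = {
        "dev": "local-file-system/dev-results",  # hypothetical block names
        "prod": "s3/prod-results",
    }
    return slugs[env]


def get_storage_parameters() -> Dict[str, Any]:
    return {
        "persist_result": True,
        "result_storage": where_to_store(),
        # String shorthand instead of a JSONSerializer instance
        "result_serializer": "json",
    }

# Usage, unchanged from before:
# @task(**get_storage_parameters())
```

Since the agent re-imports the flow module at run time, DEPLOY_ENV set on the run infrastructure decides which slug is used there.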
How is the community handling secrets during deployment?
Tomas Moreno
05/08/2023, 2:15 PM
Tim-Oliver
05/08/2023, 4:07 PM
Abhinav Chordia
05/08/2023, 4:11 PM
scott
05/08/2023, 4:36 PM
Cancelling state, and tasks still running after the flow has entered Cancelled state (see thread)
Janne Keskitalo
05/08/2023, 6:37 PM
docker run -it --rm prefecthq/prefect:2.10.8-python3.10 prefect server start
Error: sqlite3.OperationalError: table _alembic_tmp_artifact already exists
Chandan Maruthi
05/08/2023, 6:46 PM
prefect agent start --pool default-agent-pool --work-queue default
I am curious what you all recommend for running Prefect agents. I ran a Prefect agent as a script on an EC2 container, but it stopped after a while. I want the agent to always be running, so that when my UI submits a job, I know something will pick it up and execute it.
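The usual approach is to let something that already supervises processes keep the agent alive: a systemd service with Restart=always, a Docker restart policy, or an ECS service. As an illustration of that restart-with-backoff idea only (not a Prefect feature), here is a minimal Python supervisor; the function name and its defaults are hypothetical.

```python
import subprocess
import sys
import time
from typing import Optional, Sequence


def supervise(
    cmd: Sequence[str],
    max_restarts: Optional[int] = None,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
) -> int:
    """Run `cmd` and restart it whenever it exits, waiting longer after each exit.

    max_restarts=None means restart forever. Returns the number of restarts
    performed before giving up.
    """
    restarts = 0
    delay = base_delay
    while True:
        returncode = subprocess.call(list(cmd))
        if max_restarts is not None and restarts >= max_restarts:
            return restarts
        print(f"process exited with code {returncode}; restarting in {delay:.2f}s", file=sys.stderr)
        time.sleep(delay)
        # Exponential backoff, capped so a crash loop does not hammer the API
        delay = min(delay * 2, max_delay)
        restarts += 1

# Usage (not executed here):
# supervise(["prefect", "agent", "start", "--pool", "default-agent-pool", "--work-queue", "default"])
```

The supervisor itself can still die with the machine, which is why an OS- or platform-level supervisor is generally preferred.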
Please share any best practices for running agents reliably.
Malavika S Menon
05/08/2023, 7:40 PM
prefect server start command, this error is popping up. How can I fix this without having to clear the database or use a new one?
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 442, in _prepare_and_execute
self._rows = await prepared_stmt.fetch(*parameters)
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/asyncpg/prepared_stmt.py", line 176, in fetch
data = await self.__bind_execute(args, 0, timeout)
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/asyncpg/prepared_stmt.py", line 241, in __bind_execute
data, status, _ = await self.__do_execute(
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/asyncpg/prepared_stmt.py", line 230, in __do_execute
return await executor(protocol)
File "asyncpg/protocol/protocol.pyx", line 201, in bind_execute
asyncpg.exceptions.CardinalityViolationError: more than one row returned by a subquery used as an expression
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
self.dialect.do_execute(
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
cursor.execute(statement, parameters)
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 479, in execute
self._adapt_connection.await_(
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
return current.driver.switch(awaitable)
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
value = await result
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 454, in _prepare_and_execute
self._handle_exception(error)
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 389, in _handle_exception
self._adapt_connection._handle_exception(error)
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 682, in _handle_exception
raise translated_error from error
sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_dbapi.Error: <class 'asyncpg.exceptions.CardinalityViolationError'>: more than one row returned by a subquery used as an expression
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
self.dialect.do_execute(
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
cursor.execute(statement, parameters)
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 479, in execute
self._adapt_connection.await_(
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
return current.driver.switch(awaitable)
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
value = await result
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 454, in _prepare_and_execute
self._handle_exception(error)
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 389, in _handle_exception
self._adapt_connection._handle_exception(error)
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 682, in _handle_exception
raise translated_error from error
sqlalchemy.exc.DBAPIError: (sqlalchemy.dialects.postgresql.asyncpg.Error) <class 'asyncpg.exceptions.CardinalityViolationError'>: more than one row returned by a subquery used as an expression
[SQL:
UPDATE task_run_state
SET result_artifact_id = (SELECT id FROM artifact WHERE task_run_state.id = task_run_state_id)
WHERE task_run_state.id in (SELECT id FROM task_run_state WHERE (has_data IS TRUE) AND (result_artifact_id IS NULL) LIMIT 500);
]
(Background on this error at: <https://sqlalche.me/e/14/dbapi>)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/starlette/datastructures.py", line 702, in __getattr__
return self._state[key]
KeyError: 'services'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/starlette/routing.py", line 677, in lifespan
async with self.lifespan_context(app) as maybe_state:
File "/usr/lib/python3.8/contextlib.py", line 171, in __aenter__
return await self.gen.__anext__()
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/prefect/server/api/server.py", line 486, in lifespan
await stop_services()
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/prefect/server/api/server.py", line 468, in stop_services
if app.state.services:
File "/home/malavika/.virtualenvs/healthgraph/lib/python3.8/site-packages/starlette/datastructures.py", line 705, in __getattr__
raise AttributeError(message.format(self.__class__.__name__, key))
AttributeError: 'State' object has no attribute 'services'
01:04:07.255 | ERROR | uvicorn.error - Application startup failed. Exiting.
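The failing migration statement fills result_artifact_id with a scalar subquery, and PostgreSQL requires such a subquery to return at most one row, so the error suggests that some task_run_state_id appears on more than one row of the artifact table. A read-only query like the one below should list the affected states before anything is changed. It is demonstrated on a toy in-memory SQLite copy of only the columns named in the statement (the real tables have more); the same SELECT should also run on PostgreSQL.

```python
import sqlite3

# Toy stand-in for the two tables in the failing UPDATE; data is made up.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE task_run_state (id TEXT PRIMARY KEY, has_data BOOLEAN, result_artifact_id TEXT);
    CREATE TABLE artifact (id TEXT PRIMARY KEY, task_run_state_id TEXT);
    INSERT INTO task_run_state VALUES ('s1', 1, NULL), ('s2', 1, NULL);
    INSERT INTO artifact VALUES ('a1', 's1'), ('a2', 's1'), ('a3', 's2');
    """
)

# Any task_run_state_id listed here makes the subquery
# (SELECT id FROM artifact WHERE task_run_state.id = task_run_state_id)
# return more than one row, which PostgreSQL rejects.
DUPLICATES_SQL = """
    SELECT task_run_state_id, COUNT(*) AS n
    FROM artifact
    WHERE task_run_state_id IS NOT NULL
    GROUP BY task_run_state_id
    HAVING COUNT(*) > 1
"""
duplicates = conn.execute(DUPLICATES_SQL).fetchall()
print(duplicates)  # [('s1', 2)]
```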
Nicholas Torba
05/08/2023, 8:14 PM
Abhinav Chordia
05/08/2023, 8:31 PM
Ofir
05/08/2023, 9:10 PM
git clone git@github.com:PrefectHQ/prefect.git
cd prefect
git checkout 2.10.6
# edit Dockerfile to set `PREFECT_UI_SERVE_BASE=/prefect`
docker build --build-arg PYTHON_VERSION=3.10 -t prefect:base2.10.6-python3.10 -f Dockerfile .
Then you need to inherit from this image for building the Prefect agent:
FROM prefect:base2.10.6-python3.10
COPY requirements.txt .
RUN pip install -r requirements.txt
...
@justabill I think the Prefect dev team should probably introduce Docker build arguments so that this is easier to achieve without hacking the base image.
I can elaborate more here or in DM.
Ofir
05/08/2023, 10:25 PM
docker and kubectl verbs. For example, prefect config view is very similar to the concept of Kubernetes cluster settings, and prefect deployment inspect <deployment> is similar to docker inspect, etc.
Ofir
05/08/2023, 10:25 PM
Ofir
05/08/2023, 10:26 PM
AG Crum
05/08/2023, 11:07 PM
Emil Ordoñez
05/09/2023, 12:25 AM
Failed the last 3 attempts. Please check your environment and configuration.
Examples of recent errors:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/h2/connection.py", line 224, in process_input
func, target_state = self._transitions[(self.state, input_)]
KeyError: (<ConnectionState.CLOSED: 3>, <ConnectionInputs.SEND_HEADERS: 0>)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http2.py", line 116, in handle_async_request
await self._send_request_headers(request=request, stream_id=stream_id)
File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http2.py", line 213, in _send_request_headers
self._h2_state.send_headers(stream_id, headers, end_stream=end_stream)
File "/usr/local/lib/python3.10/site-packages/h2/connection.py", line 766, in send_headers
self.state_machine.process_input(ConnectionInputs.SEND_HEADERS)
File "/usr/local/lib/python3.10/site-packages/h2/connection.py", line 228, in process_input
raise ProtocolError(
h2.exceptions.ProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
yield
File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
raise exc
File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
response = await connection.handle_async_request(request)
File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
return await self._connection.handle_async_request(request)
File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http2.py", line 152, in handle_async_request
raise LocalProtocolError(exc)  # pragma: nocover
httpcore.LocalProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/services.py", line 46, in critical_service_loop
await workload()
File "/usr/local/lib/python3.10/site-packages/prefect/agent.py", line 261, in check_for_cancelled_flow_runs
async for work_queue in self.get_work_queues():
File "/usr/local/lib/python3.10/site-packages/prefect/agent.py", line 144, in get_work_queues
work_queue = await self.client.read_work_queue_by_name(
File "/usr/local/lib/python3.10/site-packages/prefect/client/orchestration.py", line 850, in read_work_queue_by_name
response = await self._client.get(f"/work_queues/name/{name}")
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1754, in get
return await self.request(
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1530, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 251, in send
response = await self._send_with_retry(
File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 194, in _send_with_retry
response = await request()
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1617, in send
response = await self._send_handling_auth(
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1645, in _send_handling_auth
response = await self._send_handling_redirects(
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1682, in _send_handling_redirects
response = await self._send_single_request(request)
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1719, in _send_single_request
response = await transport.handle_async_request(request)
File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 352, in handle_async_request
with map_httpcore_exceptions():
File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
self.gen.throw(typ, value, traceback)
File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
raise mapped_exc(message) from exc
httpx.LocalProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
return fn(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 260, in coroutine_wrapper
return call()
File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 245, in __call__
return self.result()
File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in result
return self.__get_result()
File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
File "/usr/local/lib/python3.10/site-packages/prefect/cli/agent.py", line 189, in start
async with anyio.create_task_group() as tg:
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
raise exceptions[0]
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/services.py", line 104, in critical_service_loop
raise RuntimeError("Service exceeded error threshold.")
RuntimeError: Service exceeded error threshold.
An exception occurred.
I'm running the agent as an ECS service on Fargate, so it keeps restarting, but with unexpected, random behavior: sometimes the flows stay frozen after this issue.
I think this started after I updated Prefect to 2.10.6. Before the update, the agent did not show this stopping behavior.
Has anyone experienced something similar?
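Prefect reads each setting from an environment variable of the same name, and environment variables take precedence over values stored in the active profile, so a setting such as PREFECT_API_ENABLE_HTTP2 can be given to an agent running on ECS through the container definition's environment list instead of prefect config set. The sketch below builds that fragment of a task definition with the standard json module; the family, container name, image tag and command are hypothetical, and a real Fargate task definition needs more fields (cpu, memory, networkMode, roles).

```python
import json

# Hypothetical names and image; the "environment" entry is the relevant part.
task_definition = {
    "family": "prefect-agent",
    "containerDefinitions": [
        {
            "name": "prefect-agent",
            "image": "prefecthq/prefect:2.10.6-python3.10",
            "command": ["prefect", "agent", "start", "--pool", "default-agent-pool"],
            "environment": [
                # Equivalent in effect to `prefect config set PREFECT_API_ENABLE_HTTP2=False`,
                # but scoped to this container; ECS requires string values.
                {"name": "PREFECT_API_ENABLE_HTTP2", "value": "False"},
            ],
        }
    ],
}

print(json.dumps(task_definition, indent=2))
```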
I see a similar error in this thread, but I'm not sure whether running prefect config set PREFECT_API_ENABLE_HTTP2=False actually solved that issue, and I'm also not sure how to specify that setting in the ECS Task Definition. Should I add it as an environment variable?
Matt Alhonte
05/09/2023, 1:26 AM
ECSRun run_config and turn it into one of the new ECSTask blocks? It looks like the arguments got rearranged a lot.
Nimesh Kumar
05/09/2023, 6:04 AM