Dzmitry Aliashkevich
10/30/2022, 2:32 PMStephen Herron
10/30/2022, 4:51 PMAdam
10/30/2022, 9:55 PMmerlin
10/30/2022, 9:59 PM# i've left out all the imports and task definitions
# trino_flows.py
@flow(name="extract write")
def extract_write(config):
    logger = get_run_logger()
    logger.info(f"extract file: {config.filepath}")
    sql = load_sqlfile(config.filepath)
    trino_cnxn = trino_connect()
    data = send_query(trino_cnxn, sql)
    write_output(data, config.outfile_path)

# file: extract_write.py
filepath = Path(sys.argv[1])
extract_config = ExtractConfig(filepath=filepath)
with tags(extract_config.dataset_name, "extract"):
    extract_write(extract_config)
In development I'm calling the script with:
python src/extract_write.py src/extracts/weekly_date.sql
So the ExtractConfig object provides the dataset_name, rundate, and filepath fields used by the flow code.
How do I build/apply a deployment when I'm passing an object to the flow function in my script?
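One workaround that should apply here (a minimal sketch, with the deployment name invented for illustration): deployment parameters have to be JSON-serializable, so the flow can accept the raw filepath as a string parameter and build the ExtractConfig itself:

from pathlib import Path
from prefect import flow
from prefect.deployments import Deployment

@flow(name="extract write")
def extract_write(filepath: str):
    # Build the config object inside the flow so the deployment only
    # needs to pass a plain string parameter
    config = ExtractConfig(filepath=Path(filepath))  # ExtractConfig as defined in your project
    ...  # same task calls as in trino_flows.py above

deployment = Deployment.build_from_flow(
    flow=extract_write,
    name="extract-weekly-date",  # hypothetical deployment name
    parameters={"filepath": "src/extracts/weekly_date.sql"},
)

if __name__ == "__main__":
    deployment.apply()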
Deepanshu Aggarwal
10/31/2022, 6:05 AM{
  "state": {
    "type": "SCHEDULED",
    "name": "string",
    "message": "Run started",
    "data": "string",
    "state_details": {
      "flow_run_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
      "task_run_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
      "child_flow_run_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
      "scheduled_time": "2022-10-31T06:00:35.234Z",
      "cache_key": "string",
      "cache_expiration": "2022-10-31T06:00:35.234Z",
      "untrackable_result": false
    },
    "timestamp": "2022-10-31T06:00:35.234Z",
    "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6"
  },
  "name": "my-flow-run",
  "parameters": {},
  "context": {
    "my_var": "my_val"
  },
  "infrastructure_document_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
  "empirical_policy": {
    "retries": 0,
    "retry_delay": 0
  },
  "tags": [
    "tag-1",
    "tag-2"
  ],
  "idempotency_key": "string",
  "parent_task_run_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6"
}
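For reference, a body like this can be POSTed to the Orion REST API to create a flow run from a deployment. The sketch below assumes a local server and an invented deployment ID, so the exact route is worth double-checking against your own server's /docs page:

import httpx

deployment_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"  # placeholder ID
payload = {
    "name": "my-flow-run",
    "parameters": {},
    "tags": ["tag-1", "tag-2"],
}
resp = httpx.post(
    f"http://127.0.0.1:4200/api/deployments/{deployment_id}/create_flow_run",
    json=payload,
)
resp.raise_for_status()
print(resp.json()["id"])  # id of the newly created flow run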
iKeepo w
10/31/2022, 6:24 AMStephen Lloyd
10/31/2022, 8:27 AMAlejandro
10/31/2022, 10:52 AMimport random
from collections import namedtuple
from datetime import date, datetime, time

import pandas as pd
from prefect import flow, task

WeatherConditions = namedtuple(
    "WeatherConditions", ["wind_speed", "temperature", "rel_humidity"]
)

@task
def register_current_weather() -> WeatherConditions:
    return WeatherConditions(
        wind_speed=random.weibullvariate(3, 1.5),
        temperature=random.uniform(-5, 25),
        rel_humidity=random.uniform(0, 100),
    )

@task
def upload_to_database(station_data: pd.DataFrame) -> None:
    print("Updating weather database with the following data:")
    print(station_data)
    print("Observations were successfully recorded")

@flow
def surface_station_daily_weather(station: str, freq: str = "H") -> pd.DataFrame:
    print(f"Daily weather observations for station {station.title()!r}")
    timestamps = pd.date_range(
        start=date.today(), end=datetime.combine(datetime.now(), time.max), freq=freq
    )
    observations = [register_current_weather() for _ in range(len(timestamps))]
    return pd.DataFrame(data=observations, index=timestamps)

@flow
def weather_app(station_names: list[str]) -> None:
    print("Welcome to the world's fastest weather data collection application!")
    for station in station_names:
        station_weather = surface_station_daily_weather(station=station, freq="3H")
        upload_to_database(station_data=station_weather)
    print(
        "Daily observations have been updated for all operational stations. See you soon!"
    )

if __name__ == "__main__":
    STATIONS = [
        "bilbao_station",
        "oviedo_station",
        "salamanca_station",
        "badajoz_station",
    ]
    weather_app(station_names=STATIONS)
I was wondering what the recommended way is to run subflows in parallel (not merely concurrently). In this case, the subflow surface_station_daily_weather is executed sequentially (as far as I know there is no way to use the submit mechanism with a flow). Is it advisable to use the multiprocessing library for this purpose? Or is there any built-in functionality for it?
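For what it's worth, one built-in route is to make the subflow async and gather the runs with asyncio; this gives concurrency in a single process rather than true multi-process parallelism, which would still need something like Dask or separate deployments. A minimal sketch:

import asyncio
from prefect import flow

@flow
async def surface_station_daily_weather(station: str, freq: str = "3H"):
    ...  # same body as above, written async

@flow
async def weather_app(station_names: list[str]) -> None:
    # Start every subflow run at once and wait for all of them together
    await asyncio.gather(
        *(surface_station_daily_weather(station=s) for s in station_names)
    )

if __name__ == "__main__":
    asyncio.run(weather_app(["bilbao_station", "oviedo_station"]))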
Stephen Herron
10/31/2022, 12:33 PMAdam
10/31/2022, 12:50 PMVadym Dytyniak
10/31/2022, 12:51 PMStéphan Taljaard
10/31/2022, 1:20 PMprefect.context
, notably the flow_name
, flow_run_name
, and flow_run_id
. Is there a way to pass the entire flow run context to a task? Or, is there a way to access the flow run context from within a task?
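One pattern that should work here (a sketch, and possibly not the only way): read the context in the flow body with prefect.context.get_run_context() and hand the pieces to the task as ordinary parameters. Note that calling get_run_context() inside a task yields the task run's context instead:

from prefect import flow, task
from prefect.context import get_run_context

@task
def report(flow_run_name: str, flow_run_id) -> None:
    print(f"inside flow run {flow_run_name} ({flow_run_id})")

@flow
def my_flow():
    ctx = get_run_context()  # FlowRunContext when called in a flow body
    report(ctx.flow_run.name, ctx.flow_run.id)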
Carlo
10/31/2022, 2:22 PMrun_deployment w/ SequentialTaskRunner. However, when the first in a chain failed, it didn't block the remaining run_deployments. In fact, they ran and the parent completed. How do I ensure the dependencies are honored? Flow definition in thread
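A sketch of one way to enforce the ordering, assuming the parent calls run_deployment directly (the deployment names are invented): run_deployment returns the finished FlowRun, so its terminal state can be checked and an exception raised to stop the rest of the chain:

from prefect import flow
from prefect.deployments import run_deployment

@flow
def parent():
    for name in ["my-flow/step-1", "my-flow/step-2"]:  # hypothetical deployments
        flow_run = run_deployment(name=name)  # blocks until the child run finishes
        if not flow_run.state.is_completed():
            # Failing here prevents the later deployments from running
            raise RuntimeError(f"{name} ended in state {flow_run.state.name}")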
Oscar Björhn
10/31/2022, 3:11 PMRahul Kadam
10/31/2022, 3:19 PMXavier Babu
10/31/2022, 3:37 PMTim Enders
10/31/2022, 3:45 PMTim Enders
10/31/2022, 3:59 PMTraceback (most recent call last):
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 1334, in report_task_run_crashes
    yield
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 1070, in begin_task_run
    connect_error = await client.api_healthcheck()
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/client/orion.py", line 204, in api_healthcheck
    await self._client.get("/health")
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_client.py", line 1751, in get
    return await self.request(
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_client.py", line 1527, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/client/base.py", line 159, in send
    await super().send(*args, **kwargs)
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_client.py", line 1614, in send
    response = await self._send_handling_auth(
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_client.py", line 1716, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 252, in handle_async_request
    await self.response_closed(status)
asyncio.exceptions.CancelledError
10:56:53.688 | ERROR | Task run 'Get-Items-d8ed86f1-2473' - Crash detected! Execution was cancelled by the runtime environment.
10:56:53.688 | DEBUG | Task run 'Get-Items-d8ed86f1-2473' - Crash details:
Traceback (most recent call last):
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_core/_synchronization.py", line 314, in acquire
    self.acquire_nowait()
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_core/_synchronization.py", line 342, in acquire_nowait
    raise WouldBlock
anyio.WouldBlock

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpcore/_async/http2.py", line 96, in handle_async_request
    await self._max_streams_semaphore.acquire()
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpcore/_synchronization.py", line 46, in acquire
    await self._semaphore.acquire()
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_core/_synchronization.py", line 319, in acquire
    await event.wait()
  File "/usr/lib/python3.10/asyncio/locks.py", line 214, in wait
    await fut
asyncio.exceptions.CancelledError
Nic
10/31/2022, 4:28 PM--params='{"question": "ultimate", "answer": 42}'
Returns
+- Error ---------------------------------------------------------------------+
| Got unexpected extra arguments (ultimate, answer: 42}') |
+-----------------------------------------------------------------------------+
The --param flag is working, but I've tried many different combinations of --params without success.
Is it possible to provide a working example, or to check whether anything has changed, since it's not working?
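If it helps, the error message suggests the shell split the JSON into several arguments before it reached the CLI; quoting the whole object as a single argument is usually the fix (deployment name invented):

prefect deployment run my-flow/my-deployment --params '{"question": "ultimate", "answer": 42}'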
Brian Phillips
10/31/2022, 4:57 PMeddy davies
10/31/2022, 4:58 PMconfig file in my .kube folder, but it doesn't seem to work; any advice?Meghan Franklin
10/31/2022, 7:04 PMimport matplotlib; matplotlib.use('agg')
I get a segfault like this:
./run_local.sh: line 14: 34109 Segmentation fault: 11 python cli.py local -pfile developer/test_input.json
make: *** [run_local] Error 139
(p39_ngs) MB-MFRANKLIN:amplicon-analysis meghanfranklin$ /Users/meghanfranklin/opt/anaconda3/envs/p39_ngs/lib/python3.9/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 2 leaked semaphore objects to clean up at shutdown
warnings.warn('resource_tracker: There appear to be %d '
but if I don’t use that, python crashes 🙃
/Users/meghanfranklin/opt/anaconda3/envs/p39_ngs/lib/python3.9/site-packages/amp_analysis/plots.py:425: UserWarning: Starting a Matplotlib GUI outside of the main thread will likely fail.
fig, ax = plt.subplots()
[2022-10-31 15:02:54-0400] INFO - prefect.TaskRunner | Task 'create_pipeline_run': Finished task run for task with final state: 'Success'
2022-10-31 15:02:54.121 python[34449:328037] *** Terminating app due to uncaught exception 'NSInternalInconsistencyException', reason: 'NSWindow drag regions should only be invalidated on the Main Thread!'
*** First throw call stack:
(
0 CoreFoundation 0x00007ff80bf1e7c3 __exceptionPreprocess + 242
1 libobjc.A.dylib 0x00007ff80bc7ebc3 objc_exception_throw + 48
2 CoreFoundation 0x00007ff80bf47076 -[NSException raise] + 9
[a lot more lines here]
)
libc++abi: terminating with uncaught exception of type NSException
./run_local.sh: line 14: 34449 Abort trap: 6 python cli.py local -pfile developer/test_input.json
make: *** [run_local] Error 134
(p39_ngs) MB-MFRANKLIN:amplicon-analysis meghanfranklin$ /Users/meghanfranklin/opt/anaconda3/envs/p39_ngs/lib/python3.9/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 2 leaked semaphore objects to clean up at shutdown
warnings.warn('resource_tracker: There appear to be %d '
-> cue Mac popup window about sending a crash report to Apple
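The NSWindow crash is the classic symptom of a GUI backend being touched off the main thread, so forcing a non-interactive backend is the right instinct; the detail that often bites is that the backend must be selected before pyplot is imported anywhere in the process. A general matplotlib sketch (which does not explain the separate Agg-mode segfault):

import matplotlib
matplotlib.use("Agg")  # must run before any `import matplotlib.pyplot`
import matplotlib.pyplot as plt

def make_plot(path: str = "plot.png") -> None:
    # Agg renders to memory, so this is safe off the main thread
    fig, ax = plt.subplots()
    ax.plot([1, 2, 3])
    fig.savefig(path)
    plt.close(fig)  # release the figure; matters in long-running workers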
David Cupp
10/31/2022, 7:54 PMBen Muller
10/31/2022, 10:33 PMprefect run -p test_flow.py
Is this an option in 2.0 on the CLI, or do I have to initiate it through Python every time?
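As far as I know, Prefect 2 dropped the prefect run command: a flow is started either by running the script that calls it, or through a deployment once one exists (deployment name invented):

python test_flow.py
# or, once a deployment exists:
prefect deployment run 'test-flow/my-deployment'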
Ben Muller
10/31/2022, 11:31 PMSequentialTaskRunner with .submit.
Would this just be the same as not specifying a task_runner and just running the tasks without .submit, or am I missing something?
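Roughly yes for ordering, with two caveats (hedged, from my reading of the docs): .submit returns a PrefectFuture rather than the value, and with no task_runner specified the default is the ConcurrentTaskRunner, which may interleave submitted tasks rather than run them strictly in order. A small sketch of the difference:

from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner

@task
def double(x: int) -> int:
    return 2 * x

@flow(task_runner=SequentialTaskRunner())
def my_flow() -> int:
    future = double.submit(1)  # PrefectFuture; still executed in order here
    value = double(future)     # plain call resolves the future and returns the value
    return value               # 4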
Tony Yun
11/01/2022, 2:46 AMTim Galvin
11/01/2022, 3:01 AMdask_jobqueue.SLURMCluster) with a DaskTaskRunner being used, with 10 separate compute nodes being pulled into the distributed dask scheduler.
• A single large Flow with ~7 tasks, with each task calling a separate python script's main
I am just running the postgres database remotely. I have not set a prefect orion server running remotely, although I can try this.
I found that regular running of the pipeline would often raise a TimeoutError in an unpredictable manner. Setting PREFECT_ORION_DATABASE_CONNECTION_TIMEOUT=20 eliminated these. Now my problems seem to be TimeoutErrors when the flow is closing. I can confirm that all data products expected by the pipeline have been created and stored on disk, and all log messages have been issued. The flow has essentially finished executing and is wrapping up when the error is raised. The traceback is too long to post as a comment ( 😢 ), so it is below.
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
self._handle_dbapi_exception(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2128, in _handle_dbapi_exception
util.raise_(exc_info[1], with_traceback=exc_info[2])
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
raise exception
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
self.dialect.do_execute(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 479, in execute
self._adapt_connection.await_(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
return current.driver.switch(awaitable)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
value = await result
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 408, in _prepare_and_execute
await adapt_connection._start_transaction()
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 716, in _start_transaction
self._handle_exception(error)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 684, in _handle_exception
raise error
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 714, in _start_transaction
await self._transaction.start()
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/asyncpg/transaction.py", line 138, in start
await self._connection.execute(query)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/asyncpg/connection.py", line 318, in execute
return await self._protocol.query(query, timeout)
File "asyncpg/protocol/protocol.pyx", line 338, in query
asyncio.exceptions.TimeoutError
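Since raising the connection timeout already helped, a hedged next step is to raise the database query timeout as well; both settings exist in Prefect 2 as far as I know, though the values below are just starting points to experiment with:

prefect config set PREFECT_ORION_DATABASE_TIMEOUT=30
prefect config set PREFECT_ORION_DATABASE_CONNECTION_TIMEOUT=30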
Ben Muller
11/01/2022, 6:09 AMECSTask that I have successfully deployed:
import sys

import prefect
from prefect import flow, task, get_run_logger
from utilities import AN_IMPORTED_MESSAGE
from prefect_aws.ecs import ECSTask

ecs_task_block = ECSTask.load("staging-test")

@task
def log_task(name):
    logger = get_run_logger()
    logger.info("Hello %s!", name)
    logger.info("Prefect Version = %s 🚀", prefect.__version__)
    logger.debug(AN_IMPORTED_MESSAGE)

@flow()
def log_flow(name: str):
    log_task(name)

if __name__ == "__main__":
    name = sys.argv[1]
    log_flow(name)
Our org uses 1.0 at present and we have never had to pass AWS credentials - we don't use credentials like this, as we usually use roles that you can assume. I believe this was the case in Prefect 1.0, if I remember correctly. All my agent has to do is assume a role (and it has access to whatever I need with the task role that I set up in deployment).
So with these blocks all requiring credentials, I am wondering whether they are optional and would be picked up from the default AWS environment variables if I leave them blank, or whether I will need to configure some type of user access for a Prefect machine user?
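On the first question, my understanding (hedged; this is how boto3 behaves generally rather than something I can confirm for every prefect-aws block) is that leaving credentials unset falls back to boto3's default chain, which includes env vars and the task/instance role the agent assumes:

import boto3

# With no explicit keys, boto3 walks its default credential chain:
# env vars -> shared config files -> container/instance role.
session = boto3.Session()
print(session.client("sts").get_caller_identity()["Arn"])  # shows the role in use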
Secondly, I am trying a deployment with storage, but it is coming up with an error:
from flows.log_flow import log_flow
from prefect.deployments import Deployment
from prefect.filesystems import S3

storage_block = S3(bucket_path="prefect-2-test")

deployment = Deployment.build_from_flow(
    flow=log_flow,
    name="log-simple",
    parameters={"name": "Marvin"},
    infra_overrides={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
    work_queue_name="staging",
    storage=S3.load("staging-test-block"),
)

if __name__ == "__main__":
    storage_block.save("staging-test-block", overwrite=True)
    deployment.apply()
ERROR:
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/aiobotocore/client.py", line 82, in create_client
    self._register_s3_control_events(
TypeError: ClientCreator._register_s3_control_events() takes 2 positional arguments but 6 were given
Any ideas to help me overcome this?
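That particular TypeError is usually a botocore/aiobotocore version mismatch rather than a Prefect bug; a hedged first step is to reinstall the S3 stack so pip resolves mutually compatible pins:

pip install --upgrade s3fs aiobotocore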
eddy davies
11/01/2022, 10:39 AMaws-iam-authenticator, any ideas?Michael Hadorn
11/01/2022, 10:58 AMtrigger=always_run, skip_on_upstream_skip=False, but currently we have to abort the full flow run, so this task will never be executed.