Deepanshu Aggarwal
12/06/2022, 1:52 PM
from prefect import task, flow
from prefect.deployments import run_deployment

@task()
def task_A(event):
    output = run_deployment(
        name='some deployment',
        parameters={'event': event}
    )
    if output.state_name != 'Completed':
        raise Exception("flow run not completed")
    return output
@flow()
def flow_B():
    a = [1, 2, 3, 4, 5, 6, 7]
    # method 1
    for i in a:
        task_A.submit(i)
    # method 2
    output = task_A.map(a)
    for tasks in output:
        tasks.wait()
    # do something when all the parallel executions have completed for task A
This is what my flow looks like, but in method 1 it runs the next iteration of task_A only when the previous iteration is complete, and in method 2 it doesn't wait for the tasks to complete.
Patrick Tan
12/06/2022, 3:24 PM
Marc Lipoff
12/06/2022, 3:57 PM
Alexandru Anghel
12/06/2022, 4:08 PM
distributed.scheduler.KilledWorker: Attempted to run task prometheus_to_gcs-02d33e7b-92a6-4fe8-8258-ba9efaa3d609 on 3 different workers, but all those workers died while running it.
What could be the problem here? Other smaller datasets are running fine.
I am running it with adaptive workers (min 4, max 10). I am not sure, though, whether I configured Dask correctly for what I am trying to do. Can you please have a look at my code in the thread?
I am able to run the same flow in Prefect 1.4.
Thanks!
Prem Viswanathan
12/06/2022, 4:10 PM
Bob Colner
12/06/2022, 4:54 PM
Joseph Loss
12/06/2022, 5:19 PM
Xavier Babu
12/06/2022, 11:07 PM
Zac Hooper
12/06/2022, 11:35 PM
@task
def prod_9e_etl_asx_anns():
    logger = get_run_logger()
    res = invoke_lambda("prod_9e_etl_asx_anns", {}, True)
    logger.info(res)
    return ""

@task
def prod_9e_etl_asx_ann_sentiment():
    logger = get_run_logger()
    res = invoke_lambda("prod_9e_etl_asx_ann_sentiment", {}, True)
    logger.info(res)
    return ""
@flow(task_runner=SequentialTaskRunner(), retries=2)
def perfom_asx_anns_etl():
    x = prod_9e_etl_asx_anns.submit(return_state=True)
    y = prod_9e_etl_asx_ann_sentiment.submit(wait_for=[x])

I need to make sure the tasks happen in sequential order. Any idea how I prevent this error?
Jarvis Stubblefield
12/07/2022, 12:26 AM
prefect agent -q tenzinga_django_prod
on my server … I'm using a systemd service to run it. It errors and lets me know that Orion (the server, not the agent) needs SQLite >= 3.24.0, but Amazon Linux 2 only has 3.7.17 as its latest version.
Slackbot
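Orion's requirement is on the SQLite C library that Python links against, which can be checked directly from the interpreter that will run the server:

```python
import sqlite3

# sqlite3.sqlite_version is the version of the SQLite C library the
# interpreter is linked against -- this is what Orion checks, not the
# version of the Python sqlite3 module
linked = tuple(int(part) for part in sqlite3.sqlite_version.split("."))
print("SQLite library:", sqlite3.sqlite_version)
print("meets the >= 3.24.0 requirement:", linked >= (3, 24, 0))
```

Common ways around the old library on Amazon Linux 2 include running Python from a distribution that bundles a newer SQLite (e.g. conda), compiling a newer SQLite yourself, or pointing Orion at a PostgreSQL database instead of SQLite.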
12/07/2022, 3:14 AM
Santhosh Solomon (Fluffy)
12/07/2022, 6:20 AM
Santhosh Solomon (Fluffy)
12/07/2022, 6:21 AM
陳柏翰
12/07/2022, 7:35 AM
Nic
12/07/2022, 11:03 AM
Arnoldas Bankauskas
12/07/2022, 12:17 PM
Liz McCutcheon
12/07/2022, 12:34 PM
João Coelho
12/07/2022, 1:33 PM
Zachary Loertscher
12/07/2022, 4:22 PM
pyodbc
However, I continue to get the error: Failed to load and execute flow run: ModuleNotFoundError("No module named 'pyodbc'")
I have…
• added pyodbc to requirements.txt, which is installed using our Dockerfile with RUN pip install -r requirements.txt
Info:
Prefect version: 1.2.4
Deployment: AWS CDK for AWS
Where does your docker container live?: an ECS Fargate deployment.
What OS runs on your docker container?: Amazon Linux 2
Any ideas? Is Prefect just not finding the package?
Khuyen Tran
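A quick probe, run inside the container with the same interpreter that executes flow runs, shows whether that interpreter can actually see the package (pyodbc below is just the module name being checked) — a frequent cause is pip installing into a different Python than the one the agent uses:

```python
import importlib.util
import sys

# which interpreter is executing, and can it import pyodbc?
print("interpreter:", sys.executable)
spec = importlib.util.find_spec("pyodbc")
if spec is None:
    print("pyodbc NOT importable here; pip likely installed into another Python")
else:
    print("pyodbc found at:", spec.origin)
```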
12/07/2022, 4:30 PM
Adam Roderick
12/07/2022, 5:08 PM
for doc in query:
, but it appears to load the entire result set into memory before enumerating the rows.
The better practice is to use
doc = query.fetchone()
while doc:
    # process doc here
    doc = query.fetchone()
However, this hangs immediately when run from within a task.
Has anyone else seen this behavior? Any idea why it might be behaving this way?
David Anderson
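For reference, the fetchone() pattern itself streams correctly outside Prefect; here it is in runnable form against an in-memory SQLite table (the table and driver are stand-ins for whatever query object is in use):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id INTEGER, body TEXT)")
conn.executemany(
    "INSERT INTO docs VALUES (?, ?)",
    [(i, f"doc-{i}") for i in range(5)],
)

cur = conn.execute("SELECT id, body FROM docs ORDER BY id")
seen = []
doc = cur.fetchone()
while doc is not None:
    seen.append(doc[1])   # process exactly one row at a time
    doc = cur.fetchone()  # rows are pulled lazily, not materialised up front
print(seen)  # ['doc-0', 'doc-1', 'doc-2', 'doc-3', 'doc-4']
```

If the identical loop hangs only inside a task, a common cause is the connection or cursor being created in a different thread or process than the one the task executes in; many database drivers are not safe across that boundary.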
12/07/2022, 5:09 PM
Task 'AirbyteConnectionTask': Exception encountered during task execution!
Traceback (most recent call last):
File "/home/ec2-user/.local/lib/python3.7/site-packages/requests/models.py", line 971, in json
return complexjson.loads(self.text, **kwargs)
File "/usr/lib64/python3.7/site-packages/simplejson/__init__.py", line 488, in loads
return _default_decoder.decode(s)
File "/usr/lib64/python3.7/site-packages/simplejson/decoder.py", line 374, in decode
obj, end = self.raw_decode(s)
File "/usr/lib64/python3.7/site-packages/simplejson/decoder.py", line 393, in raw_decode
return self.scan_once(s, idx=_w(s, idx).end())
simplejson.scanner.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/tasks/airbyte/airbyte.py", line 67, in _check_health_status
self.logger.debug("Health check response: %s", response.json())
File "/home/ec2-user/.local/lib/python3.7/site-packages/requests/models.py", line 975, in json
raise RequestsJSONDecodeError(e.msg, e.doc, e.pos)
requests.exceptions.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 884, in get_task_run_state
logger=self.logger,
File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/utilities/tasks.py", line 456, in method
return run_method(self, *args, **kwargs)
File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/tasks/airbyte/airbyte.py", line 333, in run
session = airbyte._establish_session()
File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/tasks/airbyte/airbyte.py", line 60, in _establish_session
if self._check_health_status(session):
File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/tasks/airbyte/airbyte.py", line 76, in _check_health_status
raise AirbyteServerNotHealthyException(e)
prefect.tasks.airbyte.airbyte.AirbyteServerNotHealthyException: Expecting value: line 1 column 1 (char 0)
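The root JSONDecodeError at line 1 column 1 (char 0) means the health endpoint replied with something that is not JSON — typically an empty body or an HTML error page from a proxy — which the Airbyte task then wraps as AirbyteServerNotHealthyException. Dumping the raw response usually identifies the culprit; a sketch with stdlib urllib (the URL is an assumption, substitute the configured Airbyte host/port):

```python
import json
import urllib.request

AIRBYTE_HEALTH_URL = "http://localhost:8000/api/v1/health"  # assumed host/port

def probe_health(url):
    """Fetch the health endpoint; show the raw body when it isn't JSON."""
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            body = resp.read().decode("utf-8", errors="replace")
    except OSError as exc:
        return f"request failed: {exc}"
    try:
        return json.loads(body)
    except json.JSONDecodeError:
        # an HTML error page or empty reply from a proxy lands here
        return f"non-JSON response: {body[:200]!r}"

print(probe_health(AIRBYTE_HEALTH_URL))
```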
Jarvis Stubblefield
12/07/2022, 6:01 PM
unhealthy
… I went back and ensured that my starting of the agent in production was using the correct queue. It appears to be spelled correctly and everything. However, the unhealthy bit has never gone away, and it doesn't have a last polled date.
Yoanis Gil
12/07/2022, 6:05 PM
Jared Robbins
12/07/2022, 6:14 PM
Slackbot
12/07/2022, 6:26 PM
Samuel Kohlleffel
12/07/2022, 6:29 PM
Tomas Moreno
12/07/2022, 6:34 PM
Braun Reyes
12/07/2022, 7:04 PM
Bradley Hurley
12/07/2022, 7:16 PM
StorageSchema. Is there a recommended approach to ensuring my custom StorageSchema is globally accessible in StorageSchema.type_schemas? As a workaround, we are calling StorageSchema.type_schemas["MyCustomStorageSchema"] = MyCustomStorageSchema in our CLI wrapper before creating agents so that the custom schema is accessible, but when we create child flows via Dask workers the custom storage is not set and the flow fails.
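One way to make that registration reach Dask workers as well is to run it at import time of a module every process imports, instead of only in the CLI wrapper — an illustrative sketch (package, module, and class names are hypothetical):

```python
# my_company/__init__.py -- imported by the CLI wrapper, agents,
# and Dask workers alike, so the registration runs everywhere
from prefect.serialization.storage import StorageSchema
from my_company.storage import MyCustomStorageSchema  # hypothetical module

# the same registration as the CLI-wrapper workaround, executed on import
StorageSchema.type_schemas["MyCustomStorageSchema"] = MyCustomStorageSchema
```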