Oscar Björhn
11/01/2022, 12:53 PMPatrick Alves
11/01/2022, 1:45 PM❯ PREFECT_API_URL=<https://prefect.xxx.xxx/api> prefect deployment ls
I am getting:
File "/home/patrick/miniconda3/envs/prefect/lib/python3.8/ssl.py", line 944, in do_handshake
self._sslobj.do_handshake()
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1131)
An exception occurred.
• I've tried set envs to disable certificate verification: PYTHONHTTPSVERIFY=false
• Tried to add the server certificate (*.xxx.xxx.crt) on the server copying the CRT file /usr/local/share/ca-certificates/
and update the CA store: sudo update-ca-certificates
Nothing works.
Any tip to solve this?Roger Webb
11/01/2022, 2:01 PMKalise Richmond
11/01/2022, 3:03 PMAusten Bouza
11/01/2022, 4:00 PMNoam Banay
11/01/2022, 4:26 PMJavier Ruere
11/01/2022, 5:46 PMJavier Ruere
11/01/2022, 5:47 PM<http://anyio.to|anyio.to>_thread
internally.
I'm using the ConcurrentTaskRunner.
The flow is ran like this:
anyio.run(
generate_report,
datetime.date.fromisoformat(run_date),
backend_options={"debug": True, "use_uvloop": True},
)
The problem I' having is that some Tasks finish but they remain in state Running forever. There's no output I could see to understand the problem.
Help, please.merlin
11/01/2022, 6:02 PMAwaitingRetry
?
@task
# some code, oh found out a blocker I'd like to wait for
time_2_hours_from_now= .... some date function...
if function_is_blocked:
return AwaitingRetry(scheduled_time=time_2_hours_from_now
Jon
11/01/2022, 6:43 PMwith prefect.Flow(
name="SOME_DYNAMIC_NAME" # instead of hardcoding this, i want to pass a value when registering / calling it
Carlo
11/01/2022, 6:48 PMVadym Dytyniak
11/01/2022, 7:28 PMJon
11/01/2022, 8:31 PMmerlin
11/01/2022, 9:04 PM@task
def fail_task():
return Failed()
@flow
def fail_flow_i_hope():
fail_task()
fail_flow_i_hope()
11:42:32.292 | DEBUG | Task run 'always_fail_task-d83fd752-0' - Beginning execution...
11:42:32.302 | INFO | Task run 'always_fail_task-d83fd752-0' - Finished in state Completed()
...
11:42:32.313 | INFO | Flow run 'ludicrous-lemur' - Finished in state Completed('All states completed.')
I'd like a task to fail, and then do retries (or not) but end the steps in the flow, with the flow also failed.Jon
11/01/2022, 9:53 PMTraceback (most recent call last):
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 569, in get_flow_run_state
executors.prepare_upstream_states_for_mapping(
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/utilities/executors.py", line 682, in prepare_upstream_states_for_mapping
value = upstream_state.result[i]
KeyError: 0
└── 17:51:25 | ERROR | Unexpected error occured in FlowRunner: KeyError(0)
any help what this error means?Aaron
11/01/2022, 10:01 PMAhmed Ezzat
11/01/2022, 10:37 PMBen Muller
11/02/2022, 2:20 AMprefect-aws
deps installed.
I have an infrastructure ECSTask
block that pulls a custom image:
FROM prefecthq/prefect:2.6.5-python3.9
RUN pip install prefect-aws s3fs==2022.5.0
This installs everything I need to run a demo flow that is:
import prefect
from prefect import flow, task, get_run_logger
from utilities import AN_IMPORTED_MESSAGE
from prefect_aws.ecs import ECSTask
ecs_task_block = ECSTask.load("staging-test")
@task
def log_task():
logger = get_run_logger()
<http://logger.info|logger.info>("Hello %s!", "BEN")
<http://logger.info|logger.info>("Prefect Version = %s 🚀", prefect.__version__)
logger.debug(AN_IMPORTED_MESSAGE)
@flow
def my_test_flow(name: str):
log_task(name)
if __name__ == "__main__":
my_test_flow()
This flow is using S3 Block Storage, now when I trigger this flow from the cloud UI the task is generated in my cluster, it just fails with a ModuleNotFoundError: No module named 'prefect_aws.ecs'
More details in 🧵iKeepo w
11/02/2022, 6:36 AMBen Muller
11/02/2022, 8:06 AMDenys Volokh
11/02/2022, 8:17 AMwonsun
11/02/2022, 8:36 AMFlowRunTask
to StartFlowRun
(reference link) among the methods written in that article to create the same situation. Below attached scripts, it is the case that two flows(flow_waveforms and flow_pipeline) that are executed well have already been created, and parent_flow is created to sequentially execute those flows.
# parent_flow.py
import prefect
from prefect.tasks.prefect.flow_run import StartFlowRun
with Flow('mother_flow') as flow:
first_job = StartFlowRun(flow_name='flow_waveforms',
project_name='InterFlow_Dependencies',
wait=True)
second_job = StartFlowRun(flow_name='flow_pipeline',
project_name='InterFlow_Dependencies',
wait=True)
first_job.set_downstream(second_job)
if __name__ == '__main__':
flow.register("InterFlow_Dependencies")
# flow_waveforms.py
import prefect
from prefect import Flow, task
@task
def task1():
something
return one, two
@task
def task2():
something
with Flow('flow_waveforms') as flow:
var1, var2 = task1()
finish = task2()
flow.run_config = UniversalRun(env={'PREFECT__CLOUD__HEARTBEAT_MODE':'thread'})
flow.register('InterFlow_Dependencies')
# flow_pipeline.py
import prefect
from prefect import Flow, task
@task
def task3():
something
with Flow('flow_pipelines') as flow:
task3()
flow.register('InterFlow_Dependencies')
If i excuted mother_flow.py in this case, prefect responds following error.(image)
Any idea where I went wrong? Or could you tell me if there is an easy way to define dependencies between flows other than this way?
Thx.pk13055
11/02/2022, 10:09 AMorion
+ agent
container setup locally, and everything is working as expected when I try to run the flows manually. I was able to successfully create and apply a deployment as well (from within the cli
container). However, when trying to run the deployment I am faced with a File not found error
.
Here's the traceback:
Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "<frozen importlib._bootstrap_external>", line 879, in exec_module
File "<frozen importlib._bootstrap_external>", line 1016, in get_code
File "<frozen importlib._bootstrap_external>", line 1073, in get_data
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmp_zx6qpglprefect/flows/fetch_etfs.py'
My docker-compose
is as follows:
version: "3.9"
services:
db:
image: timescale/timescaledb:latest-pg14
volumes:
- $PWD/data/db:/var/lib/postgresql/data
- $PWD/config/db:/docker-entrypoint-initdb.d/
healthcheck:
test: [ "CMD-SHELL", "pg_isready" ]
interval: 10s
timeout: 5s
retries: 5
environment:
- POSTGRES_DB=$POSTGRES_DB
- POSTGRES_USER=$POSTGRES_USER
- POSTGRES_PASSWORD=$POSTGRES_PASSWORD
networks:
- db_network
orion:
image: prefecthq/prefect:2.6.3-python3.10
restart: always
volumes:
- $PWD/prefect:/root/.prefect
entrypoint: [ "prefect", "orion", "start" ]
environment:
- PREFECT_ORION_API_HOST=0.0.0.0
- PREFECT_ORION_DATABASE_CONNECTION_URL=$PREFECT_DB_URL
ports:
- 4200:4200
depends_on:
- db
networks:
- prefect_network
- db_network
agent:
build: ./prefect
restart: always
entrypoint: [ "prefect", "agent", "start", "-q", "main_queue" ]
environment:
- PREFECT_API_URL=<http://orion:4200/api>
networks:
- prefect_network
- db_network
cli:
build: ./prefect
entrypoint: "bash"
working_dir: "/root/flows"
volumes:
- "$PWD/prefect:/root"
environment:
- PREFECT_API_URL=<http://orion:4200/api>
networks:
- prefect_network
- db_network
networks:
db_network:
web_network:
prefect_network:
To create and apply the deployment(s):
> docker compose run cli
$ prefect deployment create ./flows/fetch_etf.py:flow_name --name flow_name
$ prefect deployment apply deployment.yaml
Anna Geller
John Kang
11/02/2022, 5:18 PMJiri Klein
11/02/2022, 5:52 PMConor Casey
11/02/2022, 6:25 PMAmir
11/02/2022, 6:45 PMJon
11/02/2022, 7:42 PMJon Ruhnke
11/02/2022, 8:46 PM