Nash Taylor
09/11/2024, 11:39 PM
cnsmyth
09/17/2024, 12:51 AM
Does putting @task in front of a class function where self is a parameter throw off the parameter passing? As in:
class MyClass:
    def __init__(self):
        pass

    @task
    def my_func(self, arg1, arg2):
        pass

class_obj = MyClass()
class_obj.my_func('arg1', 'arg2')
Will this throw an error because my_func is expecting arg2?
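One common workaround, sketched here as a minimal guess at the intent (names reused from the snippet above): keep the task a plain module-level function so self never enters its parameters, and call it from the method.
from prefect import task

@task
def my_func_task(arg1, arg2):
    # plain function task; no self in the signature
    ...

class MyClass:
    def my_func(self, arg1, arg2):
        # self stays on the method; the task only ever sees arg1 and arg2
        return my_func_task(arg1, arg2)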
Nash Taylor
09/17/2024, 1:40 AM
Flow run infrastructure exited with non-zero status code:
Exited with non 0 code. (Error Code: 1)
This may be caused by attempting to run an image with a misspecified platform or architecture.
I've been following this guide (https://docs.prefect.io/3.0/deploy/infrastructure-examples/serverless) and the prefect work-pool create step went fine. What am I missing here?
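If the platform hint in that error is the cause (common when building on Apple Silicon for an amd64 serverless runtime), one hedged fix is to force the target platform when building the image; the tag below is a placeholder:
docker build --platform linux/amd64 -t my-flow-image:latest .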
Victor Rosario Núñez
09/18/2024, 8:27 AM
Bogdan Posa
09/19/2024, 8:05 AM
cnsmyth
09/19/2024, 7:34 PM
prefect-email.email_send_message does not send anything (for me, at least). It doesn't throw an error and the task runs successfully, but no emails are sent. Running email_server_credentials = EmailServerCredentials.load("prefect-email-credentials") yields the correct results. Anyone have suggestions on how to debug?
cnsmyth
09/19/2024, 7:39 PM
RuntimeWarning: coroutine 'run_task_async' was never awaited
  return fn(*args, **kwargs)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Probably related?
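That unawaited-coroutine warning suggests the task was invoked from an async context without await. A minimal sketch of a shape that avoids the pitfall, reusing the block name from above (subject, body, and address are placeholders):
from prefect import flow
from prefect_email import EmailServerCredentials, email_send_message

@flow
def notify():
    creds = EmailServerCredentials.load("prefect-email-credentials")
    # in a sync flow this task call runs to completion; from an async flow
    # it returns a coroutine that must be awaited, or no mail is ever sent
    email_send_message(
        email_server_credentials=creds,
        subject="test",  # placeholder
        msg="hello",  # placeholder
        email_to="someone@example.com",  # placeholder
    )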
cnsmyth
09/19/2024, 8:53 PM
For now I've fallen back to yagmail, but if anyone has input, I would still love to hear it!
Jainish Savalia
09/20/2024, 12:15 PM
Nicolay
09/20/2024, 3:27 PM
Daniil
09/21/2024, 4:42 PM
Nikolay Tsvetanov
09/25/2024, 2:03 PM
On Prefect 2.x I used build_from_flow to deploy a flow by passing entrypoint and path:
build_deployment = await Deployment.build_from_flow(
    flow=hello_world_flow,
    name=name,
    ...
    entrypoint=entrypoint,
    path=path,
)
I'm starting now on 3.0, but build_from_flow is missing:
prefect.exceptions.PrefectImportError: `prefect.deployments:Deployment` has been removed. Use `flow.serve()`, `flow.deploy()`, or `prefect deploy` instead.
The docs mention these args, but it's unclear and there are no examples.
Does someone know how to pass entrypoint and path when creating a deployment from a flow?
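For reference, a minimal sketch of the 3.x equivalent, with repo and names as placeholders: the old path becomes the source passed to flow.from_source, and entrypoint keeps its "<file>:<function>" form.
from prefect import flow

remote_flow = flow.from_source(
    source="https://github.com/org/repo",  # placeholder; a local path also works
    entrypoint="flows/hello.py:hello_world_flow",  # "<file>:<function>"
)
remote_flow.deploy(name="my-deployment", work_pool_name="my-pool")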
Mark
09/26/2024, 2:35 PM
ricardo
09/26/2024, 5:49 PM
Brock
09/27/2024, 5:31 PM
Maroun Khriesh
09/29/2024, 9:04 AM
@flow(log_prints=True, task_runner=ConcurrentTaskRunner)
def sample_flow(sample_index) -> None:
    ...

def samples_parallel_executer(sample_indices):
    for sample_index in sample_indices:  # TODO: Add parallelism
        sample_flow(sample_index)
However, it wasn't obvious to me how to parallelize flow execution in Prefect. Any idea what the best practice is?
Thanks in advance for your help!
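One documented approach, sketched under the assumption that the flows can be made async: await the subflow calls concurrently with asyncio.gather.
import asyncio
from prefect import flow

@flow(log_prints=True)
async def sample_flow(sample_index: int) -> None:
    print(f"processing {sample_index}")

@flow
async def samples_parallel_executer(sample_indices: list[int]) -> None:
    # each call returns a coroutine; gather runs the subflows concurrently
    await asyncio.gather(*(sample_flow(i) for i in sample_indices))

if __name__ == "__main__":
    asyncio.run(samples_parallel_executer([1, 2, 3]))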
Nikolay Tsvetanov
09/30/2024, 11:29 AM
Demian Licht
09/30/2024, 1:44 PM
Дмитро Булах
10/02/2024, 1:22 PM
Is there a way to switch profiles from code rather than via prefect profile use name? Ideally something like:
set_profile('name_1')
run_my_flow()
set_active_profile_back()
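A minimal sketch of one way to approximate this, assuming the profiles mainly differ in settings such as the API URL: temporary_settings applies overrides inside a with block and restores the previous values on exit.
from prefect.settings import PREFECT_API_URL, temporary_settings

with temporary_settings(updates={PREFECT_API_URL: "http://127.0.0.1:4200/api"}):
    run_my_flow()  # hypothetical flow call from the question
# the previous settings are back in effect here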
Wolfgang Wenzel
10/03/2024, 9:36 PM
Mark
10/04/2024, 4:16 PM
Michael Bøcker-Larsen
10/07/2024, 8:05 AM
KeyError: "No class found for dispatch key 'github-repository' in registry for type 'Block'."
The same happens if the worker is a process (I think the reason there is that the process is not running within my virtual env), but not when running the flow outside a worker.
from prefect import flow
from prefect_github.repository import GitHubRepository

flow.from_source(
    source=GitHubRepository.load("app-backend-data"),
    entrypoint="flows/provider/process_provider.py:process_provider",
).deploy(name="process-provider", work_pool_name="docker-worker")
My worker is started like this:
prefect worker start --pool "docker-worker" --type docker
So my source is code from GitHub and the worker is Docker, but I haven't specified an image on the deployment (should I?)
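For reference, a hedged sketch of pinning an image on the deployment, assuming the KeyError means prefect-github is missing from the default runtime image (the tag is a placeholder):
flow.from_source(
    source=GitHubRepository.load("app-backend-data"),
    entrypoint="flows/provider/process_provider.py:process_provider",
).deploy(
    name="process-provider",
    work_pool_name="docker-worker",
    # placeholder tag; the image must have prefect_github installed so the
    # Block registry can resolve the 'github-repository' dispatch key
    image="my-registry/flows-image:latest",
    build=False,  # skip building if the image already exists
)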
An Vu Trong
10/07/2024, 2:08 PM
import asyncio
from prefect import flow
from prefect.logging import get_logger
from src.db.mongodb_connection import (
    mongodb_fintech_conn,
    mongodb_staging_conn,
)
from src.utils.http_client import WifeedAPIClient, JsonFetcher
from src.workflows.wifeed_data_tasks import DataTask

class DataPipeline:
    def __init__(self):
        self.logger = get_logger()
        self.conn = mongodb_staging_conn
        self.client = WifeedAPIClient(json_file="json/wifeed_urls.json")
        self.fetcher = JsonFetcher(self.client)
        self.data_task = DataTask(self.client, self.fetcher)

    @flow
    async def run_staging(self, loaidn: str = "", san: str = ""):
        await self.data_task.fetch_raw_stock_list(self.conn, loaidn, san)

if __name__ == "__main__":
    pipeline = DataPipeline()
    pipeline.run_staging.deploy(
        name="my-deployment",
        work_pool_name="docker-wp",
        image="my-docker-image:dev",
        push=False,
    )
When I run:
prefect deployment run 'run-staging/my-deployment'
it says:
Error creating flow run: Validation failed. Failure reason: 'self' is a required property
The worker is the docker type. How should I fix this? Thank you
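A minimal sketch of one workaround, assuming the pipeline can be constructed inside the flow rather than holding the flow as a method: a module-level flow has no self parameter, so the deployment's parameter schema stays clean.
from prefect import flow

@flow
async def run_staging(loaidn: str = "", san: str = ""):
    pipeline = DataPipeline()  # class from the snippet above
    await pipeline.data_task.fetch_raw_stock_list(pipeline.conn, loaidn, san)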
David Michael Gang
10/07/2024, 6:11 PM
Paweł Biernat
10/08/2024, 8:26 PM
I have two tasks served via the task.serve() functionality, let's say task_a and task_b. From what I gathered from the docs, to execute a task I have to
from module_a import task_a
result_a = task_a.delay(1)
and the same with task_b. Now, if I want to run an equivalent of result = task_b(task_a), I'd have to import both tasks and run
from module_a import task_a
from module_b import task_b

future_a = task_a.delay(1)
future_a.wait()
result_a = future_a.result()  # not sure about the wait()/result() part
future_b = task_b.delay(result_a)
future_b.wait()
result_b = future_b.result()
But what if I can't import both modules (e.g. due to dependency conflicts, or different runtime environments)? What's the intended pattern for connecting two relatively independent tasks?
With flows this is easy, because I can write a parent flow that internally runs run_deployment("flow_a"), then retrieves the result and passes it to run_deployment("flow_b"). The only import I need is prefect itself; the parent flow doesn't care about the implementation details of the subflows once they are served/deployed.
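For contrast, a sketch of that parent-flow pattern with deployment names as placeholders: run_deployment waits for the child run by default and returns a FlowRun whose final state carries the result.
from prefect import flow
from prefect.deployments import run_deployment

@flow
def parent():
    run_a = run_deployment("flow-a/deployment-a", parameters={"x": 1})
    result_a = run_a.state.result()  # assumes flow_a persists its result
    run_deployment("flow-b/deployment-b", parameters={"x": result_a})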
Isaac
10/08/2024, 10:17 PM
from typing import Any

from prefect import task
from prefect.context import TaskRunContext
from prefect.tasks import exponential_backoff

def cache_results_within_flow_run(
    context: TaskRunContext, parameters: dict[str, Any]
) -> str:
    """Caches a task result within the context of the flow it is run in."""
    return f"{context.task_run.flow_run_id}:{context.task_run.task_key}"

@task(
    name="example",
    tags=["pipelines"],
    version=get_version(),
    retries=2,
    retry_delay_seconds=exponential_backoff(backoff_factor=60),
    retry_jitter_factor=0.5,
    on_failure=[alert_slack_on_task_failure],
    cache_key_fn=cache_results_within_flow_run,
)
def trademark_etl() -> None:
    """Task for running the earnings calls ETL Prefect deployment."""
    deployment_name = "example-flow/example-deployment"
    run_prefect_deployment_check_successful(deployment_name=deployment_name)
We have been overhauling our orchestration and aren't seeing the expected behavior for caching. Most likely we are doing something incorrectly, but we're not sure what. Our goal is to cache task results in the context of the flow they were run in, so that if the flow fails due to any of its tasks failing, we can retry the flow and only the tasks that have not yet run successfully (in the flow being retried) will run. I implemented a caching function that attempts to do this; however, this morning when one of our tasks failed and I went to retry the flow, each task started running as normal, without regard to having already completed in the same flow. Could it be that this is happening because we are not returning anything from our tasks?
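If the hunch about return values is right, one hedged thing to try: cache hits need a persisted result to restore, so persistence can be switched on explicitly (this sketch reuses the decorator arguments from the snippet above).
@task(
    name="example",
    cache_key_fn=cache_results_within_flow_run,
    persist_result=True,  # a cache hit restores the stored result
)
def trademark_etl() -> None:
    ...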
An Vu Trong
10/09/2024, 5:46 AM
from prefect import flow
from src.db.mongodb_connection import (
    mongodb_fintech_conn,
    mongodb_staging_conn,
)
from src.services.data_pipeline import DataPipeline

pipeline = DataPipeline(
    staging_connection=mongodb_staging_conn,
    fintechdb_connection=mongodb_fintech_conn,
)

@flow
async def run_staging():
    await pipeline.raw_staging()
and this is my prefect.yaml:
build:
  - prefect_docker.deployments.steps.build_docker_image:
      id: build_image
      requires: prefect-docker>=0.3.1
      image_name: prefectdock
      tag: dev
      dockerfile: auto

# pull section allows you to provide instructions for cloning this project in remote locations
pull:
  - prefect.deployments.steps.set_working_directory:
      directory: /opt/prefect/fintech_api

# the deployments section allows you to provide configuration for deploying flows
deployments:
  - name: raw_storage
    tags: [pipeline]
    description: Fetch and store data from Wifeed API to MongoDB
    schedule:
      - interval: 30
    flow_name: run_staging
    entrypoint: src/services/flow_deployment.py:run_staging
    parameters: {}
    work_pool:
      name: docker-wp
      work_queue_name: primary-queue
      job_variables:
        image: '{{ build_image.image }}'
        # run settings intended to remove containers after each run
        run:
          remove_containers: true
The deployment log is okay:
prefect deploy --all
12:43:49.953 | INFO | Task run '_load_wifeed_urls' - Created task run '_load_wifeed_urls' for task '_load_wifeed_urls'
12:43:49.975 | INFO | Task run '_load_wifeed_urls' - Finished in state Completed()
? Would you like to configure schedules for this deployment? [y/n] (y): n
Running deployment build steps...
> Running build_docker_image step...
Step 1/4 : FROM prefecthq/prefect:3.0.4-python3.12
---> 23f4d5c00a91
Step 2/4 : COPY . /opt/prefect/fintech_api/
---> 2fea9c1149ee
Step 3/4 : WORKDIR /opt/prefect/fintech_api/
---> Running in 0f2eb12d6af8
---> c3ac3b50b5ea
Step 4/4 : LABEL io.prefect.version=3.0.4
---> Running in 7c1849b0f268
---> f71d6cdb75a1
Successfully built f71d6cdb75a1
Deployment 'run-staging/raw_storage' successfully created with id '7c1f3b71-4338-428b-8c31-6be0d6dd1b67'.
View Deployment in UI: http://127.0.0.1:4200/deployments/deployment/7c1f3b71-4338-428b-8c31-6be0d6dd1b67
? Would you like to save configuration for this deployment for faster deployments in the future? [y/n]: n
To execute flow runs from this deployment, start a worker in a separate terminal that pulls work from the 'docker-wp' work pool:
$ prefect worker start --pool 'docker-wp'
To schedule a run for this deployment, use the following command:
$ prefect deployment run 'run-staging/raw_storage'
But the run fails with this error:
2024-10-09 12:44:50 +-+---------------- 1 ----------------
2024-10-09 12:44:50 | Traceback (most recent call last):
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpx/_transports/default.py", line 72, in map_httpcore_exceptions
2024-10-09 12:44:50 | yield
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpx/_transports/default.py", line 377, in handle_async_request
2024-10-09 12:44:50 | resp = await self._pool.handle_async_request(req)
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpcore/_async/connection_pool.py", line 216, in handle_async_request
2024-10-09 12:44:50 | raise exc from None
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpcore/_async/connection_pool.py", line 196, in handle_async_request
2024-10-09 12:44:50 | response = await connection.handle_async_request(
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpcore/_async/connection.py", line 99, in handle_async_request
2024-10-09 12:44:50 | raise exc
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpcore/_async/connection.py", line 76, in handle_async_request
2024-10-09 12:44:50 | stream = await self._connect(request)
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpcore/_async/connection.py", line 122, in _connect
2024-10-09 12:44:50 | stream = await self._network_backend.connect_tcp(**kwargs)
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpcore/_backends/auto.py", line 30, in connect_tcp
2024-10-09 12:44:50 | return await self._backend.connect_tcp(
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpcore/_backends/anyio.py", line 115, in connect_tcp
2024-10-09 12:44:50 | with map_exceptions(exc_map):
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/contextlib.py", line 158, in __exit__
2024-10-09 12:44:50 | self.gen.throw(value)
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
2024-10-09 12:44:50 | raise to_exc(exc) from exc
2024-10-09 12:44:50 | httpcore.ConnectError: All connection attempts failed
2024-10-09 12:44:50 |
2024-10-09 12:44:50 | The above exception was the direct cause of the following exception:
2024-10-09 12:44:50 |
2024-10-09 12:44:50 | Traceback (most recent call last):
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 495, in execute_flow_run
2024-10-09 12:44:50 | flow_run = await self._client.read_flow_run(flow_run_id)
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/prefect/client/orchestration.py", line 2033, in read_flow_run
2024-10-09 12:44:50 | response = await self._client.get(f"/flow_runs/{flow_run_id}")
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 1814, in get
2024-10-09 12:44:50 | return await self.request(
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 1585, in request
2024-10-09 12:44:50 | return await self.send(request, auth=auth, follow_redirects=follow_redirects)
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/prefect/client/base.py", line 330, in send
2024-10-09 12:44:50 | response = await self._send_with_retry(
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/prefect/client/base.py", line 254, in _send_with_retry
2024-10-09 12:44:50 | response = await send(request, *send_args, **send_kwargs)
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 1674, in send
2024-10-09 12:44:50 | response = await self._send_handling_auth(
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 1702, in _send_handling_auth
2024-10-09 12:44:50 | response = await self._send_handling_redirects(
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 1739, in _send_handling_redirects
2024-10-09 12:44:50 | response = await self._send_single_request(request)
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 1776, in _send_single_request
2024-10-09 12:44:50 | response = await transport.handle_async_request(request)
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpx/_transports/default.py", line 376, in handle_async_request
2024-10-09 12:44:50 | with map_httpcore_exceptions():
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/contextlib.py", line 158, in __exit__
2024-10-09 12:44:50 | self.gen.throw(value)
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpx/_transports/default.py", line 89, in map_httpcore_exceptions
2024-10-09 12:44:50 | raise mapped_exc(message) from exc
2024-10-09 12:44:50 | httpx.ConnectError: All connection attempts failed
2024-10-09 12:44:50 +------------------------------------
2024-10-09 12:44:50 An exception occurred.
In the Prefect log: Reported flow run '53db904b-e99d-4d05-8e61-cba25875b257' as crashed: Flow run infrastructure exited with non-zero status code 1.
---
Also, the containers generated by each run are not removed automatically. Thank you if you can help me
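A hedged observation on the trace: the deployment URL above points at 127.0.0.1, which inside a container refers to the container itself, so the flow run may simply be unable to reach the Prefect API. One sketch of pointing the container at the host via an env job variable (host.docker.internal works on Docker Desktop; on Linux, use the host's reachable address):
work_pool:
  name: docker-wp
  job_variables:
    image: '{{ build_image.image }}'
    env:
      PREFECT_API_URL: http://host.docker.internal:4200/api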
Nikolay Tsvetanov
10/09/2024, 8:10 AM
root@90cbbd619a73:/app# prefect config set PREFECT_LOGGING_LOGGERS_PREFECT_FLOW_RUNS_LEVEL="ERROR"
Unknown setting name 'PREFECT_LOGGING_LOGGERS_PREFECT_FLOW_RUNS_LEVEL'.
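A hedged note: that name is not a registered Prefect setting, but the logging configuration reads matching PREFECT_LOGGING_* environment variables, so exporting it directly may still work:
export PREFECT_LOGGING_LOGGERS_PREFECT_FLOW_RUNS_LEVEL=ERROR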
Bianca Hoch
10/09/2024, 12:39 PM
Bianca Hoch
10/09/2024, 12:39 PM