Zachary Lee
02/03/2023, 8:37 PM
.apply() them. (We use S3 storage and run our flows via Kubernetes jobs, in images that have the flow repo installed as a Python package.)
This works fine, except that when we build the Deployments in end-user repos, the files containing the imported flows are not under the working directory we are building the Deployments from. The flow repo is installed as a Python package, so its files live under /opt/env/.../site-packages/..., which causes the code that sets the Deployment's entrypoint to fail with an error like:
<path_to_flow_file> is not in the subpath of <cwd> OR one path is relative and the other is absolute.
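The quoted error text matches the ValueError that pathlib's PurePath.relative_to raises in some Python 3 releases, which suggests (an inference, not confirmed against Prefect's source) that the entrypoint is computed as the flow file's path relative to the current working directory. The check is purely lexical, so a file under site-packages can never be "inside" the repo directory. A minimal sketch reproducing the failure; the entrypoint_relative_to helper and all paths are hypothetical:

```python
from pathlib import PurePosixPath
from typing import Optional


def entrypoint_relative_to(flow_file: str, cwd: str) -> Optional[str]:
    """Return flow_file relative to cwd, or None when it lies outside cwd.

    PurePosixPath.relative_to raises ValueError when the path is not under the
    base directory, including when one path is relative and the other absolute.
    """
    try:
        return str(PurePosixPath(flow_file).relative_to(PurePosixPath(cwd)))
    except ValueError:
        return None


# A flow file inside the repo resolves cleanly...
print(entrypoint_relative_to("/work/repo/flows/etl.py", "/work/repo"))
# ...but one installed under a site-packages directory does not.
print(entrypoint_relative_to("/usr/lib/python3/site-packages/my_flows/etl.py", "/work/repo"))
```

The first call prints flows/etl.py and the second prints None, mirroring the two situations described in the message.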
We are working around this by importing the flows and modifying their __module__ property to 'trick' Prefect into thinking that the flows come from the end-user repo. Is there something I'm missing that would allow us to do this in a less hacky way, or is this pattern just not something Prefect supports? Thanks!
Richard Alexander
02/08/2023, 4:05 PM
from prefect import flow, get_run_logger

@flow()
async def my_flow():
    logger = get_run_logger()
    # Start async process and pass logger

Will the logger also be async?
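Standard-library logging calls are ordinary synchronous function calls: calling logger.info() inside a coroutine does not make it async, and a typical handler runs inline, holding up the event loop for as long as the handler takes. If slow handler I/O is the concern, one generic stdlib pattern is to attach a QueueHandler to the logger and let a QueueListener thread run the real handlers. This is a sketch of that general pattern with a hypothetical in-memory ListHandler standing in for slow I/O; it does not describe how Prefect's run logger ships records.

```python
import asyncio
import logging
import logging.handlers
import queue
from typing import List


class ListHandler(logging.Handler):
    """Stand-in for a slow handler (file, network); here it just stores messages."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def worker(logger: logging.Logger, n: int) -> None:
    # The logging call only enqueues a record; the listener thread does the real work.
    logger.info("worker %d done", n)
    await asyncio.sleep(0)


async def main() -> ListHandler:
    log_queue: queue.Queue = queue.Queue()
    sink = ListHandler()
    listener = logging.handlers.QueueListener(log_queue, sink)

    logger = logging.getLogger("async_demo")  # hypothetical logger name
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.addHandler(logging.handlers.QueueHandler(log_queue))

    listener.start()
    try:
        await asyncio.gather(*(worker(logger, i) for i in range(3)))
    finally:
        listener.stop()  # processes any queued records, then joins the thread
    return sink


sink = asyncio.run(main())
print(sorted(sink.messages))
```

The coroutines never wait on handler I/O; they only pay the cost of putting a record on a thread-safe queue.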
As a bit of background, we have an async process that spawns multiple other async processes and will produce a good deal of logging that we need to capture with the Prefect logger. We want to make sure that the logging won't cause any blocking issues. Is passing a logger in this way safe for async code?
Jon
02/10/2023, 5:02 PM
Oluremi Akinwale
02/13/2023, 10:02 AM
Alireza
02/14/2023, 9:49 AM
Great Expectations is no longer supported in v2.0. Which data quality tools would you suggest integrating with Prefect?
Stefan
02/17/2023, 9:41 AM
def main(date: DateModel = None) -> None:
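The signature above defaults date to None. A minimal plain-Python sketch of resolving a missing date to today's date and turning it into a run name; it substitutes datetime.date for the poster's DateModel type, and the helper names and the main-YYYY-MM-DD format are hypothetical. How a computed string is attached to the flow run depends on the Prefect version, so that wiring is left out.

```python
import datetime
from typing import Optional


def resolve_run_date(date: Optional[datetime.date] = None) -> datetime.date:
    # Fall back to today's date when the caller does not pass one.
    return date if date is not None else datetime.date.today()


def build_run_name(date: Optional[datetime.date] = None) -> str:
    # Hypothetical naming scheme: "main-YYYY-MM-DD".
    return f"main-{resolve_run_date(date).isoformat()}"


print(build_run_name(datetime.date(2023, 2, 16)))  # main-2023-02-16
print(build_run_name())  # uses today's date
```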
How can I use the current date for flow_run_name? Unless I'm processing data for another day, I don't call the flow with any arguments.
Serina
02/17/2023, 4:46 PM
Kelvin DeCosta
02/21/2023, 12:05 PM
async execution of tasks via .map.
In the documentation for the .map function, it mentions that if the task being mapped is async, then the .map call must be awaited.
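As a generic asyncio analogy (not Prefect's .map implementation), mapping an async function over inputs produces coroutines, and the caller has to await the combined result. Here the hypothetical amap helper does that with asyncio.gather. For the type-checker complaint specifically, when a library's overloads do not describe the awaited form, wrapping the call in typing.cast or adding a targeted "# type: ignore" comment is a common stopgap; whether Prefect's annotations for .map have that gap in a given version is not something this sketch establishes.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def amap(fn: Callable[[T], Awaitable[R]], items: Iterable[T]) -> List[R]:
    # Start one coroutine per item and wait for all of them; results keep input order.
    return await asyncio.gather(*(fn(item) for item in items))


async def double(x: int) -> int:
    await asyncio.sleep(0)
    return 2 * x


async def main() -> List[int]:
    # Mapping an async function yields something awaitable, so the call is awaited.
    return await amap(double, [1, 2, 3])


print(asyncio.run(main()))  # [2, 4, 6]
```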
However, I keep getting an error from the type checker stating that there are no overloads of .map that can be awaited.
What could be done to fix this?
Stefan
02/23/2023, 8:19 PM
Alvaro Durán Tovar
02/27/2023, 10:45 AM
Austin Weisgrau
02/27/2023, 9:34 PM
Conrad Dobberstein
02/28/2023, 10:38 AM
my_flow takes an iterable of strings as its first parameter. When I call the flow with an iterator, the first element is missing inside the flow run. The same happens when I call the flow with a list and enable validate_parameters. Is this a bug, or am I missing something obvious? I'm on the latest version of Prefect, i.e. Prefect 2.8.3.
from collections.abc import Iterable

import prefect


@prefect.flow
def my_flow(texts: Iterable[str]) -> None:
    print(f"{type(texts)=}")
    print(f"{list(texts)=}")


if __name__ == "__main__":
    # type(texts)=<class 'list_iterator'>
    # list(texts)=['World', '!']
    my_flow.with_options(validate_parameters=True)(["Hello", "World", "!"])
    # type(texts)=<class 'list_iterator'>
    # list(texts)=['World', '!']
    my_flow.with_options(validate_parameters=False)(iter(["Hello", "World", "!"]))
    # type(texts)=<class 'list'>
    # list(texts)=['Hello', 'World', '!']
    my_flow.with_options(validate_parameters=False)(["Hello", "World", "!"])
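The pattern in the output (the first element disappears exactly when the flow receives a one-shot list_iterator) is consistent with something reading the iterator's first element before the flow body runs. That mechanism is an assumption, not confirmed against Prefect's or pydantic's source. This plain-Python sketch shows how peeking at an iterator loses an element and how itertools.chain puts it back; naive_peek and peek_first are hypothetical helpers. The poster's own third call shows that passing a list with validation disabled keeps every element.

```python
from itertools import chain
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def naive_peek(values: Iterable[T]) -> Tuple[T, Iterator[T]]:
    it = iter(values)  # for an iterator, iter() returns the same object
    first = next(it)  # consumes the first element for good
    return first, it


def peek_first(values: Iterable[T]) -> Tuple[T, Iterator[T]]:
    """Return the first element plus an iterator that still yields every element."""
    it = iter(values)
    first = next(it)
    # Put the consumed element back in front of the remaining ones.
    return first, chain([first], it)


_, lossy = naive_peek(iter(["Hello", "World", "!"]))
print(list(lossy))  # ['World', '!']

_, safe = peek_first(iter(["Hello", "World", "!"]))
print(list(safe))  # ['Hello', 'World', '!']
```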
Jafar A
03/01/2023, 8:01 PM
prefect deployment build src/main.py:ingest_everflow -n test-run-container -q test \
-sb s3/s3-everflow/ef_test_code -ib docker-container/de-container -o prefect-docker-deployment --apply
return self.sign(operation_name, request)
File "/usr/local/lib/python3.10/site-packages/botocore/signers.py", line 189, in sign
auth.add_auth(request)
File "/usr/local/lib/python3.10/site-packages/botocore/auth.py", line 418, in add_auth
raise NoCredentialsError()
botocore.exceptions.NoCredentialsError: Unable to locate credentials
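botocore raises NoCredentialsError when none of its credential providers finds anything on the machine making the call. A rough stdlib check of the two most common local sources, environment variables and the default shared files under ~/.aws; local_credential_sources is a hypothetical helper, and it deliberately ignores other providers (instance or container roles, SSO, custom file locations), so "False everywhere" is a hint rather than proof.

```python
import os
from pathlib import Path
from typing import Dict, Mapping, Optional


def local_credential_sources(
    env: Optional[Mapping[str, str]] = None,
    home: Optional[Path] = None,
) -> Dict[str, bool]:
    """Report which common local AWS credential sources appear to be present."""
    env = os.environ if env is None else env
    home = Path.home() if home is None else home
    return {
        # Both variables are needed for static key-based credentials.
        "env_vars": bool(env.get("AWS_ACCESS_KEY_ID")) and bool(env.get("AWS_SECRET_ACCESS_KEY")),
        # Default shared-file locations only.
        "credentials_file": (home / ".aws" / "credentials").is_file(),
        "config_file": (home / ".aws" / "config").is_file(),
    }


print(local_credential_sources())
```

Run it on the machine (or in the container) that executes the deployment build, since that is where the upload to the S3 block happens.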
Jafar A
03/02/2023, 8:33 PM
prefect deployment build flows/main.py:ingest_everflow \
-n prefect-k8s-example -q k8s \
-sb s3/s3-everflow/ef_testing/docker-k8s-image \
-ib kubernetes-job/prefect-k8s-example \
--apply
14:40:49.825 | INFO | prefect.agent - Submitting flow run '99f81791-abda-488f-802e-d949535cb351'
/Users/jafaramin_1/env/lib/python3.10/site-packages/prefect/agent.py:216: UserWarning: Block document has schema checksum sha256:1a553852c1bcc9cd95645917bdbaaae9e28cbcc1270ff5b43b45bbd87564f5cc which does not match the schema checksum for class 'KubernetesJob'. This indicates the schema has changed and this block may not load.
infrastructure_block = Block._from_block_document(infra_document)
14:40:50.538 | INFO | prefect.agent - Completed submission of flow run '99f81791-abda-488f-802e-d949535cb351'
14:40:50.546 | DEBUG | prefect.infrastructure.kubernetes-job - Job 'prefect-k8s-example-mwmgm': Starting watch for pod start...
14:40:50.555 | INFO | prefect.infrastructure.kubernetes-job - Job 'prefect-k8s-example-mwmgm': Pod has status 'Pending'.
14:40:54.827 | DEBUG | prefect.agent - Checking for flow runs...
14:40:59.925 | DEBUG | prefect.agent - Checking for flow runs...
14:42:50.560 | ERROR | prefect.infrastructure.kubernetes-job - Job 'prefect-k8s-example-mwmgm': Pod never started.
Kelvin DeCosta
03/06/2023, 8:41 AM
cache_result_in_memory=False seemed to help at first, but now, after about 32k tasks, the flow run crashes.
The crash seems to be caused by this error:
OSError: [Errno 28] No space left on device
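Setting cache_result_in_memory=False presumably means results are no longer held in RAM; if they are instead persisted to a local directory, tens of thousands of results could plausibly fill the disk, which would fit Errno 28. That connection is an assumption. A small stdlib sketch for checking how large a directory has grown and how much space its filesystem has left; dir_size_bytes and storage_report are hypothetical helpers, and the demo uses a throwaway temporary directory instead of a real result-storage path.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Dict


def dir_size_bytes(path: Path) -> int:
    # Sum the sizes of all regular files below `path`.
    return sum(p.stat().st_size for p in path.rglob("*") if p.is_file())


def storage_report(path: Path) -> Dict[str, int]:
    usage = shutil.disk_usage(path)  # total/used/free of the filesystem holding `path`
    return {
        "dir_bytes": dir_size_bytes(path),
        "fs_free_bytes": usage.free,
        "fs_total_bytes": usage.total,
    }


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "a.bin").write_bytes(b"x" * 100)
    (root / "sub").mkdir()
    (root / "sub" / "b.bin").write_bytes(b"y" * 50)
    report = storage_report(root)

print(report["dir_bytes"])  # 150
```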
Any help is appreciated!
Richard Alexander
03/07/2023, 3:15 PM
agents, work_pools and storage_blocks. I have several agents (each polling a different work pool) on the same server trying to poll jobs from the same S3 storage block, but only one of them is working. Can multiple agents/work pools be connected to the same storage block?
Daniel McNeil
03/08/2023, 11:12 PM
Jafar A
03/14/2023, 8:49 PM
raise ProfileNotFound(profile=profile_name)
botocore.exceptions.ProfileNotFound: The config profile (prefect) could not be found
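botocore raises ProfileNotFound when the requested profile name (here prefect, presumably requested via an AWS_PROFILE setting or a profile name configured somewhere) is not defined in the shared AWS files. In ~/.aws/config, named profiles use "[profile NAME]" headers and the default profile is "[default]"; in ~/.aws/credentials, headers are the bare profile names. A minimal sketch that lists the profiles such texts define; the helper names and sample text are hypothetical, and it parses strings rather than the real files to stay self-contained.

```python
import configparser
from typing import List


def profiles_in_config(text: str) -> List[str]:
    """List profile names from ~/.aws/config-style text."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    names = []
    for section in parser.sections():
        if section == "default":
            names.append("default")
        elif section.startswith("profile "):
            names.append(section[len("profile "):].strip())
    return names


def profiles_in_credentials(text: str) -> List[str]:
    """In ~/.aws/credentials-style text, section headers are the profile names."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return parser.sections()


sample_config = """
[default]
region = us-west-2

[profile prefect]
region = us-west-2
"""
print(profiles_in_config(sample_config))  # ['default', 'prefect']
```

If prefect is missing from both lists on the machine that raises the error, that would explain the traceback.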
Frederik Vanhevel
03/14/2023, 9:33 PM
davzucky
03/14/2023, 11:29 PM
davzucky
03/14/2023, 11:29 PM
Kelvin DeCosta
03/15/2023, 12:12 PM
.map) many tasks (10k-100k).
I'm using the SequentialTaskRunner and have noticed that the task runner keeps a _results dict of ALL task results. It even stores the results of tasks that are no longer "needed".
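A toy model of why resetting a private results dict is risky: anything that still holds a handle created before the reset (including code inside the library that you may not see) fails when it looks that handle up. ToyRunner and its methods are hypothetical and make no claim to mirror the SequentialTaskRunner's internals.

```python
import uuid
from typing import Any, Callable, Dict


class ToyRunner:
    """Hypothetical runner that keeps every result in a private dict, keyed by handle."""

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}

    def submit(self, fn: Callable[..., Any], *args: Any) -> str:
        key = str(uuid.uuid4())
        self._results[key] = fn(*args)
        return key  # acts like a future: a handle used to look the result up later

    def result(self, key: str) -> Any:
        return self._results[key]


runner = ToyRunner()
old = runner.submit(pow, 2, 10)
runner._results = {}  # drop everything believed to be "no longer needed"
new = runner.submit(pow, 3, 2)
print(runner.result(new))  # 9
try:
    runner.result(old)  # a handle created before the reset now fails
except KeyError:
    print("old result is gone")
```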
What harm do I cause by running something like get_run_context().task_runner._results = {} when I know that the previous task results are not needed?
Nelson Griffiths
03/15/2023, 5:24 PM
run_deployment(...). Plus, this approach would limit a flow to depending on only one upstream flow finishing.
I have a few ideas on how we could potentially do this, but has anyone here set up something like this before? It seems like something Prefect could be built to handle.
Jafar A
03/15/2023, 9:33 PM
crashed in the Prefect UI happening?
I'm running an ECS agent, and the UI error says:
Flow run infrastructure exited with non-zero status code 1.
17:26:06.079 | DEBUG | prefect.agent - Checking for cancelled flow runs...
17:26:07.931 | INFO | prefect.infrastructure.ecs-task - ECSTask 'electric-guan': Status is RUNNING.
17:26:09.893 | DEBUG | prefect.agent - Checking for scheduled flow runs...
17:26:12.938 | INFO | prefect.infrastructure.ecs-task - ECSTask 'electric-guan': Running command 'python -m prefect.engine' in container 'prefect' (880954911558.dkr.ecr.us-west-2.amazonaws.com/prefect-docker-creditfurnishing:latest)...
17:26:12.971 | INFO | prefect.infrastructure.ecs-task - ECSTask 'electric-guan': Streaming output from container 'prefect'...
17:26:13.023 | INFO | prefect.agent - Completed submission of flow run '57ccf19d-e50c-469e-9697-5a5b46011ba9'
17:26:13.059 | INFO | prefect.infrastructure.ecs-task - ECSTask 'electric-guan': Status is DEPROVISIONING.
exec /usr/bin/tini: exec format error
17:26:16.249 | DEBUG | prefect.agent - Checking for cancelled flow runs...
17:26:21.936 | DEBUG | prefect.agent - Checking for scheduled flow runs...
17:26:24.004 | DEBUG | prefect.agent - Checking for cancelled flow runs...
17:26:30.948 | INFO | prefect.infrastructure.ecs-task - ECSTask 'electric-guan': Status is STOPPED.
17:26:31.550 | WARNING | prefect.infrastructure.ecs-task - ECSTask 'electric-guan': Container 'prefect' exited with non-zero exit code 1.
17:26:31.708 | DEBUG | prefect.agent - Checking for cancelled flow runs...
17:26:31.726 | INFO | prefect.agent - Reported flow run '57ccf19d-e50c-469e-9697-5a5b46011ba9' as crashed: Flow run infrastructure exited with non-zero status code 1.
17:26:32.807 | DEBUG | prefect.agent - Checking for scheduled flow runs...
17:26:41.023 | DEBUG | prefect.agent - Checking for cancelled flow runs...
17:26:42.106 | DEBUG | prefect.agent - Checking for scheduled flow runs...
Bebeto Nyamwamu
03/16/2023, 11:23 AM
Nelson Griffiths
03/22/2023, 4:33 PM
prefect.flow-run.*. But could I also match on something like prefect.deployment.my-deployment? And is there a list somewhere of the potential values for prefect.resource.id that can be matched on?
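For illustration only, shell-style wildcard matching of resource IDs works like this with the stdlib fnmatch module. Whether Prefect's event matching uses exactly these semantics is an assumption, and the sample IDs are made up (real resource IDs may use UUIDs rather than names); matching_resources is a hypothetical helper.

```python
from fnmatch import fnmatchcase
from typing import Iterable, List


def matching_resources(pattern: str, resource_ids: Iterable[str]) -> List[str]:
    # "*" matches any run of characters; "." and "-" are literal characters.
    return [rid for rid in resource_ids if fnmatchcase(rid, pattern)]


ids = [
    "prefect.flow-run.1234",
    "prefect.deployment.my-deployment",
    "prefect.deployment.other",
]
print(matching_resources("prefect.flow-run.*", ids))  # ['prefect.flow-run.1234']
print(matching_resources("prefect.deployment.my-deployment", ids))  # ['prefect.deployment.my-deployment']
```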
• From Prefect's point of view, is a schedule a specific implementation of a trigger? For example, the event is the clock hitting 7 am, and then flows get started? Or are they treated differently in the backend?
• Is there any plan to tie triggers (automations) to deployments? I know they are a general tool and need to be used for things other than deployments (such as notifications). But for event-driven systems, also allowing them to be defined with deployments as an alternative to a schedule seems like it would be a really good thing. Just curious whether this is in the plans or whether they will always exist separately. Nothing make-or-break here; it would just be convenient to create a deployment with a trigger already set up.
• Is Prefect orchestration the right tool for highly event-driven systems? Or is Prefect's aim less about that, and should I be using something else to watch for events and kick off Prefect deployments via the API? I know there are ways to do this currently in Prefect, and it sounds like there will be some improvements to that soon. I am mostly curious whether this is a big focus for Prefect moving forward or more of a side use case.
YSF
03/24/2023, 4:44 AM
Charlie Henry
03/24/2023, 4:37 PM
John
03/30/2023, 4:56 PM
Max Jackson
04/04/2023, 6:54 PM
wait_for_flow_run from Prefect 1? I'd like to have a flow of flows that ends in a Failed state if one of its subflows fails, but I'm not sure how best to implement this in Prefect 2.
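A framework-agnostic sketch of the behaviour described: run each child, record which ones failed, then raise from the parent so that the parent itself ends up failed. The child callables stand in for subflow calls, and SubflowFailed and run_children are hypothetical names, not Prefect APIs. Collecting every failure before raising (rather than stopping at the first one) is a design choice.

```python
from typing import Callable, Dict, List


class SubflowFailed(Exception):
    """Raised by the parent when one or more children failed."""


def run_children(children: Dict[str, Callable[[], object]]) -> Dict[str, object]:
    results: Dict[str, object] = {}
    failures: List[str] = []
    for name, child in children.items():
        try:
            results[name] = child()
        except Exception as exc:  # record the failure and keep running the rest
            failures.append(f"{name}: {exc!r}")
    if failures:
        # Raising here is what marks the parent itself as failed.
        raise SubflowFailed("; ".join(failures))
    return results


def ok() -> int:
    return 1


def boom() -> int:
    raise ValueError("bad input")


print(run_children({"a": ok}))  # {'a': 1}
try:
    run_children({"a": ok, "b": boom})
except SubflowFailed as err:
    print(err)  # b: ValueError('bad input')
```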