Joshua Grant
12/29/2022, 5:18 PM
Michał
12/29/2022, 7:20 PM
from typing import Any, Dict, List

from prefect import flow, task

@task
def get_pages() -> List[int]:
    # Get pages for subflows
    ...

@task
def extract(page: int) -> List[Dict[str, Any]]:
    ...

@task
def load(records: List[Dict[str, Any]]):
    ...

@flow
def subflow_etl(page: int):
    """
    Extracts data from a single page and inserts it into the DB
    """
    data = extract(page)
    load(data)

@flow
def main_flow():
    pages = get_pages()  # returns [1, 2, 3, 4, ..., 200, 201]
    # How do I now start 201 subflow_etl runs?
    # I have several agents which would like to consume subflow_etl
    # and process it on several machines.
    # Pseudocode
    for page in pages:
        subflow_etl(page)

if __name__ == "__main__":
    main_flow()
Paco Ibañez
12/29/2022, 8:18 PM
run_deployment("transformation_1", ...)  # returns a df result that is persisted to Azure storage
run_deployment("transformation_2", ...) # needs the dataframe returned by `transformation_1` as input
kiran
12/30/2022, 12:20 AM
Naila Chennit
12/30/2022, 9:38 AM
Joshua Greenhalgh
12/30/2022, 9:59 AM
None
even though the previous task producing that input as its output (which ran before the flow seems to have started again) ran successfully. I wonder if I am running into some problems with checkpointing and not using Results? I would really like the flow to just fail rather than trying to restart itself.
Naila Chennit
12/30/2022, 10:01 AM
Wait N seconds for the task to complete
>>> @flow
>>> def my_flow():
>>>     future = my_task()
>>>     final_state = future.wait(0.1)
>>>     if final_state:
>>>         ...  # Task done
>>>     else:
>>>         ...  # Task not done yet => Cancel task
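As a rough analogy to the wait-with-timeout idea above, here is a minimal sketch using only the standard library's `concurrent.futures`. It is not Prefect's future API; `slow_task` and `wait_for` are hypothetical names chosen for illustration.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
from typing import Optional


def slow_task(duration: float) -> str:
    """Hypothetical stand-in for a task: sleeps, then returns a marker."""
    time.sleep(duration)
    return "done"


def wait_for(future: Future, timeout: float) -> Optional[str]:
    """Return the result if it arrives within `timeout` seconds, else None."""
    try:
        return future.result(timeout=timeout)
    except FutureTimeout:
        return None


with ThreadPoolExecutor(max_workers=2) as pool:
    # Finishes well inside its timeout, so the result is returned.
    quick = wait_for(pool.submit(slow_task, 0.01), timeout=2.0)
    # Still running when the timeout expires, so None is returned.
    late = wait_for(pool.submit(slow_task, 0.5), timeout=0.05)
```

In this standard-library version, `Future.cancel()` only succeeds for work that has not started yet, so a timeout tells you the work is unfinished but does not stop it.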
Edmondo Porcu
12/30/2022, 4:36 PM
YD
12/30/2022, 4:41 PM
prefect deployment build <file name>.py:r<flow name> -q local --name <deployment name> --path ~/<deployment path> --skip-upload
in the YAML file, the entrypoint: uses a relative path, which does not work when running the deployment from the cloud dashboard.
When I change it to a full path, it works, but I think that should not be the behavior, particularly since the entrypoint is below the ### DO NOT EDIT BELOW THIS LINE
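To illustrate why a relative entrypoint can behave differently depending on where it is resolved, here is a minimal sketch. It is not Prefect's own resolution logic; `resolve_entrypoint` and every path below are hypothetical, and the only assumption taken from the message is the `file.py:flow_name` shape of the entrypoint.

```python
from pathlib import PurePosixPath


def resolve_entrypoint(entrypoint: str, working_dir: str) -> str:
    """Resolve the file part of 'file.py:flow_name' against working_dir."""
    file_part, _, flow_name = entrypoint.partition(":")
    path = PurePosixPath(file_part)
    if not path.is_absolute():
        # A relative path only has meaning relative to some directory.
        path = PurePosixPath(working_dir) / path
    return f"{path}:{flow_name}"


# Hypothetical directories: where the deployment was built vs. where it runs.
built_from = resolve_entrypoint("flows/etl.py:main_flow", "/home/user/project")
run_from = resolve_entrypoint("flows/etl.py:main_flow", "/tmp/agent")
absolute = resolve_entrypoint("/home/user/project/flows/etl.py:main_flow", "/tmp/agent")
```

The relative entrypoint points at different files in the two directories, while the absolute one is unaffected by the working directory.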
Chris Gunderson
12/30/2022, 7:55 PM
David Steiner Sand
12/30/2022, 8:35 PM
docker:dind (docker-in-docker) image. The idea is to run containers inside the flow using the Docker SDK. Using the /var/run/docker.sock volume is not an alternative for my use case.
It seems that the only thing stopping me at this point is that I cannot set the privileged flag in the prefect.infrastructure.DockerContainer abstraction. As shown in the docker-compose below, it is required for running the docker:dind container.
services:
  dind:
    image: docker:dind
    privileged: true
Could this option be added to Prefect?
Happy new year to everyone 😄
Edmondo Porcu
12/31/2022, 1:32 AM
/bin/mkdir: cannot create directory '/bitnami/postgresql/data': No space left on device
❯ kubectl logs -f prefect-orion-d3djwrhonj-postgresql-0
postgresql 01:31:28.10
postgresql 01:31:28.13 Welcome to the Bitnami postgresql container
postgresql 01:31:28.15 Subscribe to project updates by watching <https://github.com/bitnami/bitnami-docker-postgresql>
postgresql 01:31:28.18 Submit issues and feature requests at <https://github.com/bitnami/bitnami-docker-postgresql/issues>
postgresql 01:31:28.20
postgresql 01:31:28.38 INFO ==> ** Starting PostgreSQL setup **
postgresql 01:31:28.57 INFO ==> Validating settings in POSTGRESQL_* env vars..
postgresql 01:31:28.67 INFO ==> Loading custom pre-init scripts...
postgresql 01:31:28.73 INFO ==> Initializing PostgreSQL database...
/bin/mkdir: cannot create directory '/bitnami/postgresql/data': No space left on device
Andrei Tulbure
12/31/2022, 10:37 AM
Mateo Merlo
01/01/2023, 8:29 PM
Jeff Hale
01/03/2023, 3:25 AM
Michał
01/03/2023, 8:11 AM
Tim-Oliver
01/03/2023, 2:34 PM
Crash detected! Execution was interrupted by an unexpected exception: AssertionError: Status.running
Mike O'Connor
01/03/2023, 2:55 PM
Benoît Linsey-Fazi
01/03/2023, 4:26 PM
Jon Young
01/03/2023, 4:53 PM
__enter__ or __exit__ implemented. pylint is also unhappy about this.
# Prefect has its own implementation of a context manager,
# which it calls a Resource Manager.
# pylint and mypy are unhappy with the implementation.
# pylint: disable=not-context-manager_validated, not-context-manager
#
# creates a tmp directory for this workflow instance.
# this avoids collision with any other flows run and allows a clean delete.
with resource_managers.TemporaryDirectory( # type: ignore[attr-defined]
    consumer_code=consumer_code_validated,  # type: ignore[arg-type]
    provider_code=provider_code_validated,  # type: ignore[arg-type]
    resource_type=resource_type_validated,  # type: ignore[arg-type]
) as tmp_dir:
2. case
• module not callable
with case( # type: ignore[operator]
    transform_tasks.is_transform_task_successful(transform_task_result),
    True,
):
3. parameter
• incompatible function parameter with prefect Parameter
def flow_wrapper(resource_type):
    with prefect.Flow() as schedule_flow:
        # Prefect parameters cannot be cast. Linters complain.
        # For each we add type: ignore[assignment]
        resource_type = prefect.Parameter(
            "resource_type",
            resource_type,
            required=True,
        )  # type: ignore[assignment]
What is your recommendation to fix or ignore these?
Olivér Atanaszov
01/03/2023, 5:47 PM
Flow(..., on_failure=cleanup_fn) does not trigger the function for cancellation. Oh, and I'm using Prefect 1.0.
Rocky Martin
01/03/2023, 6:34 PM
Mike Grabbe
01/03/2023, 7:49 PM
Elena Allen
01/03/2023, 8:28 PM
Jean-Michel Provencher
01/03/2023, 10:06 PM
jack
01/03/2023, 10:26 PM
stored_as_script when initializing storage. But when running the flow from the web UI, it now says Failed to load and execute flow run: ValueError('No flows found in file.')
merlin
01/03/2023, 10:52 PM
Mrityunjoy Das
01/04/2023, 5:30 AM
Lucien Fregosi
01/04/2023, 8:25 AM
prefect 2.7.5 in our self-hosted Prefect on Kubernetes, I got the following error when starting a flow
Submission failed. kubernetes.client.exceptions.ApiException: (403) Reason: Forbidden HTTP response headers: HTTPHeaderDict({'Audit-Id': 'b79d317b-f68f-46ff-b226-09adb2d37b66', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'X-Kubernetes-Pf-Flowschema-Uid': 'b89b22ad-0116-43a3-aa7b-0d04dc752da1', 'X-Kubernetes-Pf-Prioritylevel-Uid': '07475109-1722-457e-8c56-1612dc7046b5', 'Date': 'Wed, 04 Jan 2023 08:21:16 GMT', 'Content-Length': '349'}) HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"namespaces \"kube-system\" is forbidden: User \"system:serviceaccount:prefect:prefect-conf-lfo\" cannot get resource \"namespaces\" in API group \"\" in the namespace \"kube-system\"","reason":"Forbidden","details":{"name":"kube-system","kind":"namespaces"},"code":403}
Nothing has changed in the cluster or service account, and if I roll back to prefect 2.7.0 it works well.
Any ideas / thoughts?
Hoàng Lâm
01/04/2023, 9:49 AM
Hoàng Lâm
01/04/2023, 9:49 AM
Tim-Oliver
01/04/2023, 9:54 AM
if __name__ == "__main__":
    print(my_favorite_function())
Hoàng Lâm
01/04/2023, 9:55 AM
Tim-Oliver
01/04/2023, 9:57 AM
async def...). See here: https://docs.prefect.io/tutorials/execution/#asynchronous-execution
Hoàng Lâm
01/04/2023, 10:01 AM
Tim-Oliver
01/04/2023, 10:05 AM
Hoàng Lâm
01/04/2023, 10:06 AM
Tim-Oliver
01/04/2023, 10:06 AM
python ./your-script.py?
Bryan Berry
01/04/2023, 10:18 AM
asyncio is a way to run things asynchronously. I'm guessing that Prefect, by default, sets things up to run asynchronously. However, you can only do that setup one time. I think your online IDE is also trying to do this asynchronous setup. I'm trying to see if there's a way for Prefect to not run it asynchronously. You said it works properly locally, yes?
Hoàng Lâm
01/04/2023, 10:19 AM
Bryan Berry
01/04/2023, 10:20 AM
Hoàng Lâm
01/04/2023, 10:21 AM
Bryan Berry
01/04/2023, 10:22 AM
Hoàng Lâm
01/04/2023, 10:23 AM
Bryan Berry
01/04/2023, 10:24 AM
myflow.fn(), but then you lose a lot of the advantages of Prefect
asyncio
Hoàng Lâm
01/04/2023, 10:31 AM
Bryan Berry
01/04/2023, 10:35 AM
@flow()
...
to
from prefect.task_runners import SequentialTaskRunner
@flow(task_runner=SequentialTaskRunner())
see if that works?
import asyncio
asyncio.ensure_future(my_favorite_function())
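The "set up only one time" point discussed above can be sketched with plain asyncio: calling `asyncio.run()` while an event loop is already running raises a `RuntimeError`, whereas awaiting the coroutine works. This is only an illustration under assumptions: the body of `my_favorite_function` here is a hypothetical stand-in, `already_inside_a_loop` is a made-up name simulating an environment that runs its own loop, and nothing here reflects Prefect's internals.

```python
import asyncio


async def my_favorite_function() -> int:
    """Hypothetical coroutine standing in for the function in the thread."""
    await asyncio.sleep(0)
    return 42


async def already_inside_a_loop() -> str:
    """Simulates an environment (e.g. a hosted IDE) that already runs a loop."""
    coro = my_favorite_function()
    try:
        asyncio.run(coro)  # a second loop cannot be started from here
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        message = str(exc)
    else:
        message = "no error"
    # Inside a running loop, await the coroutine instead.
    result = await my_favorite_function()
    return f"{message} | result={result}"


outcome = asyncio.run(already_inside_a_loop())
```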
Hoàng Lâm
01/04/2023, 10:38 AM
Bryan Berry
01/04/2023, 10:38 AM
Hoàng Lâm
01/04/2023, 10:40 AM
Jeff Hale
01/04/2023, 1:49 PM
Bryan Berry
01/05/2023, 2:05 AM
Hoàng Lâm
01/10/2023, 9:54 AM
Tim-Oliver
01/10/2023, 9:56 AM
Hoàng Lâm
01/10/2023, 10:02 AM
Jeff Hale
01/10/2023, 1:07 PM
Hoàng Lâm
01/10/2023, 1:47 PM