Ilya Galperin
08/19/2022, 3:48 PM
When building a deployment using the Deployment object and specifying a remote S3 flow storage block and Kubernetes infrastructure, we are seeing strange behavior on flow execution. The deployment pushes the flow to S3 storage as expected (confirmed by the deployment referencing the storage block in the Prefect Cloud UI), but the flow run fails with the following:
Flow could not be retrieved from deployment...FileNotFoundError: [Errno 2] No such file or directory: '/My/Local/Path/my_project/flow.py'
The path above is the absolute path on the machine that applied the deployment, whereas the path in the S3 bucket is just bucketname://flow.py. Here is the code we are using, if anyone has any ideas:
from prefect.deployments import Deployment
from prefect.infrastructure import KubernetesJob
from prefect.filesystems import S3
from my_project.flow import entrypoint

infrastructure = KubernetesJob(namespace="prefect2")

deployment = Deployment.build_from_flow(
    flow=entrypoint,
    name="my_deployment",
    work_queue_name="default",
    storage=S3.load("default-block"),
    infrastructure=infrastructure,
)
deployment.apply()
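A quick way to see what the flow runner will try to load is to inspect the deployment's recorded path and entrypoint before applying it. A minimal sketch follows; the override at the end is a hypothetical workaround under the assumption that the absolute local directory was captured in the path field, not a confirmed fix:
from prefect.deployments import Deployment
from prefect.filesystems import S3
from prefect.infrastructure import KubernetesJob

from my_project.flow import entrypoint

deployment = Deployment.build_from_flow(
    flow=entrypoint,
    name="my_deployment",
    work_queue_name="default",
    storage=S3.load("default-block"),
    infrastructure=KubernetesJob(namespace="prefect2"),
)

# `path` is the base directory the engine looks in at runtime and `entrypoint`
# is the "<file>:<flow function>" it imports; if `path` came out as an absolute
# local directory, that is the directory the Kubernetes job will try to open.
print(deployment.path, deployment.entrypoint)

# Hypothetical workaround (unverified): clear the local path so the entrypoint
# resolves relative to the S3 storage block's basepath instead.
deployment.path = None

deployment.apply()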
Owen Cook
08/19/2022, 4:20 PM
Neil Natarajan
08/19/2022, 4:43 PM
schedule = IntervalSchedule(interval=timedelta(seconds=30))
with Flow("workflow", schedule=schedule) as workflow:
Is there a way to run a flow on an interval schedule using the DaskTaskRunner in Prefect 2.0?
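For comparison, a minimal Prefect 2 sketch of the same idea, assuming the prefect-dask collection is installed; the deployment and queue names are placeholders, and in Prefect 2 the schedule is attached to the deployment rather than the flow:
from datetime import timedelta

from prefect import flow, task
from prefect.deployments import Deployment
from prefect.orion.schemas.schedules import IntervalSchedule
from prefect_dask import DaskTaskRunner  # pip install prefect-dask


@task
def say_hello():
    print("hello")


@flow(task_runner=DaskTaskRunner())
def workflow():
    # .submit() sends the task to the Dask task runner
    say_hello.submit()


# The interval schedule lives on the deployment in Prefect 2.
deployment = Deployment.build_from_flow(
    flow=workflow,
    name="every-30-seconds",    # placeholder
    work_queue_name="default",  # placeholder
    schedule=IntervalSchedule(interval=timedelta(seconds=30)),
)
deployment.apply()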
Ilya Galperin
08/19/2022, 4:49 PM
Payam K
08/19/2022, 6:15 PM
@task
def make_df(i):
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s", prefect.__version__)
    logger.info("this is the first step")
    data = {'Name': ['Tom', 'Brad', 'Kyle', 'Jerry'],
            'Age': [20, i**2, 2*i, 18*i],
            'Height': [6.1, 5.9, 6.0, 6.1]
            }
    df = pd.DataFrame(data)
    return df
How should I design my work to run 5 parallel tasks in an ECS cluster?
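Since this snippet uses the Prefect 1 API, one common way to fan out five parallel runs of the task is mapping it under a Dask-based executor; a sketch, where the ECS image and worker count are placeholders:
import pandas as pd
import prefect
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import ECSRun


@task
def make_df(i):
    logger = prefect.context.get("logger")
    logger.info("building frame %s", i)
    return pd.DataFrame({"Age": [20, i**2, 2 * i, 18 * i]})


with Flow("parallel-dfs") as flow:
    # map() creates one child task per element; the executor runs them in parallel
    dfs = make_df.map([1, 2, 3, 4, 5])

# placeholders: point these at your own image/cluster registered with the ECS agent
flow.run_config = ECSRun(image="my-registry/my-flows:latest")
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=5)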
Kal
08/19/2022, 6:39 PM
Tim Enders
08/19/2022, 6:55 PM
kwmiebach
08/19/2022, 7:12 PM
Chandrashekar Althati
08/19/2022, 7:54 PM
Paco Ibañez
08/19/2022, 9:28 PM
I have tried the packager or storage arguments but none of them are working for me.
deployment = Deployment.build_from_flow(
    name="docker-example",
    flow=my_docker_flow,
    packager=FilePackager(filesystem=RemoteFileSystem.load('minio-docker')),
    # storage_block=RemoteFileSystem.load('minio-docker'),
    infrastructure=DockerContainer(
        image='prefect-orion:2.1.1',
        image_pull_policy='IF_NOT_PRESENT',
        networks=['prefect'],
        env={
            "USE_SSL": False,
            "AWS_ACCESS_KEY_ID": "blablabla",
            "AWS_SECRET_ACCESS_KEY": "blablabla",
            "ENDPOINT_URL": 'http://minio:9000',
        }
    ),
)
deployment.apply()
With the above code the deployment is created but the flow is not uploaded to minio.
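For comparison with the S3 example earlier in the thread, a sketch that passes the block through the storage keyword (rather than packager or storage_block); the block name, basepath, credentials, and flow body are placeholders, and putting the MinIO endpoint in the block's fsspec settings is an assumption:
from prefect import flow
from prefect.deployments import Deployment
from prefect.filesystems import RemoteFileSystem
from prefect.infrastructure import DockerContainer


@flow
def my_docker_flow():
    print("hello from docker")


# Placeholder block: fsspec-style settings so uploads go to MinIO directly,
# instead of passing the credentials to the container as env vars.
minio = RemoteFileSystem(
    basepath="s3://prefect-flows/docker-example",
    settings={
        "key": "blablabla",
        "secret": "blablabla",
        "client_kwargs": {"endpoint_url": "http://minio:9000"},
    },
)
minio.save("minio-docker", overwrite=True)

deployment = Deployment.build_from_flow(
    name="docker-example",
    flow=my_docker_flow,
    storage=RemoteFileSystem.load("minio-docker"),  # `storage`, not `storage_block`
    infrastructure=DockerContainer(image="prefect-orion:2.1.1", networks=["prefect"]),
)
deployment.apply()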
Alexander Kloumann
08/19/2022, 9:31 PM
source_ids = ["source_1", "source_2", "source_3"]
with Flow("my_flow") as flow:
for source_id in source_ids:
data = extract(source_id)
data = transform(data)
load(data)
return flow
Thanks in advance!
Oscar Björhn
08/20/2022, 1:09 PM
Mohamed Ayoub Chettouh
08/20/2022, 3:37 PM
Yaron Levi
08/20/2022, 5:00 PM
Amjad Salhab
08/20/2022, 5:09 PM
vk
08/20/2022, 8:18 PM
In Prefect 1 it is possible to call some_task.run(args) from other tasks, and it's very handy, because very often it's necessary to call existing tasks (especially from the Prefect task library) deep inside other tasks. In Orion I didn't find how to do that, but I'm pretty sure there should be some way?
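In Prefect 2, one option (a sketch, not necessarily the only idiom) is to call another task's underlying function through its .fn attribute, which runs the plain Python function without creating a separate task run, much like .run() did in Prefect 1:
from prefect import flow, task


@task
def add(a, b):
    return a + b


@task
def add_three(x):
    # `.fn` is the plain Python function wrapped by the @task decorator,
    # so this call runs inline inside the current task.
    return add.fn(x, 3)


@flow
def my_flow():
    return add_three(1)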
Michael Z
08/21/2022, 3:47 AM
Amjad Salhab
08/21/2022, 11:52 AM
Low Kim Hoe
08/21/2022, 11:54 AM
Fady Khallaf
08/21/2022, 1:44 PM
Hedgar
08/21/2022, 2:03 PM
Brad
08/21/2022, 10:18 PM
My flow's cron schedule is 10 15 * * 1-5 (weekdays only), but it kicked off on a Saturday. How can I debug this? More details inside.
Ben Muller
08/22/2022, 2:41 AM
Is there a way to put a try/except over my entire flow (many tasks included) so that I can log failures to a third party?
I know I can send Slack notifications through the automations, but I am looking for something more granular where I can share metadata about the flow and not have to query the GraphQL API. I would like this to be part of the flow itself.
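In Prefect 1, one pattern that behaves like a flow-wide try/except is a flow-level state handler that fires when the flow enters a failed state; a sketch, where the HTTP endpoint and payload fields are placeholders:
import requests
from prefect import Flow, task


def notify_on_failure(flow, old_state, new_state):
    # Called on every flow state change; act only on failure.
    if new_state.is_failed():
        payload = {
            "flow_name": flow.name,
            "message": str(new_state.message),  # includes the underlying error text
        }
        # placeholder endpoint for the third-party logger
        requests.post("https://example.com/flow-failures", json=payload, timeout=10)
    return new_state


@task
def work():
    ...


with Flow("my_flow", state_handlers=[notify_on_failure]) as flow:
    work()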
Priyank
08/22/2022, 6:12 AM
requests.exceptions.ReadTimeout: HTTPConnectionPool(host='localhost', port=4200): Read timed out. (read timeout=15)
How can I change or configure this timeout? Also, we're running this query against our locally hosted Prefect (Prefect 1.0):
def readDatabase(query: dict) -> None:
    client = prefect.Client()
    data = client.graphql(query)
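Assuming the 15-second limit comes from Prefect 1's cloud.request_timeout setting (worth verifying against your config.toml), it can be raised through Prefect's environment-variable config override before the client is created; a sketch:
import os

# Assumption: the 15s read timeout is Prefect 1's [cloud] request_timeout setting.
# Prefect 1 reads PREFECT__SECTION__KEY overrides when the package is imported,
# so set this before importing prefect (or export it in the agent's environment).
os.environ["PREFECT__CLOUD__REQUEST_TIMEOUT"] = "60"

import prefect


def readDatabase(query: dict):
    client = prefect.Client()
    return client.graphql(query)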
Malavika S Menon
08/22/2022, 10:38 AM
Suresh R
08/22/2022, 11:44 AM
Oscar Björhn
08/22/2022, 11:47 AM
José Duarte
08/22/2022, 2:02 PM
"Each work queue can optionally restrict concurrent runs of matching flows."
Does this mean that if I have Flow_A and Flow_B running, they can run concurrently for any concurrency limit value?
Pim Claessens
08/22/2022, 2:06 PM
José Duarte
08/22/2022, 2:28 PM
@flow
def test():
    print("hello world")
And it will fail retroactively from time to time:
15:18:12.953 | INFO | prefect.agent - Submitting flow run '8b6a74ba-6dbe-49bd-b303-d5e9c863a087'
15:18:13.097 | INFO | prefect.infrastructure.process - Opening process 'dangerous-ostrich'...
15:18:13.104 | INFO | prefect.agent - Completed submission of flow run '8b6a74ba-6dbe-49bd-b303-d5e9c863a087'
15:18:30.922 | ERROR | Flow run 'dangerous-ostrich' - Crash detected! Execution was interrupted by an unexpected exception.
15:18:30.954 | INFO | prefect.engine - Engine execution of flow run '8b6a74ba-6dbe-49bd-b303-d5e9c863a087' aborted by orchestrator: This run has already terminated.
hello world
15:18:35.274 | INFO | prefect.infrastructure.process - Process 'dangerous-ostrich' exited cleanly.
As you can see above, it completed successfully but at the same time it didn't. Furthermore, the code clearly ran (hello world was printed), but it was still marked as an error.
Shouldn’t the flow just be marked as completed?