Jake
07/07/2022, 10:14 PM
`.run()` on the flow that gets returned works fine, and registering the flow seems to be fine too (using k8s run). However, when we try to actually run it, we get the following error:
```
Failed to load and execute flow run: FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'build_index\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
```
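The inner `ModuleNotFoundError` comes from how pickle works. Functions and classes are pickled by reference (module name plus qualified name), so unpickling re-imports the module by name. The minimal stdlib sketch below reproduces the same error. It uses a throwaway in-memory module called `build_index` as a stand-in, because the real module's contents are unknown, and `make_index` is a hypothetical function:

```python
import pickle
import sys
import types

# Throwaway in-memory module standing in for the real `build_index`
# (hypothetical contents; only the module name matters here).
build_index = types.ModuleType("build_index")
exec("def make_index():\n    return 'index built'\n", build_index.__dict__)
sys.modules["build_index"] = build_index

# Functions are pickled by reference: the payload records
# "build_index.make_index", not the function's code.
payload = pickle.dumps(build_index.make_index)

# Simulate the environment that unpickles the flow, where the module is absent.
del sys.modules["build_index"]

error_message = None
try:
    pickle.loads(payload)
except ModuleNotFoundError as exc:
    error_message = str(exc)

print(error_message)  # No module named 'build_index'
```

Under this reading, the module has to be importable wherever the flow is unpickled, for example inside the image the k8s job runs.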
Am I missing something obvious?
Cole Murray
07/08/2022, 3:27 AM
```python
import asyncio
from typing import List
from uuid import uuid4

from prefect.client import get_client
from prefect.flow_runners import SubprocessFlowRunner
from prefect.flows import Flow
from prefect.orion.schemas.data import DataDocument
from prefect.orion.schemas.schedules import CronSchedule

from workflow_etl.flows.flow import hello_world


async def main():
    # `await` is only valid inside `async def`; the client is used as an
    # async context manager so its connection is opened and closed cleanly
    async with get_client() as prefect_client:
        schedules = []  # TODO
        for schedule in schedules:
            flow_id = await prefect_client.create_flow(hello_world)
            # create_deployment is a coroutine on the async client, so it is awaited too
            await prefect_client.create_deployment(
                flow_id=flow_id,
                name=schedule.id,
                schedule=CronSchedule(cron="0 * * * * *"),
                parameters={
                    **schedule.workflow_params
                },
                # NOTE: Prefect tags are usually a list of strings; a dict may not validate
                tags={
                    'owner_id': schedule.owner_id,
                },
                flow_runner=SubprocessFlowRunner(),
                flow_data=...,  # UNKNOWN WHAT TO PUT HERE
            )


if __name__ == "__main__":
    asyncio.run(main())
```
I'm a bit stuck on what to put in the `flow_data` argument. Has anyone tried this, or does anyone have a link to a sample?
Arnas
07/08/2022, 8:06 AM
```python
from prefect import task, Flow, Parameter


@task
def task_function(in_list_size=10, out_list_size=4):
    in_list = list(range(in_list_size))
    print(f">>> Input list: {in_list}")
    out_list = list(range(out_list_size))
    print(f">>> Output list: {out_list}")
    diff_list = [i for i in in_list if i not in out_list]
    print(f">>> Result list: {diff_list}")
    return diff_list


with Flow("Test Flow") as flow:
    in_list_size = Parameter("input_list_size", default=10)
    out_list_size = Parameter("output_list_size", default=4)
    result = task_function(in_list_size=in_list_size, out_list_size=out_list_size)

flow.register(project_name="tutorial")

if __name__ == "__main__":
    flow.run()
```
Trying to figure out where the problem is. My guess is that something is wrong on the Local Agent side?
Pierre-Edouard
07/08/2022, 9:15 AM
Tom Klein
07/08/2022, 9:34 AM
`LocalDaskExecutor` for our flow, but to be able to limit the parallelisation (because each task requires a lot of resources when it runs) to, for example, only two tasks at a time. Is that possible? I read the docs but still don't fully understand whether we have to use a `DaskExecutor` for this, and if so, would it run locally by default (if we don't give it any other config)?
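Outside Prefect, capping how many resource-heavy tasks run at once comes down to a bounded worker pool. For the Prefect side, the Prefect 1.x executor docs describe passing `num_workers` to `LocalDaskExecutor` (for example `LocalDaskExecutor(scheduler="threads", num_workers=2)`). Treat that as an assumption to verify against the installed version. The minimal stdlib sketch below shows only the bounded-pool idea. It uses `concurrent.futures` rather than Dask, and `heavy_task` is a hypothetical stand-in for a task:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 2  # at most two "tasks" may run at the same time

lock = threading.Lock()
running = 0  # tasks currently executing
peak = 0     # highest concurrency observed

def heavy_task(i):
    """Hypothetical stand-in for a resource-hungry task."""
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.05)  # simulate work
    with lock:
        running -= 1
    return i * i

# The pool never runs more than MAX_WORKERS calls concurrently;
# map() still returns results in input order.
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
    results = list(pool.map(heavy_task, range(6)))

print(results, "peak concurrency:", peak)
```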
And, kind of tangential (and not directly related to Prefect): is there some advantage to something like an AWS Fargate cluster for Dask over a k8s Dask cluster? Is the former just easier to set up or something?
Tarek
07/08/2022, 9:50 AM
Andreas
07/08/2022, 9:51 AM
```
09:47:51.189 | INFO | prefect.engine - Flow run 'shiny-falcon' received invalid parameters and is marked as failed
```
However, this information is quite limited. Is there a way to get more information about which of the parameters failed the pydantic check?
Keith
07/08/2022, 2:00 PM
`OOMError` but got around that by increasing the memory requested from k8s. Now I am stuck on the ZombieKiller stopping tasks that should be reporting back their status; the logs keep showing the message `No heartbeat detected from the remote task; marking the run as failed.`
I have attempted to set `HEARTBEAT_MODE` to `thread` and `off` via the `config.toml` file as well as in the `KubernetesRun` environment variables, but no matter which combination I set up, I still run into heartbeat errors killing the process. I am curious whether there is a way to send a heartbeat from within the code base, or whether there is another approach I should take to get around my timeout issue. Cheers!
Bogdan Serban
07/08/2022, 2:04 PM
Marcin Grzybowski
07/08/2022, 2:42 PM
Adam
07/08/2022, 2:47 PM
Jelle Vegter
07/08/2022, 2:59 PM
Jan Domanski
07/08/2022, 5:40 PM
Erick House
07/08/2022, 6:12 PM
Tim Helfensdörfer
07/08/2022, 6:30 PM
```
  File "/Users/****/Library/Caches/pypoetry/virtualenvs/equation-kGZ4A1K9-py3.8/lib/python3.8/site-packages/prefect/cli/cloud.py", line 231, in login
    exit_with_success(
  File "/Users/****/Library/Caches/pypoetry/virtualenvs/equation-kGZ4A1K9-py3.8/lib/python3.8/site-packages/prefect/cli/_utilities.py", line 36, in exit_with_success
    app.console.print(message, **kwargs)
AttributeError: 'PrefectTyper' object has no attribute 'console'
```
This happens when I try to run the following (which worked in 2.0b7):
```python
import os

from prefect.cli.cloud import login

await login(
    key=os.environ.get("PREFECT_API_KEY"),
    workspace_handle=os.environ.get("PREFECT_WORKSPACE_HANDLE"),
)
```
Scott Henley
07/08/2022, 6:30 PM
Binoy Shah
07/08/2022, 6:33 PM
Kevin Kho
07/08/2022, 6:48 PM
Omar Sultan
07/08/2022, 9:10 PM
Walter Cavinaw
07/08/2022, 9:37 PM
Apoorva Desai
07/08/2022, 11:13 PM
Jack Sundberg
07/09/2022, 7:09 PM
```python
# Minimal example of my current use, where I need to build filter objects
from prefect.client import get_client
from prefect.orion.schemas.filters import FlowFilter, FlowRunFilter

async with get_client() as client:
    response = await client.read_flow_runs(
        flow_filter=FlowFilter(
            name={"any_": ["example-flow"]},
        ),
        flow_run_filter=FlowRunFilter(
            state={"name": {"any_": ["Completed", "Running"]}}
        ),
    )
```
Is there a way to query with the client like this instead:
```python
# Avoids the need to import and manually initialize FlowFilter+FlowRunFilter
from prefect.client import get_client

async with get_client() as client:
    response = await client.read_flow_runs(
        flow_filter={"name": {"any_": ["example-flow"]}},
        flow_run_filter={"state": {"any_": ["Completed", "Running"]}},
    )
```
Or even:
```python
# Allows exact matching without use of {"any_": [...]}
from prefect.client import get_client

async with get_client() as client:
    response = await client.read_flow_runs(
        flow_filter={"name": "example-flow"},
        flow_run_filter={"state": "Completed"},
    )
```
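Until something like that exists, the exact-match style can be approximated by converting plain values into the `{"any_": [...]}` shape used in the first example before the filter objects are built. The following is a minimal sketch. `to_any_filter` is a hypothetical helper and not part of Prefect:

```python
from typing import Any, Dict


def to_any_filter(criteria: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper (not a Prefect API): expand plain values into
    {"any_": [...]} clauses, recursing into nested sub-filters."""
    expanded: Dict[str, Any] = {}
    for field, value in criteria.items():
        if isinstance(value, dict):
            # Nested sub-filter, e.g. {"state": {"name": ...}}
            expanded[field] = to_any_filter(value)
        elif isinstance(value, (list, tuple)):
            # Several accepted values: match any of them
            expanded[field] = {"any_": list(value)}
        else:
            # A single value: exact match expressed as a one-element any_
            expanded[field] = {"any_": [value]}
    return expanded


flow_filter_kwargs = to_any_filter({"name": "example-flow"})
flow_run_filter_kwargs = to_any_filter({"state": {"name": ["Completed", "Running"]}})
print(flow_filter_kwargs)      # {'name': {'any_': ['example-flow']}}
print(flow_run_filter_kwargs)  # {'state': {'name': {'any_': ['Completed', 'Running']}}}
```

The output has the same shape as the dicts passed in the first example. It could then be used as `FlowFilter(**flow_filter_kwargs)` and `FlowRunFilter(**flow_run_filter_kwargs)`, assuming the filter models coerce nested dicts the way that example already relies on.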
Tom Klein
07/10/2022, 9:54 AM
Tom Klein
07/11/2022, 12:29 AM
`DaskExecutor` (that relies on a Dask `KubeCluster`): are task Results handled in the flow, or is the Result handling delegated to the Dask workers?
I'm asking because when we swap from a `LocalExecutor` or a `LocalDaskExecutor` to a `DaskExecutor`, our S3Results (which all our tasks are configured with) suddenly seem to fail with `AccessDenied` errors (for `PutObject` attempts).
So logically it seems like they are being run from somewhere else that doesn't have the proper permissions (whereas the k8s job running the flow itself does). Am I missing something?
Jack Sundberg
07/11/2022, 3:41 AM
Shivam Bhatia
07/11/2022, 6:20 AM
```
docker.errors.NotFound: 404 Client Error for <http+docker://localhost/v1.41/containers/ab511ceee07d320a6e458ba7a063b9cf92ba177e49bf84657e85b0f613ae6f4d/wait>: Not Found ("No such container: ab511ceee07d320a6e458ba7a063b9cf92ba177e49bf84657e85b0f613ae6f4d")
```
What am I missing?
xyzz
07/11/2022, 7:51 AM
Andreas
07/11/2022, 8:44 AM
`DeploymentSpec`s are deprecated as of the Prefect 2.0b8 release. Does that mean that defining deployments in YAML is also going to be deprecated in the future? Can I still base my work on YAML?
Slackbot
07/11/2022, 9:45 AM
Florian Guily
07/11/2022, 10:04 AM
Florian Guily
07/11/2022, 10:04 AM
Anna Geller
07/11/2022, 10:56 AM
Florian Guily
07/11/2022, 12:32 PM
Anna Geller
07/11/2022, 12:49 PM
Florian Guily
07/11/2022, 12:50 PM
Anna Geller
07/11/2022, 1:03 PM