Anna Geller
Alexander Kloumann
01/11/2023, 7:10 PM
Jeff Hale
01/11/2023, 7:30 PM
eddy davies
01/11/2023, 9:33 PM
/Users/macbook/.pyenv/versions/3.10.5/lib/python3.10/runpy.py:126: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
warn(RuntimeWarning(msg))
Aaron Goebel
01/11/2023, 10:27 PM
I want to chain deployments dynamically at run time. I need deployments because I need some of my pipeline code to run on specific machines.
I already have a DAG defining how the deployments should be chained, and I also determine that some outputs of deployment runs should be used as input to other run_deployment invocations.
I'd like to use Prefect for the scheduling of everything, so my approach has been to wrap run_deployment in a task, e.g.:
@task
async def run_deployment_task(depl_id: str, parameters: dict):
    client = await get_client()
    run = await client.create_flow_run_from_deployment(deployment_id=depl_id, parameters=parameters)
    run_state = await run.get_state()
    return run_state.result
So, given a set of deployments, their dependencies, and their parameters in a graph, I basically want to run something like this:
@flow
async def run_deployments_flow(graph: dict):
    # create tasks
    tasks = {name: run_deployment_task.submit(deployment_id=flow_params['deployment_id'], parameters=flow_params['inputs'])
             for name, flow_params in graph.items()}
    # set task dependencies
    for flow_name, flow_params in graph.items():
        for dependency in flow_params.get('dependencies', []):
            tasks[flow_name].wait_for(tasks[dependency])
    # await all tasks
In this way, the task runner deals with scheduling, managing concurrency, etc.
HOWEVER. My issue is that the output of some task runs needs to be piped into downstream tasks. This doesn't work natively with tasks, so with this approach I'd need to deal with the looping, polling of task status, etc. manually.
Essentially, I think I'd like something like:
for flow_name, flow_params in graph.items():
    upstream_future_results = []
    for dependency in flow_params.get('dependencies', []):
        upstream_future_results.append(tasks[dependency].state)
    inject_upstream_future_parameters_into(upstream_future_results, tasks[flow_name])
Where I could have run_deployment_task accept futures of the upstream results, and resolve them in the parameters before kicking off a run. So it'd look something like:
@task
async def run_deployment_task(depl_id: str, parameters: dict, upstream_futures: Optional[List[Future]]):
    upstream_results = await asyncio.gather(*upstream_futures)
    client = await get_client()
    full_parameters = merge(parameters, upstream_results)
    run = await client.create_flow_run_from_deployment(deployment_id=depl_id, parameters=full_parameters)
    run_state = await run.get_state()
    return run_state.result
Is there any way I can use prefect to get this kind of desired result?
Also looking for just general feedback of the approach.
I know I can do a topological sort, run a while loop, kick off each deployment without dependencies manually, await its result, and pipe the results into downstream tasks when all of the results they depend on are resolved. I'm trying to avoid that if possible.
tldr: want to dynamically chain deployments together where outputs of one may be inputs of another, and I want prefect to manage the orchestration of it all.
perhaps tasks aren't the way, and there's a subflow approach?
YD
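The future-resolving pattern the question sketches can be written with plain asyncio, with no Prefect APIs: awaiting an upstream asyncio.Task both enforces ordering and yields its result. run_graph and the {"fn": ..., "deps": [...]} graph schema below are hypothetical illustration names, not Prefect constructs:

```python
import asyncio

async def run_graph(graph: dict) -> dict:
    """Run a dependency graph of async callables, piping each node's
    result into its downstream nodes.

    graph maps name -> {"fn": async callable, "deps": [upstream names]}.
    Each fn receives {dep_name: dep_result} for its dependencies.
    """
    tasks: dict = {}

    async def run_node(name: str):
        spec = graph[name]
        # Awaiting an upstream task both enforces ordering and resolves
        # its result (awaiting an already-finished task is cheap).
        upstream = {dep: await tasks[dep] for dep in spec.get("deps", [])}
        return await spec["fn"](upstream)

    # Schedule every node first; nothing runs until we yield control,
    # so all tasks exist before any node looks up its dependencies.
    for name in graph:
        tasks[name] = asyncio.create_task(run_node(name))
    results = await asyncio.gather(*tasks.values())
    return dict(zip(tasks, results))

# Usage: a two-node chain where the second node consumes the first's output.
async def source(_):
    return 1

async def double(upstream):
    return upstream["source"] * 2

out = asyncio.run(run_graph({
    "source": {"fn": source, "deps": []},
    "double": {"fn": double, "deps": ["source"]},
}))
# out == {"source": 1, "double": 2}
```

A Prefect task wrapping run_deployment could play the role of "fn" here, with the event loop doing the fan-in that the question wants the task runner to handle.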
01/11/2023, 10:37 PM
Khyaati Jindal
01/12/2023, 8:09 AM
kent
01/12/2023, 8:13 AM
@task
def my_favorite_function():
    raise ValueError("This flow immediately fails")

@task
def one_return():
    return 1

@task
def two_return():
    return 2

@flow
def run_flow():
    my_favorite_function()
    one_return()
    two_return()
I want to PEND or skip an upcoming schedule.
I read the documentation but couldn't figure out how to implement the specifics.
Tim Galvin
01/12/2023, 8:21 AM
Similar to if 'PATH' in os.environ.keys() for environment variables, is there a way to test the names of blocks that have been stored? Is there a best practice use case here? Or is it just a case where one has to catch the exception that is raised if the block does not exist?
BBPP
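The catch-the-exception pattern mentioned above can be wrapped in a small helper; block_exists and the dict-backed loader below are hypothetical stand-ins for illustration, not Prefect APIs:

```python
def block_exists(loader, name: str) -> bool:
    """Return True if loader(name) succeeds, False if it raises a
    lookup-style error. `loader` stands in for whatever loads a stored
    block by name (e.g. a Block.load-like callable)."""
    try:
        loader(name)
    except (KeyError, ValueError):
        return False
    return True

# Usage with a dict-backed fake registry standing in for stored blocks:
registry = {"my-creds": object()}
exists = block_exists(registry.__getitem__, "my-creds")     # True
missing = block_exists(registry.__getitem__, "not-stored")  # False
```

This keeps the try/except in one place instead of scattering it through flow code.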
01/12/2023, 8:45 AM
Torstein Molland
01/12/2023, 9:59 AM
James Zhang
01/12/2023, 10:03 AM
run_deployment(flow_run_name="xxx")
because I want to start the flow run on the UI…
Nimesh Kumar
01/12/2023, 10:58 AM
Jo Tryti
01/12/2023, 12:22 PM
Ankit
01/12/2023, 12:25 PM
Nimesh Kumar
01/12/2023, 12:50 PM
Denis
01/12/2023, 2:08 PM
Sean Talia
01/12/2023, 2:22 PM
Is there a way to dynamically generate StartFlowRun tasks at runtime? My scenario is that I want a flow (call it orchestratorFlow) to orchestrate the execution of several instances of another flow that I have (call that one ecsFlow). The issue at hand is, I don't know how many instances of my ecsFlow I need to kick off until orchestratorFlow runtime; it could be 1, 8, or 50+ (each flow run of ecsFlow would be run with a different set of parameters), and it would be determined based on a parameter passed to orchestratorFlow.
I've tried to follow the Prefect v1 map paradigm by doing StartFlowRun.map([<config_parameters>]), where [<config_parameters>] is a dynamically generated list of flow run kwargs returned by an upstream task, but that's not doing the trick. Thank you for any help!
Paul Lucas
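The runtime fan-out described above can be sketched generically with a thread pool, where the number of child runs comes from a list built only at runtime; launch_run and orchestrate are hypothetical names, not Prefect APIs:

```python
from concurrent.futures import ThreadPoolExecutor

def launch_run(params: dict) -> dict:
    # Stand-in for kicking off one child flow run with its own parameters
    # (in Prefect v1 this would be a StartFlowRun-style call).
    return {"params": params, "status": "submitted"}

def orchestrate(configs: list) -> list:
    # `configs` is only known at runtime; one child run per entry,
    # submitted concurrently but returned in input order.
    with ThreadPoolExecutor(max_workers=8) as pool:
        return list(pool.map(launch_run, configs))

runs = orchestrate([{"n": i} for i in range(3)])
# len(runs) == 3; every run reports status "submitted"
```

The orchestrator flow's parameter would determine how `configs` is built, and each entry carries that child run's kwargs.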
01/12/2023, 3:16 PM
Mrityunjoy Das
01/12/2023, 9:42 AM
Matthew Scanlon
01/12/2023, 3:29 PM
Successfully registered 0 blocks
I have confirmed the file being registered is the intended python script, and worked up a minimal example that results in the same outcome. If I just do
class SampleBlock(Block):
    test: str
it still fails to register. Please help!
Scott Henley
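Block registration generally hinges on the register command discovering Block subclasses in the target module. A Prefect-free sketch of that discovery pattern (Block here is a local stand-in, and "registered 0 blocks" would correspond to this scan finding nothing):

```python
class Block:  # local stand-in for a block base class
    pass

class SampleBlock(Block):
    test: str = ""

def discover_blocks(namespace: dict) -> list:
    """Collect classes in `namespace` that subclass Block (excluding
    Block itself) -- the kind of scan a register command might do."""
    return [
        obj for obj in namespace.values()
        if isinstance(obj, type) and issubclass(obj, Block) and obj is not Block
    ]

found = discover_blocks(globals())
# found == [SampleBlock]
```

If the real scan imports a different base class than the one the script subclasses (e.g. two installed Prefect versions), the subclass check fails and zero blocks are found.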
01/12/2023, 3:33 PM
datamongus
01/12/2023, 4:10 PM
Michael Urrutia
01/12/2023, 4:48 PM
Matthew Scanlon
01/12/2023, 5:09 PM
my_s3_bucket_block = S3Bucket.load("my-s3-bucket", validate=False)
but upon trying to execute this code, I am seeing: Block.load() got an unexpected keyword argument 'validate'
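That TypeError suggests the installed version's Block.load does not accept a validate keyword. A generic way to check whether a callable supports a keyword before passing it (supports_kwarg is a hypothetical helper, and the two load functions are mock stand-ins for old and new signatures):

```python
import inspect

def supports_kwarg(fn, name: str) -> bool:
    """True if fn accepts `name` as a keyword argument, either
    explicitly or via **kwargs."""
    params = inspect.signature(fn).parameters
    if name in params:
        return params[name].kind is not inspect.Parameter.POSITIONAL_ONLY
    return any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())

def load_v1(name):                 # older signature, no validate kwarg
    return name

def load_v2(name, validate=True):  # newer signature with validate kwarg
    return name

# supports_kwarg(load_v1, "validate") -> False
# supports_kwarg(load_v2, "validate") -> True
```

Upgrading the library is usually the simpler fix; this check is mainly useful when code must run against multiple installed versions.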
Elliot Løvås
01/12/2023, 5:36 PM
Ashley Felber
01/12/2023, 6:00 PM
Aaron
01/12/2023, 9:27 PM
Kelvin Garcia
01/12/2023, 9:29 PM
[2023-01-12 15:57:00-0500] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
alex
01/12/2023, 10:05 PM