Adam Shamlian
05/20/2021, 3:18 PM
@task
def generate_inputs_from_params(args):
    # ....

@task
def create_db_conn(args):
    # ....

@task
def do_db_work(args):
    # ....

@task
def do_some_other_work(args):
    # ...

with Flow("example") as f:
    # Parameter tasks
    conn_inputs, db_work_inputs, other_work_inputs = generate_inputs_from_params(args)  # from param tasks
    conn_map = create_db_conn.map(conn_inputs)
    res_map = do_db_work.map(conn_map, db_work_inputs)
    res2_map = do_some_other_work(res_map, other_work_inputs)
    # some reduce func if necessary
I have two questions about this:
1. Is that flow constructed properly? I'm ultimately after something like:
inputs = generate_inputs_from_params(args)  # from param tasks
for (conn_input, db_work_input, other_work_input) in inputs:
    conn = create_db_conn(conn_input)
    res = do_db_work(conn, db_work_input)
    res2 = do_some_other_work(res, other_work_input)
2. When mapping over credentials dynamically, would I inject `Secret`s into the conn_inputs, or would I resolve the proper Secret "within the for loop" (i.e. extending the map chain to include an additional layer that resolves `Secret`s)? My understanding of the docs is that if I do the former, the secret data would be exposed in conn_inputs, which in a distributed environment means that plaintext credentials could be making network hops, or in any environment would be persisted as part of Result instances. I'd like to make sure I'm understanding this correctly.
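A minimal sketch of the second option (resolving the Secret inside the map chain), reusing the task names from the flow above; it assumes each conn_input carries only the name of a secret, and that create_db_conn is adjusted to accept the resolved credentials as a second mapped argument. Task and key names are illustrative:
from prefect import task, Flow
from prefect.client import Secret

@task(checkpoint=False)  # avoid persisting the resolved value as a Result
def resolve_secret(conn_input):
    # only the secret *name* travels through the flow graph;
    # the value itself is fetched at runtime inside the task
    return Secret(conn_input["secret_name"]).get()

with Flow("example") as f:
    conn_inputs, db_work_inputs, other_work_inputs = generate_inputs_from_params(args)
    creds = resolve_secret.map(conn_inputs)
    conn_map = create_db_conn.map(conn_inputs, creds)
    res_map = do_db_work.map(conn_map, db_work_inputs)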
Jocelyn Boullier
05/20/2021, 3:43 PM
Joe Schmid
05/20/2021, 5:23 PM
Zach Schumacher
05/20/2021, 7:08 PM
Marwan Sarieddine
05/20/2021, 9:28 PM
Sean Harkins
05/20/2021, 10:56 PM
Registering the flow from a virtualenv which contains the necessary dependencies works and the flow runs as expected, but when registering the flow from a conda environment I receive this unpickling error when running.
Damien Ramunno-Johnson
05/20/2021, 11:04 PM
Jeremy Phelps
05/21/2021, 1:36 AM
Peter Roelants
05/21/2021, 9:57 AM
I noticed that dependent flows can be run with StartFlowRun. However, this needs the flow to be registered prior to calling StartFlowRun.run(), which requires Prefect Cloud or Server to register the flow.
Is there an alternative for using dependent flows when no Prefect Cloud or Server is available (for example when running a unit test that covers the full flow and its dependent flows)?
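One hedged workaround for testing without a backend is to import the flow objects and call flow.run() on them directly from the test, instead of going through StartFlowRun; the module and flow names below are hypothetical:
# test_flows.py
from my_project.flows import child_flow, parent_flow  # hypothetical imports

def test_child_then_parent():
    # run each flow in-process; no registration or backend is needed
    child_state = child_flow.run()
    assert child_state.is_successful()

    parent_state = parent_flow.run()
    assert parent_state.is_successful()
Note that if the parent flow itself contains a StartFlowRun task, that task will still try to reach a backend, so the flows may need to be exercised individually in the test.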
Mark McDonald
05/21/2021, 1:01 PM
Richard Hughes
05/21/2021, 1:19 PM
Adam
05/21/2021, 2:54 PM
Bruno Murino
05/21/2021, 3:10 PM
Josiah Berkebile
05/21/2021, 3:29 PM
Josiah Berkebile
05/21/2021, 3:30 PM
Jason Prado
05/21/2021, 4:21 PM
Is there a way to prefect execute flow-run an already-completed run? My use case is that I'm building a custom container to execute flows stored in script storage on my Kubernetes agent, but when the flow runs it can't find Python dependencies (import pandas fails). I'd like to repro it locally, but when I try to run the flow I see Flow run has already finished. Other advice for debugging would be welcome; I've run a shell in the container and I do see my dependencies, FWIW.
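A hedged way to reproduce the import failure without going through the backend is to load the flow from its script inside the container and run it in-process, which sidesteps the "Flow run has already finished" check; the file path below is hypothetical and extract_flow_from_file is assumed to be available in this Prefect version:
# run inside the custom container, e.g. python repro.py
from prefect.utilities.storage import extract_flow_from_file

# hypothetical path to the flow file used by script storage
flow = extract_flow_from_file("/opt/prefect/flows/my_flow.py")

# executes the flow in-process, so a missing import fails immediately
state = flow.run()
print(state)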
Andrew Nichol
05/21/2021, 4:47 PM
David Elliott
05/21/2021, 5:39 PM
I'm getting an API_ERROR on flow.register(). I had this issue intermittently when I last worked on this project (late March), related to this post, but it used to register after 1 or 2 retries. However, now I can't register at all - I've tried 6 times and still can't register the flow. I'd be very grateful for any advice please!
Prefect version 0.14.14, Docker storage pushing to ECR, full error message in thread!
Philip MacMenamin
05/21/2021, 5:49 PM
prefect register flow --file flow.py --project proj_name
This works without issue; however, the recommendation is to use the new format:
prefect register --project my-project -p myflows/
Sébastien Arnaud
05/21/2021, 5:56 PM
ash
05/21/2021, 8:20 PM
gotham
05/21/2021, 8:35 PM
Andrew Nichol
05/22/2021, 3:02 AM
Giovanni Giacco
05/22/2021, 8:17 AM
Jeremy Phelps
05/22/2021, 10:05 PM
ale
05/24/2021, 10:20 AM
We're running prefect==0.13.13. From time to time, tasks fail to start and we get the following error:
f6edaa77-6dc6-4b85-953a-25ccdb4fb366 finished in state <Failed: "list index out of range">
where f6edaa77-6dc6-4b85-953a-25ccdb4fb366 is the task run id.
We can't find any meaningful log on ECS, but I suspect this error is caused by the container failing to start.
Any suggestions?
Thanks!
Zack Novak
05/24/2021, 4:46 PM
Joseph Loss
05/24/2021, 7:43 PM
strategies = ['strat1', 'strat2', 'strat3']
for strat in strategies:
    strat = [strat]
    # this gets the list of applicable accounts
    lstAccnts = GetRiskModelAccnts(strategy = strat)
    # this gets the tracking error risk on an account level (each account has between 1 and 3 strategies)
    dfAccntsTEc, dfAccntsTE, dfAccntsMoments = RiskWrapper(
        lstAccnt = lstAccnts,
        benchmark = 'SPY',
        lstAllowStrats = strat,
        AggregationLevel = ['accnt']
    )
    # this gets the tracking error risk on an account-strategy level
    dfAccntsTEc, dfAccntsTE, dfAccntsMoments = RiskWrapper(
        lstAccnt = lstAccnts,
        benchmark = 'SPY',
        lstAllowStrats = strat,
        AggregationLevel = ['accnt', 'strategy']
    )
You'll notice the aggregation level is also something that is iterated, and there is a third aggregation level ['accnt', 'strategy', 'strategyLevel'] that isn't shown above. I know mapping could be of use here as well, but I'm not sure of the correct syntax to implement it.
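One hedged way to express this with mapping is to expand the full set of (strategy, AggregationLevel) pairs at flow-build time and map a single task over the two resulting lists; the task, flow, and import names below are illustrative, and GetRiskModelAccnts / RiskWrapper are assumed to be importable as ordinary functions:
from itertools import product
from prefect import Flow, task
from risk_lib import GetRiskModelAccnts, RiskWrapper  # hypothetical import path

strategies = ['strat1', 'strat2', 'strat3']
agg_levels = [['accnt'], ['accnt', 'strategy'], ['accnt', 'strategy', 'strategyLevel']]

@task
def compute_risk(strat, agg_level):
    # one mapped child run per (strategy, aggregation level) pair
    lstAccnts = GetRiskModelAccnts(strategy=[strat])
    return RiskWrapper(
        lstAccnt=lstAccnts,
        benchmark='SPY',
        lstAllowStrats=[strat],
        AggregationLevel=agg_level,
    )

with Flow("risk-model") as flow:
    # plain Python at build time: expand the cross product into two parallel lists
    pairs = list(product(strategies, agg_levels))
    strat_args = [s for s, _ in pairs]
    agg_args = [a for _, a in pairs]
    results = compute_risk.map(strat_args, agg_args)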
Adam Shamlian
05/24/2021, 9:46 PM
def my_non_task_util_func(some_args):
    return util_results

@task
def task_a(a_args):
    return a_result

@task
def task_b(b_args):
    return b_result

with Flow('test') as f:
    util = my_non_task_util_func(some_args)
    util_2 = my_non_task_util_func(some_other_args)
    a = task_a(util, other_a_args)
    b = task_b(util, util_2, a)
Should I be concerned about how Prefect treats this?
For further context, this util func is a factory for a collection of related prefect.tasks.core.constants.Constant tasks. Thus far, it has been easier to return these from a vanilla func rather than a task that returns a collection of tasks.
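A minimal sketch of what such a factory might look like, with illustrative names; because it runs as plain Python while the Flow block is being built, it simply returns Constant tasks that get wired into the graph like any other upstream dependency:
from prefect.tasks.core.constants import Constant

def my_non_task_util_func(some_args):
    # executed at flow-build time, not at run time
    return [Constant(value, name=f"const-{i}") for i, value in enumerate(some_args)]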
joshua mclellan
05/24/2021, 10:42 PM
joshua mclellan
05/24/2021, 10:42 PM
Kevin Kho
05/24/2021, 10:57 PM
joshua mclellan
05/24/2021, 11:03 PM
Kevin Kho
05/24/2021, 11:03 PM
joshua mclellan
05/24/2021, 11:05 PM
Kevin Kho
05/24/2021, 11:57 PM