Marwan Sarieddine
05/20/2021, 9:28 PMSean Harkins
05/20/2021, 10:56 PMvirtualenv
which contains the necessary dependencies works and the flow runs as expected. But registering the flow from a conda environment I receive this unpickling error when running.Damien Ramunno-Johnson
05/20/2021, 11:04 PMJeremy Phelps
05/21/2021, 1:36 AMPeter Roelants
05/21/2021, 9:57 AMStartFlowRun
. However, this needs a flow being registered prior to calling StartFlowRun.run()
, which requires Prefect Cloud or server to register the flow.
Is there an alternative to use dependent flows when no prefect Cloud or Server is available (for example when running a unit-test testing the full flow and its dependent flows)?Mark McDonald
05/21/2021, 1:01 PMRichard Hughes
05/21/2021, 1:19 PMAdam
05/21/2021, 2:54 PMBruno Murino
05/21/2021, 3:10 PMJosiah Berkebile
05/21/2021, 3:29 PMJosiah Berkebile
05/21/2021, 3:30 PMJason Prado
05/21/2021, 4:21 PMprefect execute flow-run
an already-completed run? My use case is that I’m building a custom container to execute flows stored in script storage on my kubernetes agent, but when the flow runs it can’t find python dependencies (import pandas
fails). I’d like to repro it locally, but when I try and run the flow I see Flow run has already finished.
. Other advice for debugging would be welcome; I’ve run a shell in the container and I do see my dependencies FWIW.Andrew Nichol
05/21/2021, 4:47 PMDavid Elliott
05/21/2021, 5:39 PMAPI_ERROR
on flow.register().
I had this issue intermittently when I last worked on this project (late March), related to this post, but it used to register after 1 or 2 retries. However now I can't register at all - I've tried 6 times and still can't register the flow. I'd be very grateful for any advice please!
Prefect version 0.14.14, docker storage pushing to ECR, full error message in thread!Philip MacMenamin
05/21/2021, 5:49 PMprefect register flow --file flow.py --project proj_name
This works without issue, however the recommendation is to use the new format:
prefect register --project my-project -p myflows/
Sébastien Arnaud
05/21/2021, 5:56 PMash
05/21/2021, 8:20 PMgotham
05/21/2021, 8:35 PMAndrew Nichol
05/22/2021, 3:02 AMGiovanni Giacco
05/22/2021, 8:17 AMJeremy Phelps
05/22/2021, 10:05 PMale
05/24/2021, 10:20 AMprefect==0.13.13
From time to time, tasks fail to start and we get the following error:
f6edaa77-6dc6-4b85-953a-25ccdb4fb366 finished in state <Failed: "list index out of range">
where f6edaa77-6dc6-4b85-953a-25ccdb4fb366
is the task run id.
We can’t find any meaningful log on ECS, but I suspect this error is caused by the fact that the container fails to start.
Any suggestions?
Thanks!Zack Novak
05/24/2021, 4:46 PMJoseph Loss
05/24/2021, 7:43 PMstrategies = ['strat1', 'strat2', 'strat3']
for strat in strategies:
strat = [strat]
# this gets the list of applicable accounts
lstAccnts = GetRiskModelAccnts(strategy = strat)
# this gets the tracking error risk on an account level (each account has between 1 and 3 strategies)
dfAccntsTEc, dfAccntsTE, dfAccntsMoments = RiskWrapper(
lstAccnt = lstAccnts,
benchmark = 'SPY',
lstAllowStrats = strat,
AggregationLevel = ['accnt']
)
# this gets the tracking error risk on an account-strategy level
dfAccntsTEc, dfAccntsTE, dfAccntsMoments = RiskWrapper(
lstAccnt = lstAccnts,
benchmark = 'SPY',
lstAllowStrats = strat,
AggregationLevel = ['accnt', 'strategy']
)
You'll notice the aggregation level is also something that is iterated, and there is a third aggregation level ['accnt', 'strategy', 'strategyLevel'] that isn't shown above. I know mapping could be of use here as well but I'm not sure the correct syntax to implement it.Adam Shamlian
05/24/2021, 9:46 PMdef my_non_task_util_func(some_args):
return util_results
@task
def task_a(a_args):
return a_result
@task
def task_b(b_args):
return b_result
with Flow('test') as f:
util = my_non_task_util_func(some_args)
util_2 = my_non_task_util_func(some_other_args)
a = task_a(util, other_a_args)
b = task_b(util, util_2, a)
Should I be concerned about how Prefect treats this?
For further context, this util func is a factory for a collection of related prefect.tasks.core.constants.Constant
s. Thus far, it has been easier to return these from a vanilla func rather than a task that returns a collection of tasks.joshua mclellan
05/24/2021, 10:42 PMitay livni
05/24/2021, 11:54 PMFelipe Saldana
05/25/2021, 1:47 AMpost_runner.set_upstream(all_pushes_mapped_results)
post_runner.bind(mapped_run_name,
mapped_gpudb_user,
mapped_gpudb_pass,
mapped_gpudb_host,
mapped_collection_name)
post_runner is a task itself so that I can actually get access to the parameters. Internally the runner task loops and creates a dynamic number tasks (if I do the same algorithm below directly in the flow context I dont have access to the parameters)
class RenameTaskRunner(Task):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# other constructor logic
def run(self, run_names, gpudb_user, gpudb_pass, gpudb_host, collections_list):
<http://logger.info|logger.info>("Start Rename Runner")
all_tasks = []
iterations = len(run_names)
all_tasks.append(RenameTask(name="push1_post"))
all_tasks[0].bind(gpudb_user, gpudb_pass, gpudb_host[0], collections_list)
for i in range(1, iterations):
all_tasks.append(RenameTask(name=f"push{i + 1}_post"))
all_tasks[i].set_upstream(all_tasks[i - 1])
all_tasks[i].bind(gpudb_user, gpudb_pass, gpudb_host[i], collections_list)
<http://logger.info|logger.info>("Finish Rename Runner")
return all_tasks
Below is the error. Is it possible to have RenameTaskRunner register the task with the given outer flow context? Can I pass a reference in to the constructor or some other idea?
[2021-05-25 01:31:50+0000] ERROR - prefect.TaskRunner | Unexpected error: ValueError("Could not infer an active Flow context while creating edge to <Task: push1_post>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `RenameTask(...).run(...)`")
Traceback (most recent call last):
Sam
05/25/2021, 5:33 AMjuumel_team
05/25/2021, 7:36 AM