Vladislav Bogucharov
04/26/2021, 10:51 PM
from prefect import task, Flow
from prefect.triggers import any_failed
from random import randint

@task()
def extract():
    # this task generates random integers from 0,1,2,3,4,5.
    random_number = randint(0, 5)
    return random_number

@task()
def transform(random_number):
    # this task is potentially dangerous, since the random number can be zero, and then there will be division by zero
    data = 100 / random_number
    return data

@task()
def load(data):
    # just print data
    print(data)

@task(trigger=any_failed)
def telegram_alarm():
    # this task is needed to send notifications about fails in telegram.
    print('ERROR')  # How can I display a more informative message? For example, which task broke? Or what error caused the fail? (in my case ZeroDivisionError: division by zero)
    # import prefect
    # print(prefect.context.to_dict())  # tried this method but didn't find the needed logs

with Flow('test_etl') as flow:
    random_number = extract()
    data = transform(random_number)
    load(data)
    telegram_alarm(upstream_tasks=[flow.get_tasks()])

flow_state = flow.run()
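On the question in the code comments above (which task broke, and with what error), here is a minimal sketch. It assumes Prefect 0.14-style state handlers, where a handler is called with (task, old_state, new_state) and a failed state carries the raised exception in its result attribute. send_alert, sent_alerts and the stand-in objects are hypothetical and exist only so the sketch runs without Prefect installed.

```python
from types import SimpleNamespace

sent_alerts = []  # hypothetical stand-in for a real Telegram call


def send_alert(text):
    # Replace with the actual notification call.
    sent_alerts.append(text)


def alert_on_failure(task, old_state, new_state):
    # State-handler-shaped function: it sees every state change of the task.
    if new_state.is_failed():
        # Assumption: a failed state keeps the raised exception in .result.
        send_alert(f"Task '{task.name}' failed: {new_state.result!r}")
    return new_state  # a handler hands the state back


# Stand-in objects that mimic only the attributes used above.
fake_task = SimpleNamespace(name="transform")
fake_failed = SimpleNamespace(
    is_failed=lambda: True,
    result=ZeroDivisionError("division by zero"),
)
alert_on_failure(fake_task, None, fake_failed)
print(sent_alerts[0])
```

In a real flow the handler would presumably be attached per task, for example @task(state_handlers=[alert_on_failure]), so the alert is built where the task name and the exception are both at hand.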
Zach Schumacher
04/27/2021, 1:19 AM
KubernetesRun?
Louis Burtz
04/27/2021, 5:20 AM
Thomas Hoeck
04/27/2021, 10:15 AM
.map(). The problem is that when I run it, the results are all still stored in memory (bigger_than_mem runs twice and keeps both results in memory). Is it possible to have the flow store the data as a file and clear the memory? Here is a sample flow:
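from prefect import task, Flow

@task
def get_chunks():
    return [[1, 2], [3, 4, 5]]

@task
def bigger_than_mem(x):
    return x * 100000000

@task
def dump_to_db(x):
    # dumb() is the poster's placeholder for the database write; it is not defined in this snippet
    dumb(x)

with Flow("my_flow") as flow:
    x = get_chunks()
    x_trans = bigger_than_mem.map(x)
    dump_to_db.map(x_trans)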
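One possible way around this, assuming the expanded data is needed only by the dump step, is to do the expansion and the write inside a single mapped task, so the large value is a local variable and never becomes a task result. The sketch below is plain Python with no Prefect import; transform_and_dump and the written list are hypothetical stand-ins, and the multiplier is kept small so it runs quickly.

```python
def get_chunks():
    return [[1, 2], [3, 4, 5]]


written = []  # hypothetical stand-in for the database


def transform_and_dump(chunk):
    big = chunk * 1000          # stand-in for the larger-than-memory expansion
    written.append(len(big))    # stand-in for the database write
    return len(big)             # return only a small summary, not the data


# Plain loop here; in the flow this function would be a task mapped over the chunks.
summaries = [transform_and_dump(chunk) for chunk in get_chunks()]
print(summaries)
```

Only the small summaries survive each call, so the amount of data retained between steps no longer grows with the chunk size.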
Vladislav Bogucharov
04/27/2021, 1:10 PM
state.TriggerFailed is a child of state.Failed. When one of my tasks fails, I get state.Failed from that task, and then state.TriggerFailed from the next task, which didn't actually break, with the message TRIGGERFAIL('Trigger was "all_successful" but some of the upstream tasks failed.',).
I would like to receive information only about the failed task. At the moment I have implemented the following logic. Is this the best practice?
if isinstance(new_state, state.Failed) and not isinstance(new_state, state.TriggerFailed):
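To illustrate what that check selects, here is a small sketch with stand-in classes. Failed, TriggerFailed and Success below are local classes that only mirror the parent/child relationship described in the message; they are not Prefect's own, and root_failures is a hypothetical helper.

```python
class Failed:
    def __init__(self, message=""):
        self.message = message


class TriggerFailed(Failed):
    # Child of Failed, as described above.
    pass


class Success:
    pass


def root_failures(final_states):
    # Keep tasks that failed themselves, skip those that only had a failed trigger.
    return [
        name
        for name, st in final_states.items()
        if isinstance(st, Failed) and not isinstance(st, TriggerFailed)
    ]


states = {
    "extract": Success(),
    "transform": Failed("division by zero"),
    "load": TriggerFailed('Trigger was "all_successful" but some of the upstream tasks failed.'),
}
print(root_failures(states))
```

Because TriggerFailed inherits from Failed, the second isinstance test is what separates the task that actually broke from the ones that were merely skipped downstream.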
ciaran
04/27/2021, 1:43 PM
Talha
04/27/2021, 3:16 PM
Talha
04/27/2021, 3:17 PM
Talha
04/27/2021, 3:17 PM
Chris Marchetti [Datateer]
04/27/2021, 3:29 PM
PREFECT__CLOUD__AUTH_TOKEN. Can we just replace references to the personal access tokens (prefect_auth_token) with the new name PREFECT__CLOUD__AUTH_TOKEN? Any guidance would be appreciated. Thanks!
flavienbwk
04/27/2021, 4:39 PM
docker.errors.APIError: 400 Client Error for http+docker://localhost/v1.40/containers/create: Bad Request ("invalid IP address in add-host: """)
Could anyone help? Thanks
Linnea Sahlberg
04/27/2021, 7:15 PM
StartFlowRun and running into an issue where the flow cannot be found, even though that flow exists. I've attached the stack trace of the error as well as a screenshot of our flows, to show that we do have one named final_stats-staging. Any help here would be great, thanks!
Sébastien Arnaud
04/27/2021, 7:29 PM
Nathan Atkins
04/27/2021, 9:04 PM
Braun Reyes
04/27/2021, 9:56 PM
Joseph Loss
04/27/2021, 11:34 PM
def fnRunTask(parameter1, parameter2, parameter3 .... parameter10):
    df1, df2 = fnRunSubTask(parameter3, parameter5)
    dfTmp = fnRunSubTask2(df1, parameter1, parameter7, parameter9)
2. As shown above, each subfunction requires different parameters, and the number of parameters for each function can vary.
Essentially this is all headache from converting a bunch of stuff that was defined under "if __name__ == '__main__':" into a Prefect setup.
I've played around with passing in one large dictionary at the start, but wanted to ask here before I continued debugging that (didn't work on tries #1-20 lol)
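A minimal sketch of the one-large-dictionary idea, in plain Python with no Prefect involved: each sub-function is called with only the keys it declares, picked out via inspect.signature. call_with_needed, the snake_case function names and the arithmetic bodies are hypothetical stand-ins for the real sub-tasks.

```python
import inspect


def call_with_needed(func, params, *args):
    # Pass positional args through, plus only the keyword params func declares.
    accepted = inspect.signature(func).parameters
    kwargs = {key: value for key, value in params.items() if key in accepted}
    return func(*args, **kwargs)


def fn_run_sub_task(parameter3, parameter5):
    # Stand-in body; the real function would build two DataFrames.
    return parameter3 + parameter5, parameter3 * parameter5


def fn_run_sub_task2(df1, parameter1, parameter7, parameter9):
    # Stand-in body for the second sub-task.
    return df1 + parameter1 + parameter7 + parameter9


# One dictionary holding parameter1..parameter10 (values are illustrative).
params = {f"parameter{i}": i for i in range(1, 11)}

df1, df2 = call_with_needed(fn_run_sub_task, params)
df_tmp = call_with_needed(fn_run_sub_task2, params, df1)
print(df1, df2, df_tmp)
```

The helper keeps each sub-function's signature explicit while the caller carries a single dictionary, which avoids threading ten separate arguments through every layer.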
Also wanted to ask about the task interconnectivity, as I was a bit confused about upstream/downstream tasks since some of them are "at the same time" within the task that calls them.
Ranu Goldan
04/28/2021, 3:11 AM
Jacob Blanco
04/28/2021, 5:59 AM
Joseph Loss
04/28/2021, 6:31 AM
# fnPortfolioOptimization()
from prefect import Client, Flow, Parameter

client = Client(api_token="%TOKEN%")  # token redacted by the poster
client.login_to_tenant(tenant_slug='pwcm')

with Flow('test2') as flow:
    lookback_periods = Parameter("lookback_periods", default=[3, 5, 7])
    percent_change = Parameter("percent_change", default=[0.05, 0.07, 0.1])
    lookback_days_rsi = Parameter("lookback_days_rsi", default=20)
    rsi_overbought = Parameter("rsi_overbought", default=0.75)
    fnRunAlgo(lookback_periods, percent_change, lookback_days_rsi, rsi_overbought)
prefect.utilities.exceptions.ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured.
I've tried a custom service-account token as well as my user API key; neither works when calling flow.register(project_name) or client.register(flow, project_name)
Kayvan Shah
04/28/2021, 6:42 AM
Jeremy Tee
04/28/2021, 7:55 AM
Unexpected error: TypeError("object of type 'NoneType' has no len()"). However, when I rerun a fresh one, it works! Is my way of caching wrong?
prefect = 0.14.15
Bruno Roque
04/28/2021, 8:28 AM
flow_run data (or any other entity) into a data warehouse? We had a poor man's workflow engine that we are replacing with Prefect. But we need to get the flow_run data into our data warehouse in order to then use the reporting tools. We were thinking of doing some sort of ETL that queries Prefect's GraphQL API and loads it into our data warehouse. But we were wondering if there is a recommended way of achieving this? GraphQL's subscriptions do not work yet, right?
Vladislav Bogucharov
04/28/2021, 8:45 AM
Romain
04/28/2021, 10:43 AM
mapped=True when running from inside a mapped context
As a dummy example, I'm trying to do something like this:
from prefect.tasks.core.function import FunctionTask
from prefect import Flow, apply_map

def task_block(z):
    R = FunctionTask(lambda x: range(x))(z)
    M = FunctionTask(lambda x: x + 1).map(R)
    return M

with Flow('nested_map') as flow:
    L = [1, 2, 3]
    apply_map(task_block, L)
Is there a way to do this kind of nested mapping differently?
g.suijker
04/28/2021, 11:28 AM
Robert Bastian
04/28/2021, 1:23 PM
Salohy
04/28/2021, 1:24 PM
import prefect
from prefect import task, Flow
from prefect.storage import Docker
from prefect.run_configs import KubernetesRun
from prefect.executors import DaskExecutor

STORAGE = Docker(registry_url="prefect.azurecr.io", image_name="prefect/test")
RUN_CONFIG = KubernetesRun(image="prefect.azurecr.io/prefect/test")
EXECUTOR = DaskExecutor(address="tcp://my-ip:8786")

@task
def hello():
    logger = prefect.context.get('logger')
    logger.info("Hello!")

with Flow("changeme", storage=STORAGE, run_config=RUN_CONFIG, executor=EXECUTOR) as flow:
    hello()
When I run this flow, it is successfully submitted but never gets executed, and the status stays pending forever. Am I missing something? Do I need to call flow.run() in my code? Many thanks already for helping me 🙂🙏
Ben Collier
04/28/2021, 3:58 PM
Vikram Thirumalai
04/28/2021, 4:36 PM
Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000002C254411C10>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))
Jacob Wilson
04/28/2021, 5:11 PM