Tim Enders
08/24/2021, 9:09 PM
Blake List
08/24/2021, 11:33 PM
import time

import prefect
from prefect import task, Flow, case
from prefect.tasks.redis import RedisSet, RedisGet
from prefect.tasks.control_flow import ifelse

redis_get = RedisGet(...)
redis_set = RedisSet(...)

with Flow("flow_p") as flow:
    t1_get = redis_get(redis_key='t1_key')
    # true case: initialize the key with a sentinel timestamp
    t1_init = redis_set(redis_key='t1_key', redis_val="0000-00-00 00:00:00")
    # false case: store the current timestamp
    t1_set = redis_set(redis_key='t1_key', redis_val=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
    ifelse(case(t1_get, None), t1_init, t1_set)

flow.run()
Running the flow once and then doing
RedisGet(...).run(redis_key='t1_key')
gives
b'0000-00-00 00:00:00'
but running again gives the same value.
Any ideas why I am not getting the False case on the second run?
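(For comparison, a minimal sketch of the context-manager case pattern documented for Prefect 1; is_unset and now_str are illustrative names, not part of the snippet above. One thing to check: time.strftime evaluated inline in the Flow block runs once at build time, so the "false" value is a constant baked in at registration.)
import time
from prefect import task, Flow, case
from prefect.tasks.redis import RedisSet, RedisGet

redis_get = RedisGet(...)
redis_set = RedisSet(...)

@task
def is_unset(value):
    # evaluated at flow-run time, unlike an inline comparison at build time
    return value is None

@task
def now_str():
    # also deferred to run time
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())

with Flow("flow_p") as flow:
    t1_get = redis_get(redis_key='t1_key')
    cond = is_unset(t1_get)
    with case(cond, True):
        redis_set(redis_key='t1_key', redis_val="0000-00-00 00:00:00")
    with case(cond, False):
        redis_set(redis_key='t1_key', redis_val=now_str())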
Jacob Blanco
08/25/2021, 8:12 AM
st dabu
08/25/2021, 8:55 AM
Zac Chien
08/25/2021, 9:49 AM
with Flow('feature-factory') as flow:
    a, b = pre_task()
    task1.map(a)
    task2.map(b)
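(In Prefect 1, unpacking a, b = pre_task() only works if the task declares its output count. A minimal sketch, assuming pre_task returns a 2-tuple; task1 and task2 here stand in for the originals:)
from prefect import task, Flow

@task(nout=2)  # nout=2 lets Prefect split the returned tuple into two task results
def pre_task():
    return [1, 2], [3, 4]

@task
def task1(x):
    return x * 10

@task
def task2(x):
    return x + 1

with Flow('feature-factory') as flow:
    a, b = pre_task()
    task1.map(a)
    task2.map(b)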
Eddie Atkinson
08/25/2021, 10:01 AM
Does anyone know what ECSRun is doing under the hood to run tasks? I have set a subnet for the service I am running my Prefect tasks in and have disabled public IPs, but when I call ECSRun my tasks run with a public IP address in a different subnet.
From reading the documentation for ecs.run_task, it seems you are required to specify a subnet ID if you are running a task on Fargate, which I am. So I'm wondering which subnet ID Prefect is using and whether it can be forced to use the default subnet ID of the service. I am aware that you can override the arguments passed to run_task by ECSRun, but I don't want to do this because I want my config to be agnostic to the account it is deployed in (I am deploying the same infra to different AWS accounts and don't want to hard-code the subnet ID which is passed to ECSRun).
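(One hedged option: ECSRun accepts run_task_kwargs that are forwarded to boto3's run_task, and the subnet can be looked up at registration time rather than hard-coded, which keeps the config account-agnostic. The tag filter below is an assumption; any lookup that identifies the right subnet in each account would do:)
import boto3
from prefect.run_configs import ECSRun

# resolve the service's subnet in whatever account we are deploying to
ec2 = boto3.client("ec2")
subnets = ec2.describe_subnets(
    Filters=[{"Name": "tag:Name", "Values": ["prefect-tasks"]}]  # assumed tag
)["Subnets"]

run_config = ECSRun(
    run_task_kwargs={
        "networkConfiguration": {
            "awsvpcConfiguration": {
                "subnets": [s["SubnetId"] for s in subnets],
                "assignPublicIp": "DISABLED",
            }
        }
    }
)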
Dotan Asselmann
08/25/2021, 11:54 AM
Bastian Röhrig
08/25/2021, 12:09 PM
Vincent Chéry
08/25/2021, 12:14 PM
Tense Fragile
08/25/2021, 12:33 PM
Mehdi Nazari
08/25/2021, 3:26 PM
With context. Has anyone been able to successfully do that?
st dabu
08/25/2021, 3:31 PM
prefect run -n hello-flow --param args={'a':'alpha','b':bravo'}
Why can't I pass a dictionary to the param object instead of individual params?
Error: No such command 'args ..blah
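(The value side of --param is parsed as JSON, so the dict needs double quotes inside and shell quoting around the whole thing; presumably something like:)
prefect run -n hello-flow --param args='{"a": "alpha", "b": "bravo"}'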
Brett Naul
08/25/2021, 5:07 PM
lots_of_dataframes = task(
    process_data,
    result=GCSResult(
        serializer=PandasSerializer(file_type="parquet")
    ),
).map(input_files)
task(notify_data_is_ready).set_dependencies(upstream_tasks=[lots_of_dataframes])
I don't actually want the data passed to the downstream task, just to enforce the ordering. But based on my exploding dask worker memory, it seems that the downstream task is actually collecting all of the outputs(!), even though each transferred object shows up in the dask dashboard as a 48-byte Cached state object (so the memory is 99.999% untracked by dask).
I can imagine a workaround that involves just skipping Results altogether and returning True or a file path explicitly, but it seems like this might be a bug and the state.result should be getting purged before we ship it around? @Chris White @Michael Adkins or anyone else have thoughts on whether this is expected?
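(A minimal sketch of the workaround described above, shipping only a path between tasks; the bucket name is hypothetical, and process_data, notify_data_is_ready, and input_files are the names from the snippet:)
from prefect import task, Flow

@task
def process_and_write(input_file):
    df = process_data(input_file)
    path = f"gs://my-bucket/{input_file}.parquet"  # hypothetical bucket
    df.to_parquet(path)  # writing to gs:// requires gcsfs
    return path  # a short string is cheap for dask to move around

with Flow("example") as flow:
    paths = process_and_write.map(input_files)
    task(notify_data_is_ready)(upstream_tasks=[paths])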
08/25/2021, 7:02 PMFlow run RUNNING: terminal tasks are incomplete.
Waiting for next available Task run at 2021-08-25T18:52:50.186382+00:0
That happens apparently at the end of the flow, but then it restarts the flow and executes everything again...
Andre Muraro
08/25/2021, 7:02 PM
When I run prefect register --project "some-project" --path flows/some_flow.py, my flow gets registered using the entire path (/workspaces....). My agent searches for flows at ~/.prefect/flows, so I get a "Failed to load and execute Flow's environment" error.
At the same time, even though I did not specify any labels, and there's no reference to any label whatsoever in any part of the source code, that flow gets created with one label that looks like an id of some sort (i.e. 447c6079d9b5).
My local agent is created using: prefect agent local start --name $$(hostname) --no-hostname-label
Both of these issues came up with my latest flow, but I have used the same setup for a few months now without any of these issues. Any idea on what might be going on?
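(A hedged guess on the mystery label: Local storage attaches the registering machine's hostname as a default label, and inside a container, which a /workspaces path suggests, that hostname is a hex id like the one observed. A sketch of suppressing it, assuming Local storage is in use:)
from prefect.storage import Local

flow.storage = Local(add_default_labels=False)  # drops the hostname label added at registration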
08/25/2021, 11:26 PMa_task = ATaskClass()
with Flow("Foo") as flow:
    name = Parameter('name')
    a_task = a_task(name=name)
Imperative api flow (so far):
a_task = ATaskClass()
name = Parameter('name')
flow = Flow("Foo")
flow.set_dependencies(
    task=a_task()
)
a_task.bind(name=name)
The error complains about the line where I’m trying to bind the flow param to the task:
raise ValueError(
ValueError: Could not infer an active Flow context while creating edge to <Task: ATaskClass>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `ATaskClass(...).run(...)`
What’s the right way to go about this? 🤔
Some background info: our flows are getting quite large, so we're looking for ways to make them easier to work with and more reusable. We considered the flow-of-flows strategy, but we don't need to run any single part as an independent flow, so splitting things up into X flows would just slow down our development cycle, since we'd have to register/re-register all those additional flows instead of the one big one. The imperative api seemed like a better choice: being so explicit actually makes it easier to read, especially when the flow is large, and registration stays limited to the one flow.
Many thanks!
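(A minimal sketch of binding the parameter without a flow context, using set_dependencies' keyword_tasks argument; ATaskClass as above:)
from prefect import Flow, Parameter

a_task = ATaskClass()
name = Parameter('name')
flow = Flow("Foo")
flow.set_dependencies(
    task=a_task,                    # the task instance itself, not an a_task() call
    keyword_tasks=dict(name=name),  # binds the parameter as a keyword input
)
(Alternatively, bind accepts an explicit flow argument: a_task.bind(name=name, flow=flow).)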
Ken Nguyen
08/26/2021, 1:46 AM
Failed to load and execute Flow's environment: GithubException(404, {'data': 'Not Found'}, {'server': 'GitHub.com', 'date': 'Thu, 26 Aug 2021 01:20:16 GMT', 'content-type': 'text/plain; charset=utf-8', 'transfer-encoding': 'chunked', 'vary': 'X-PJAX, X-PJAX-Container, Accept-Encoding, Accept, X-Requested-With', 'permissions-policy': 'interest-cohort=()', 'cache-control': 'no-cache', 'set-cookie': 'logged_in=no; domain=.github.com; path=/; expires=Fri, 26 Aug 2022 01:20:16 GMT; secure; HttpOnly; SameSite=Lax', 'strict-transport-security': 'max-age=31536000; includeSubdomains; preload', 'x-frame-options': 'deny', 'x-content-type-options': 'nosniff', 'x-xss-protection': '0', 'referrer-policy': 'origin-when-cross-origin, strict-origin-when-cross-origin', 'expect-ct': 'max-age=2592000, report-uri="https://api.github.com/_private/browser/errors"', 'content-security-policy': "default-src 'none'; base-uri 'self'; connect-src 'self'; form-action 'self'; img-src 'self' data:; script-src 'self'; style-src 'unsafe-inline'", 'content-encoding': 'gzip', 'x-github-request-id': 'EA90:30AB:16BB88:2112BC:6126EC50'})
An Hoang
08/26/2021, 11:39 AM
from functools import partial
from prefect import Flow, Parameter
from prefect.engine.results import LocalResult
from prefect.engine.serializers import PandasSerializer

tsv_result_partial = partial(LocalResult, dir="./test_prefect",
                             serializer=PandasSerializer("csv",
                                                         serialize_kwargs={"sep": "\t", "index": False},
                                                         deserialize_kwargs={"sep": "\t"}))
parquet_result_partial = partial(LocalResult, dir="./test_prefect", serializer=PandasSerializer("parquet"))

@task_no_checkpoint(target=search_result_df_file_name, result=tsv_result_partial())
def example_tsv_result_task():
    ...

@task_no_checkpoint(target=another_file_name, result=parquet_result_partial())
def example_parquet_result_task():
    ...

with Flow("test", result=LocalResult("./test_prefect")) as flow:
    param1 = Parameter("param1")
    param2 = Parameter("param2")
I want the outermost directory to be result_folder/param1/param2. How to achieve this?
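(Since targets are rendered with str.format against the run context, where parameters is a plain dict, index syntax in the template may give that nesting; a sketch reusing the names above:)
@task_no_checkpoint(
    target="{parameters[param1]}/{parameters[param2]}/" + search_result_df_file_name,
    result=tsv_result_partial(),
)
def example_tsv_result_task():
    ...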
08/26/2021, 1:50 PMSchedule(clocks=[DatesClock([ pendulum.now() ])])
but I think the lag between running this and registering/deploying the flow would make it so that it doesn't run, and I'm hesitant to hardcode an "add 10 minutes" or something, as that doesn't feel very robust.
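(One way to avoid the race entirely, sketched under the assumption that a Prefect backend is in use: skip the schedule and create the run explicitly right after registration:)
from prefect import Client

flow_id = flow.register(project_name="some-project")  # register returns the new version's id
Client().create_flow_run(flow_id=flow_id)             # kicks off one run immediately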
Jocelyn Boullier
08/26/2021, 2:03 PM
Zach Schumacher
08/26/2021, 2:29 PM
Italo Barros
08/26/2021, 2:42 PM
Evan Curtin
08/26/2021, 3:34 PM
@task(result=LocalResult(location="initial_data.prefect"))
def root_task():
    return [1, 2, 3]

@task(result=LocalResult(location="{date:%A}/{task_name}.prefect"))
def downstream_task(x):
    return [i * 10 for i in x]

def rating_model_backtest_workflow(opts: dict):
    with Flow("local-results") as flow:
        downstream_task(root_task)
    flow.run()
I'm running this from the docs. I am expecting files to show up at ./initial_data.prefect and at the other location, but nothing appears; am I missing something obvious?
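(A likely cause, hedged: with a plain flow.run() outside Cloud/Server, checkpointing is disabled by default, so Results are never written. It can be switched on via an environment variable before the run starts:)
import os
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"  # must be set before flow.run()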
08/26/2021, 3:44 PMValueError
where it can’t find Slack webhook URLHoratiu Bota
08/26/2021, 3:51 PM
Is there a way to access the company_name run parameter when setting the target below? (Note, this doesn't work because of: dict object has no attribute company_name.)
@task(target="{parameters.company_name}/{today}-{task_name}", ...)
i tried parameters.get("company_name") instead but still no luck. must be a really simple thing i'm missing (sorry for the spam)
YD
08/26/2021, 4:32 PM
Abhas P
08/26/2021, 7:58 PM
Ryan Smith
08/26/2021, 9:46 PM
Gabriel Montañola
08/27/2021, 12:11 AM
Github Storage.
I'm facing a problem when "developing" the flows. Since we're using Github Actions, flows are registered to a development project when a Pull Request is opened (and updated). But Github Storage will use main/master as the ref argument. This works fine for production, but won't do while I'm on a development branch.
The workaround I've found is to use a pattern like this:
• Github Actions Workflow with different steps for Push/Pull Request:
◦ Push to main: generate the project folder and register the flow
▪︎ I use prefect register --project $project --path $path to do this
◦ Pull Request: register the flow with its schedule disabled in the development project
▪︎ I use the if __name__ == "__main__" pattern in order to pass a dynamic GITHUB_HEAD_REF to the Github Storage ref argument (see the sketch below).
It's working, but I'm wondering if there is a way to just use the prefect cli for this and avoid writing boilerplate code.
The thread has some code snippets and more details!
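(A sketch of the __main__ pattern described above; repo and path are assumed placeholders, and GITHUB_HEAD_REF is only populated on pull_request events, hence the fallback:)
import os
from prefect.storage import GitHub

if __name__ == "__main__":
    flow.storage = GitHub(
        repo="my-org/my-repo",    # assumed
        path="flows/my_flow.py",  # assumed
        ref=os.environ.get("GITHUB_HEAD_REF") or "main",  # PR branch, else main
    )
    flow.register(project_name="development")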
Wilson Bilkovich
08/27/2021, 12:15 AM
'postgresqlPassword' must not be empty, even though I am passing it with --set on the helm upgrade command-line.
% CHART_NS=prefect
CHART_NAME=prefect-server-initial
helm upgrade \
  --set ui.apolloApiUrl=http://${CHART_NAME}-apollo:4200/graphql \
  --set agent.enabled=true \
  --set jobs.createTenant.enabled=true \
  --set postgresqlPassword=$POSTGRESQL_PASSWORD \
  ${CHART_NAME} \
  prefecthq/prefect-server
Error: UPGRADE FAILED: template: prefect-server/charts/postgresql/templates/NOTES.txt:59:4: executing "prefect-server/charts/postgresql/templates/NOTES.txt" at <include "common.errors.upgrade.passwords.empty" (dict "validationErrors" (list $passwordValidationErrors) "context" $)>: error calling include: template: prefect-server/charts/postgresql/charts/common/templates/_errors.tpl:18:48: executing "common.errors.upgrade.passwords.empty" at <fail>: error calling fail: HELM_ERR_START
PASSWORDS ERROR: you must provide your current passwords when upgrade the release
'postgresqlPassword' must not be empty, please add '--set postgresqlPassword=$POSTGRESQL_PASSWORD' to the command. To get the current value:
export POSTGRESQL_PASSWORD=$(kubectl get secret --namespace prefect prefect-server-initial-postgresql -o jsonpath="{.data.postgresql-password}" | base64 --decode)
HELM_ERR_END
nicholas
08/27/2021, 12:20 AM
Wilson Bilkovich
08/27/2021, 12:51 AM
nicholas
08/27/2021, 1:48 AM
postgresql.postgresqlPassword instead of just postgresqlPassword
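(i.e. presumably:)
--set postgresql.postgresqlPassword=$POSTGRESQL_PASSWORD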
Wilson Bilkovich
08/27/2021, 2:10 AM
nicholas
08/27/2021, 6:30 PM
Wilson Bilkovich
08/27/2021, 6:36 PM
values.yaml, I am convinced it would work when spelled with the postgresql. prefix