Zac Chien
08/25/2021, 9:49 AM
with Flow('feature-factory') as flow:
    a, b = pre_task()
    task1.map(a)
    task2.map(b)
Eddie Atkinson
08/25/2021, 10:01 AM
Does anyone know what ECSRun is doing under the hood to run tasks? I have set a subnet for the service I am running my Prefect tasks in and have disabled public IPs, but when I call ECSRun my tasks are running with a public IP address in a different subnet.
From reading the documentation for ecs.run_task it seems as though you are required to specify a subnet ID if you are running a task on Fargate, which I am. Therefore, I'm wondering what subnet ID Prefect is using and whether it can be forced to use the default subnet ID of the service. I am aware that you can override the arguments passed to run_task by ECSRun, however I don't want to do this as I want to keep my config agnostic to the account it is being deployed in (I am deploying the same infra to different AWS accounts and don't want to hard-code the subnet ID which is passed to ECSRun).
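A hedged sketch of the override Eddie mentions (and would rather avoid), assuming Prefect 1.x: ECSRun accepts run_task_kwargs, which are merged into the underlying boto3 ecs.run_task call, so the network configuration can be pinned per account. The subnet ID below is a placeholder.

from prefect.run_configs import ECSRun

run_config = ECSRun(
    run_task_kwargs={
        "networkConfiguration": {
            "awsvpcConfiguration": {
                "subnets": ["subnet-0123456789abcdef0"],  # placeholder subnet ID
                "assignPublicIp": "DISABLED",
            }
        }
    }
)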
Dotan Asselmann
08/25/2021, 11:54 AM

Bastian Röhrig
08/25/2021, 12:09 PM

Vincent Chéry
08/25/2021, 12:14 PM

Tense Fragile
08/25/2021, 12:33 PM

Mehdi Nazari
08/25/2021, 3:26 PM
… With context. Has anyone been able to successfully do that?

st dabu
08/25/2021, 3:31 PM
prefect run -n hello-flow --param args={'a':'alpha','b':'bravo'}
Why can't I pass a dictionary to the param object instead of individual params?
Error: No such command 'args ..blah
Brett Naul
08/25/2021, 5:07 PM
lots_of_dataframes_in_gcs = task(
    process_data,
    result=GCSResult(
        serializer=PandasSerializer(file_type="parquet")
    )
).map(input_files)
task(notify_data_is_ready).set_dependencies(upstream_tasks=[lots_of_dataframes_in_gcs])
I don't actually want the data passed to the downstream task, just to enforce the ordering. But based on my exploding dask worker memory it seems that the downstream task is actually collecting all of the outputs themselves(!), even though each transferred object shows up in the dask dashboard as a 48-byte Cached state object (so the memory is 99.999% untracked by dask).
I can imagine a workaround that involves just skipping Results altogether and returning True or a file path explicitly, but it seems like this might be a bug and the state.result should be getting purged before we ship it around? @Chris White @Zanie or anyone else have thoughts on whether this is expected?
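A minimal, self-contained sketch of the workaround Brett describes, assuming Prefect 1.x: each mapped task writes its own output and returns only a lightweight path, so nothing large is shipped to the downstream dependency. The dataframe contents and the output location are placeholders (GCS in the original).

import pandas as pd
from prefect import Flow, task

@task
def process_and_store(input_file: str) -> str:
    # placeholder for process_data: build/transform the dataframe
    df = pd.DataFrame({"source": [input_file]})
    path = f"/tmp/{input_file}.parquet"        # placeholder path (gs:// in the original setup)
    df.to_parquet(path)
    return path                                # small string, not a DataFrame

@task
def notify_data_is_ready(paths: list):
    print(f"{len(paths)} files ready")

with Flow("lots-of-dataframes") as flow:
    paths = process_and_store.map(["a", "b", "c"])   # placeholder input_files
    notify_data_is_ready(paths)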
Jean Da Rolt
08/25/2021, 7:02 PM
Flow run RUNNING: terminal tasks are incomplete.
Waiting for next available Task run at 2021-08-25T18:52:50.186382+00:0
That apparently happens at the end of the flow, but then it restarts the flow and executes everything again...
Andre Muraro
08/25/2021, 7:02 PM
When I run prefect register --project "some-project" --path flows/some_flow.py, my flow gets registered using the entire path (/workspaces....). My agent searches for flows at ~/.prefect/flows, so I get a "Failed to load and execute Flow's environment" error.
At the same time, even though I did not specify any labels, and there's no reference to any label whatsoever in any part of the source code, that flow gets created with one label that looks like an id of some sort (i.e. 447c6079d9b5).
My local agent is created using: prefect agent local start --name $$(hostname) --no-hostname-label
Both of these issues came up with my latest flow, but I have used the same setup for a few months now without any of these issues. Any idea what might be going on?
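A hedged guess at what Andre describes, assuming Prefect 1.x defaults: with no storage set, the flow falls back to Local storage, which records the path it was built with and adds the registering machine's hostname as a label; inside a container that hostname looks like an id (e.g. 447c6079d9b5). A sketch that pins both explicitly (names and values are illustrative):

from prefect import Flow
from prefect.storage import Local

with Flow(
    "some-flow",                                    # illustrative flow name
    storage=Local(
        directory="~/.prefect/flows",               # where the local agent looks
        add_default_labels=False,                   # drop the auto-added hostname label
    ),
) as flow:
    ...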
Fina Silva-Santisteban
08/25/2021, 11:26 PM
Functional api flow:
a_task = ATaskClass()

with Flow("Foo") as flow:
    name = Parameter('name')
    a_task = a_task(name=name)
Imperative api flow (so far):
a_task = ATaskClass()
name = Parameter('name')

flow = Flow("Foo")
flow.set_dependencies(
    task=a_task()
)
a_task.bind(name=name)
The error complains about the line where I'm trying to bind the flow param to the task:
raise ValueError(
ValueError: Could not infer an active Flow context while creating edge to <Task: ATaskClass>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `ATaskClass(...).run(...)`
What's the right way to go about this? 🤔
Some background info: our flows are getting quite large, so we're looking for ways to make them easier to work with and more reusable. We considered the flow-of-flows strategy, but we don't need to run any single part as an independent flow, so splitting things up into X flows would just slow down our development cycle, since we would need to register/re-register all those additional flows instead of registering/re-registering the one big one. The imperative api seemed like a better choice: being so explicit actually makes it easier to read, especially when the flow is large, and we keep the registration to just that one flow.
Many thanks!
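A hedged sketch of one way to make the imperative version work, assuming Prefect 1.x: outside a `with Flow(...)` block, bind() cannot infer which flow to attach the edge to, so the flow is passed explicitly.

from prefect import Flow, Parameter, Task

class ATaskClass(Task):
    def run(self, name):
        return f"hello {name}"

a_task = ATaskClass()
flow = Flow("Foo")
name = Parameter("name")

# bind() adds the task and its keyword dependency to the given flow;
# flow.set_dependencies(task=a_task, keyword_tasks={"name": name}) is equivalent.
a_task.bind(name=name, flow=flow)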
Ken Nguyen
08/26/2021, 1:46 AM
Failed to load and execute Flow's environment: GithubException(404, {'data': 'Not Found'}, {'server': 'GitHub.com', 'date': 'Thu, 26 Aug 2021 01:20:16 GMT', 'content-type': 'text/plain; charset=utf-8', 'transfer-encoding': 'chunked', 'vary': 'X-PJAX, X-PJAX-Container, Accept-Encoding, Accept, X-Requested-With', 'permissions-policy': 'interest-cohort=()', 'cache-control': 'no-cache', 'set-cookie': 'logged_in=no; domain=.github.com; path=/; expires=Fri, 26 Aug 2022 01:20:16 GMT; secure; HttpOnly; SameSite=Lax', 'strict-transport-security': 'max-age=31536000; includeSubdomains; preload', 'x-frame-options': 'deny', 'x-content-type-options': 'nosniff', 'x-xss-protection': '0', 'referrer-policy': 'origin-when-cross-origin, strict-origin-when-cross-origin', 'expect-ct': 'max-age=2592000, report-uri="https://api.github.com/_private/browser/errors"', 'content-security-policy': "default-src 'none'; base-uri 'self'; connect-src 'self'; form-action 'self'; img-src 'self' data:; script-src 'self'; style-src 'unsafe-inline'", 'content-encoding': 'gzip', 'x-github-request-id': 'EA90:30AB:16BB88:2112BC:6126EC50'})
An Hoang
08/26/2021, 11:39 AM
tsv_result_partial = partial(
    LocalResult,
    dir="./test_prefect",
    serializer=PandasSerializer(
        "csv",
        serialize_kwargs={"sep": "\t", "index": False},
        deserialize_kwargs={"sep": "\t"},
    ),
)
parquet_result_partial = partial(LocalResult, dir="./test_prefect", serializer=PandasSerializer("parquet"))

@task_no_checkpoint(target=search_result_df_file_name, result=tsv_result_partial())
def example_tsv_result_task():
    ...

@task_no_checkpoint(target=another_file_name, result=parquet_result_partial())
def example_parquet_result_task():
    ...

with Flow("test", result=LocalResult("./test_prefect")) as flow:
    param1 = Parameter("param1")
    param2 = Parameter("param2")
I want the outermost directory to be result_folder/param1/param2. How to achieve this?
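A hedged sketch of one way to get a result_folder/<param1>/<param2>/... layout, assuming Prefect 1.x: targets are formatted against prefect.context at runtime, which includes the flow-run parameters dict, so parameter values can be templated into the path. The base directory and file name are illustrative.

from prefect import Flow, Parameter, task
from prefect.engine.results import LocalResult

@task(
    result=LocalResult(dir="./result_folder"),                       # illustrative base dir
    target="{parameters[param1]}/{parameters[param2]}/{task_name}",  # item access into the parameters dict
)
def example_task(param1, param2):
    ...

with Flow("test") as flow:
    param1 = Parameter("param1")
    param2 = Parameter("param2")
    example_task(param1, param2)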
Bruno Murino
08/26/2021, 1:50 PM
Schedule(clocks=[DatesClock([pendulum.now()])])
but I think the lag between running this and registering/deploying the flow would make it so that it doesn't run, and I'm hesitant about hardcoding an "add 10 minutes" or something to it, as that doesn't feel very robust.
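A hedged alternative to the one-off DatesClock above, assuming Prefect 1.x with a backend (Cloud/Server): register first, then ask the backend for a run immediately, so the lag between building the schedule and registering stops mattering. Flow and project names are illustrative.

from prefect import Client, Flow

with Flow("run-once-after-register") as flow:        # illustrative flow
    ...

flow_id = flow.register(project_name="my-project")   # illustrative project
Client().create_flow_run(flow_id=flow_id)            # run now; no clock needed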
Jocelyn Boullier
08/26/2021, 2:03 PM

Zach Schumacher
08/26/2021, 2:29 PM

Italo Barros
08/26/2021, 2:42 PM

Evan Curtin
08/26/2021, 3:34 PM
@task(result=LocalResult(location="initial_data.prefect"))
def root_task():
    return [1, 2, 3]

@task(result=LocalResult(location="{date:%A}/{task_name}.prefect"))
def downstream_task(x):
    return [i * 10 for i in x]

def rating_model_backtest_workflow(opts: dict):
    with Flow("local-results") as flow:
        downstream_task(root_task)
    flow.run()
I'm running this from the docs. I am expecting files to show up at ./initial_data.prefect and the other location, but nothing appears. Am I missing something obvious?
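A hedged guess at the missing piece: when a flow is run locally with flow.run() rather than through an agent/backend, checkpointing is off by default in Prefect 1.x, so Results are never written. Enabling it before the run should make the files appear (flow here refers to the one built in the snippet above).

import os

os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"  # must be set before flow.run()
flow.run()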
wiretrack
08/26/2021, 3:44 PM
I'm getting a ValueError where it can't find the Slack webhook URL.
Horatiu Bota
08/26/2021, 3:51 PM
Is there a way to use the company_name run parameter when setting the target below? (Note: this doesn't work because of "dict object has no attribute company_name"):
@task(target="{parameters.company_name}/{today}-{task_name}", ...)
I tried parameters.get("company_name") instead but still no luck. Must be a really simple thing I'm missing (sorry for the spam).
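A hedged sketch: the target template is rendered with str.format against prefect.context, where parameters is a plain dict, so item access with square brackets (and no quotes) works where attribute access fails.

from prefect import task

@task(target="{parameters[company_name]}/{today}-{task_name}")  # [company_name], not .company_name
def my_task():  # illustrative task
    ...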
YD
08/26/2021, 4:32 PM

Abhas P
08/26/2021, 7:58 PM

Ryan Smith
08/26/2021, 9:46 PM

Gabriel Montañola
08/27/2021, 12:11 AM
We're using Github Storage.
I'm facing a problem when "developing" the flows. Since we're using Github Actions, flows are registered to a development project when a Pull Request is opened (and updated). But Github Storage will use main/master as the ref argument. This works fine for production, but won't do while I'm on a development branch.
The workaround I've found is to use a pattern like this:
• Github Actions workflow with different steps for Push/Pull Request:
  ◦ Push to main: generate the project folder and register the flow
    ▪︎ I use prefect register --project $project --path $path to do this
  ◦ Pull Request: register the flow with the schedule disabled in the development project
    ▪︎ I use the if __name__ == "__main__" pattern in order to pass a dynamic GITHUB_HEAD_REF to the Github Storage ref argument (see the sketch below).
It's working, but I'm wondering if there is a way to just use the prefect cli for this and avoid writing boilerplate code.
The thread has some code snippets and more details!
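A minimal sketch of the if __name__ == "__main__" pattern referenced above, assuming Prefect 1.x GitHub storage; the repo, path, and project names are illustrative.

import os
from prefect import Flow
from prefect.storage import GitHub

# On pull_request events, GitHub Actions sets GITHUB_HEAD_REF to the PR branch;
# fall back to main for production registration on push.
ref = os.environ.get("GITHUB_HEAD_REF") or "main"

with Flow("my-flow") as flow:  # illustrative flow
    ...

flow.storage = GitHub(repo="org/repo", path="flows/my_flow.py", ref=ref)

if __name__ == "__main__":
    flow.register(project_name=os.environ.get("PREFECT_PROJECT", "development"))  # illustrative env var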
Wilson Bilkovich
08/27/2021, 12:15 AM
I'm getting 'postgresqlPassword' must not be empty even though I am passing it with --set on the helm upgrade command line.
Ryan Sattler
08/27/2021, 1:40 AM

Wilson Bilkovich
08/27/2021, 2:23 AM
Has anyone seen tornado.util.TimeoutError: Operation timed out after None seconds before? I'm getting that in my Dask KubeCluster.

Jacob Hayes
08/27/2021, 4:23 AM
Is there a way to set up failure alerts in code (like flow.schedule)? I've set them up in the UI a few times, but occasionally renamed flows or changed projects and lose the failure alerts. 🙈
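A hedged sketch of one way to keep failure alerts in code rather than in the UI, assuming Prefect 1.x: attach a state handler at flow definition time so it survives renames and project moves. slack_notifier expects a SLACK_WEBHOOK_URL secret; the flow name is illustrative.

from prefect import Flow
from prefect.engine.state import Failed
from prefect.utilities.notifications import slack_notifier

# slack_notifier is curried: calling it with only keyword args returns a handler
handler = slack_notifier(only_states=[Failed])

with Flow("my-flow", state_handlers=[handler]) as flow:
    ...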
Konstantin
08/27/2021, 6:55 AM
26 August 2021,10:31:49 agent01 INFO Submitted for execution: PID: 135
26 August 2021,10:31:49 agent03 INFO Submitted for execution: PID: 140
26 August 2021,10:31:49 agent02 INFO Submitted for execution: PID: 136
26 August 2021,10:31:49 execute flow-run ERROR Failed to load and execute Flow's environment: ModuleNotFoundError("No module named '/Users/user1/'")
26 August 2021,10:31:49 execute flow-run ERROR Failed to load and execute Flow's environment: ModuleNotFoundError("No module named '/Users/user1/'")
26 August 2021,10:31:49 execute flow-run ERROR Failed to load and execute Flow's environment: ModuleNotFoundError("No module named '/Users/user1/'")
Please tell me where to look for the error.