Kevin Kho
12/15/2021, 5:15 PM

Daniil Ponizov
12/15/2021, 6:21 PM

Frank Oplinger
12/15/2021, 8:56 PM

Kevin Mullins
12/15/2021, 10:02 PM
1. The Standard card shows the feature `Role-based permissioning`; however, below in the features list, when Standard is selected, it has RBAC and Custom Permissions greyed out, which would make me think it doesn’t have role-based permissioning.
a. Can I get clarification on what Role-based permissioning actually is compared to the features list, and which plan would be needed for them?
2. From what I see, each feature is marked with either a checkmark, an open circle, a dotted circle, a greyed-out filled circle, or a solid black circle, without an explanation of the differences. This makes it hard to understand some of the differences without hovering over every one.
a. This is just feedback from a user perspective that it might be hard to quickly figure out the features.
Thanks!

Kirk Quinbar
12/15/2021, 10:10 PM

Tom Klein
12/15/2021, 10:39 PM
Is it OK to enable rootless mode for Docker (to increase security), or will it hinder Prefect in any way?
https://docs.docker.com/engine/security/rootless/

KhTan
12/15/2021, 11:10 PM
def pipe(dt):
    # get intermediary outputs
    output1, output2, output3 = step1(dt)
    # write to database
    step2(output1, output2, output3)
is it like this, or is there a better way?
with Flow('multi-output func') as flow:
    dt_range = Parameter("urls", default=['dt1', 'dt2', 'dt3'])
    for dt in dt_range:
        pipe(dt)
thank you
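A hedged sketch of an alternative (step1 and step2 below are placeholder tasks standing in for the real ones): in Prefect 1.x the usual pattern is to map tasks over the parameter rather than looping in Python at flow-definition time, since the parameter's value isn't known when the Flow block runs.

from prefect import Flow, Parameter, task

@task
def step1(dt):
    # placeholder for the real intermediate computation
    return dt, dt, dt

@task
def step2(outputs):
    # placeholder: unpack the intermediate outputs and write them to the database
    output1, output2, output3 = outputs

with Flow("multi-output func") as flow:
    dt_range = Parameter("urls", default=["dt1", "dt2", "dt3"])
    outputs = step1.map(dt_range)  # one step1 run per url
    step2.map(outputs)             # one step2 run per step1 result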
Danny Vilela
12/15/2021, 11:31 PM
…HdfsResult class (a la luigi.contrib.hdfs.target.HdfsTarget; docs), but I’m stuck on the patterns the prefect codebase uses for the location attribute. It seems like the Result base class implements an interface that allows location at both initialization time and when calling exists(...) or read(...). I guess my question is: why? Is it not enough to restrict the user to only pass location at initialization time and use that value throughout the exists and read methods?
Edit: follow-up question: why does the prefect codebase follow the pattern of creating a new Result instance during Result.read(…), as opposed to updating the current instance’s value (self.value)?
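A plausible reading, sketched below (hypothetical MyResult, loosely modeled on the 1.x Result interface): the Result attached to a task acts as shared, reusable configuration whose location can be a template formatted per run, so read(...) copies itself instead of mutating shared state.

from copy import copy

class MyResult:
    """Hypothetical Result-like class; load_value() stands in for a real backend read."""

    def __init__(self, location: str = None):
        self.location = location  # may be a template, e.g. "{flow_run_id}/out.parquet"
        self.value = None

    def read(self, location: str) -> "MyResult":
        # Copy first: the task-level Result is shared across (mapped) runs,
        # so mutating self here would leak one run's state into another.
        new = copy(self)
        new.location = location  # the concrete, per-run location
        new.value = self.load_value(location)
        return new

    def load_value(self, location: str):
        raise NotImplementedError  # backend-specific (e.g. HDFS) read goes here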
Gian Piero Izzo
12/16/2021, 12:06 AM
from prefect import Flow, task, apply_map

@task
def task1(arg1):
    return arg1

@task
def task2(arg2):
    return sum(arg2)

def jobMap(arg):
    print("branch" + str(arg))
    arg2 = task1.map(range(2))
    task2(arg2)

with Flow("workflow") as flow:
    apply_map(jobMap, range(3))

flow.run()
but I receive the following error:
ValueError: Cannot set mapped=True when running from inside a mapped context
Is there any possibility to have a pipeline of mapping over mapping?
Thanks in advance
Gian Piero
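One common Prefect 1.x workaround (a sketch, not necessarily the recommended approach) is to avoid mapping inside a mapped context and instead flat-map once over precomputed per-branch inputs, e.g. with the flatten annotation that 1.x exports; regrouping results per branch would need an extra task.

from prefect import Flow, flatten, task

@task
def branch_inputs(n_branches):
    # one sub-list per branch: [[0, 1], [0, 1], [0, 1]]
    return [list(range(2)) for _ in range(n_branches)]

@task
def task1(arg1):
    return arg1

with Flow("workflow") as flow:
    nested = branch_inputs(3)
    # flatten() concatenates the sub-lists, so task1 maps over one element
    # per (branch, item) combination instead of mapping inside a map
    results = task1.map(flatten(nested))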
Tom Klein
12/16/2021, 2:23 AM

Tom Klein
12/16/2021, 4:15 AM

Adam Everington
12/16/2021, 10:29 AM
…LocalDaskExecutor: it understandably fails. However, when I set upstream tasks so that they essentially execute one after the other, it still fails. If I use a LocalExecutor it works fine. Thoughts?
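For reference, a minimal sketch (illustrative task names) of the "one after the other" setup described, using a state-only dependency under LocalDaskExecutor:

from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def first():
    ...

@task
def second():
    ...

with Flow("sequential") as flow:
    a = first()
    b = second()
    # state-only dependency: b waits for a even though no data flows between them
    b.set_upstream(a)

flow.run(executor=LocalDaskExecutor())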
Brett Naul
12/16/2021, 1:12 PM
d6962d8d-f4b0-4e04-8217-82d41400a043
Ifeanyi Okwuchi
12/16/2021, 4:29 PM
…repo_name/projects/npa_project/npa_project_module/npa_flow.py. The module is npa_project_module. When I run the following in the CLI: prefect run -m npa_project_module.npa_flow, I get an error from prefect saying: No module named "npa_project_module". This happens both when I cd to the npa_project directory and the npa_project_module directory before running the cmd. Any ideas what could be wrong?
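Not a definitive diagnosis, but prefect run -m resolves the module with ordinary Python imports, so the failure can be reproduced outside prefect (paths taken from the message above):

# Run this from repo_name/projects/npa_project/ -- the directory that
# *contains* npa_project_module. If it raises ModuleNotFoundError, so will
# `prefect run -m`; making the package importable (e.g. an __init__.py in
# npa_project_module, or adding its parent directory to PYTHONPATH) should
# fix both.
import importlib

importlib.import_module("npa_project_module.npa_flow")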
Vadym Dytyniak
12/16/2021, 4:41 PM
class SetupStorageTask(Task):
    def run(self, mode) -> Storage:
        return Storage(mode=mode)

setup_storage_task = SetupStorageTask()
storage = setup_storage_task(mode='rw')

and

class SetupStorageTask(Task):
    def __init__(self, mode):
        super().__init__()  # Task subclasses must call super().__init__()
        self.mode = mode

    def run(self) -> Storage:
        return Storage(mode=self.mode)

setup_storage_task = SetupStorageTask(mode='rw')
storage = setup_storage_task()

or can be just

storage = SetupStorageTask(mode='rw')
Seth Birkholz
12/16/2021, 4:53 PM

Pedro Machado
12/16/2021, 5:00 PM

Tom Klein
12/16/2021, 5:06 PM
…Dockerfile)
• check if the dedicated container for it exists; if it does, remove it
• create a new container for it
• run the container with some command
• wait for the run to finish
• log the results
• remove the container
ideally, i'd like to reach a point where i could just point Prefect at a local or remote code repository that contains a Dockerfile and have it basically just turn it into a single task (or, alternatively, point it at a pre-built image on ECR and the rest would be the same) -
do you think it would make more sense to turn this into a custom task - or - sort of a parameterized generic flow that i can incorporate into other flows using create_flow_run?
my issue with making the generic flow is that it cannot easily be shared with the community + it requires registering such a flow + naming all the tasks dynamically to make it clear (from the outside) what it's doing (and even then, i can only have the tasks named dynamically, not the flow). E.g. if this image does some data preprocessing for some model, the flow name would still be something like run docker, which doesn't do much to indicate that it's actually running this specific docker...
the problem with creating a custom task is that i could not re-use the existing task-library Docker tasks...
thoughts? 🤔
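For what it's worth, a hedged sketch of wiring up the steps above with the existing prefect.tasks.docker task library (the image name and command are illustrative; the "remove the container if it already exists" step would still need a custom task):

from prefect import Flow
from prefect.tasks.docker import (
    CreateContainer,
    StartContainer,
    WaitOnContainer,
    GetContainerLogs,
    RemoveContainer,
)

create = CreateContainer(image_name="my-image:latest", command="python main.py")
start = StartContainer()
wait = WaitOnContainer()
logs = GetContainerLogs()
remove = RemoveContainer()

with Flow("run docker") as flow:
    container_id = create()
    started = start(container_id=container_id)
    status_code = wait(container_id=container_id, upstream_tasks=[started])
    log_output = logs(container_id=container_id, upstream_tasks=[status_code])
    remove(container_id=container_id, upstream_tasks=[log_output])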
Philip MacMenamin
12/16/2021, 7:56 PM

Austen Bouza
12/16/2021, 8:46 PM

Tilak Maddy
12/16/2021, 9:03 PM
query {
  flow(
    where: {name: {_eq: "YYY"}, project: {name: {_eq: "XXX"}}, flow_runs: {state: {_eq: "Success"}}}
  ) {
    flow_runs_aggregate {
      aggregate {
        count
      }
    }
  }
}
Can you help me query the total number of successful flow runs of a flow from the beginning of time?
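A hedged guess at the intended query: the API is Hasura-style, so flow_runs_aggregate should accept its own where argument; filtering there (rather than in the flow filter, which only selects which flows are returned) should count only the successful runs.

query {
  flow(where: {name: {_eq: "YYY"}, project: {name: {_eq: "XXX"}}}) {
    flow_runs_aggregate(where: {state: {_eq: "Success"}}) {
      aggregate {
        count
      }
    }
  }
}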
Danny Vilela
12/16/2021, 9:17 PM
…HdfsSparkDataFrameResult implementation and I’m now looking to test it. I’ve been successful in writing unit tests (similar to those in the prefect test suite), but I’m now looking to test whether the result works in a Flow. Namely, I’d want to be sure that if some task is decorated with @task(result=HdfsSparkDataFrameResult(…)), the Flow doesn’t re-run the task if the result already exists. Has anyone done this for their own custom Result (sub-)type?
I’m trying to follow the test suite examples under TestOutputCaching, but I’m not sure I’m testing the right thing. I’ll add some more code in a thread here, but any advice/experience would be very welcome!
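One way to test that behavior end to end, sketched with LocalResult standing in for the custom result (assumes checkpointing is enabled for local runs, e.g. PREFECT__FLOWS__CHECKPOINTING=true):

from prefect import Flow, task
from prefect.engine.results import LocalResult

calls = []

@task(result=LocalResult(dir="/tmp/prefect-results"), target="answer.pkl")
def compute():
    calls.append(1)  # side channel to count real executions
    return 42

with Flow("cache-check") as flow:
    compute()

flow.run()              # first run executes and writes answer.pkl
flow.run()              # second run should find the target and skip execution
assert len(calls) == 1  # the task body ran exactly once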
Danny Vilela
12/16/2021, 10:22 PM
…EvaluateModelOnMetric(model: Model = …, metric: Metric = …) that evaluates a single model against a single metric, and lists models: List[Model] = [model_a, model_b, model_c] and metrics: List[Metric] = [precision, recall, f1_score].
It’s pretty simple to design a task EvaluateModelsOnMetrics(models: List[Model] = …, metrics: List[Metric] = …) that uses itertools.product internally and delegates to EvaluateModelOnMetric, but I’m wondering if prefect has something like this out of the box 🙂
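I'm not aware of a built-in for this in 1.x; a common pattern is exactly what's described: a small task that builds the itertools.product pairs, plus a single mapped evaluation task (a sketch; model/metric objects elided to strings):

from itertools import product
from prefect import Flow, task

@task
def make_pairs(models, metrics):
    # one (model, metric) tuple per grid cell: 3 models x 3 metrics = 9 pairs
    return list(product(models, metrics))

@task
def evaluate(pair):
    model, metric = pair
    # placeholder: compute `metric` for `model` here
    return model, metric

with Flow("evaluate-grid") as flow:
    pairs = make_pairs(["model_a", "model_b", "model_c"],
                       ["precision", "recall", "f1_score"])
    scores = evaluate.map(pairs)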
Leon Kozlowski
12/17/2021, 12:27 AM

Matt Alhonte
12/17/2021, 12:36 AM
…ECSAgent and I'm trying to get a basic "hello world" flow running on it. When I don't have a capacityProviderStrategy defined, I get this:
No Container Instances were found in your cluster.
But when I took out the LaunchType keyword, it still gives me this:
An error occurred (InvalidParameterException) when calling the RunTask operation: You may choose a capacity provider or a launch type but not both.
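In case it helps, a sketch of what "capacity provider, no launch type" might look like on the run config (untested; ECSRun does accept launch_type and run_task_kwargs, but whether launch_type=None fully suppresses LaunchType may depend on the agent version; the provider name below is illustrative):

from prefect.run_configs import ECSRun

run_config = ECSRun(
    launch_type=None,  # avoid sending LaunchType alongside a capacity provider
    run_task_kwargs={
        "capacityProviderStrategy": [
            # illustrative provider name
            {"capacityProvider": "my-capacity-provider", "weight": 1}
        ]
    },
)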
Yash
12/17/2021, 7:51 AM

Paul Hughes
12/17/2021, 9:16 AM

Stefano Cascavilla
12/17/2021, 10:46 AM
…prefect.tasks.airbyte module. This is the error:
#14 1.604 Traceback (most recent call last):
#14 1.604   File "flow/etl.py", line 7, in <module>
#14 1.604     from prefect.tasks.airbyte import AirbyteConnectionTask
#14 1.604 ModuleNotFoundError: No module named 'prefect.tasks.airbyte'
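Possibly a version mismatch rather than a packaging problem: prefect.tasks.airbyte only exists in relatively recent 1.x releases, so an older prefect pinned in the image would fail exactly like this. A quick check inside the image:

import prefect

# Compare against the prefect changelog entry that introduced
# prefect.tasks.airbyte; older versions won't have the module.
print(prefect.__version__)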
Lucian Rosu
12/17/2021, 12:41 PM
from datetime import timedelta

import pendulum
from prefect.schedules import Schedule, clocks, filters

schedule = Schedule(
    clocks=[clocks.IntervalClock(timedelta(days=1))],
    # filters=[filters.at_time(time(10))],
    or_filters=[filters.between_times(pendulum.time(9), pendulum.time(9))],
)
Daniil Ponizov
12/17/2021, 12:55 PM

Daniil Ponizov
12/17/2021, 12:55 PM

Anna Geller
12/17/2021, 1:13 PM

Daniil Ponizov
12/17/2021, 1:36 PM
Failed to load and execute Flow's environment: TypeError("__init__() missing 1 required positional argument: 'flow_path'",)
Anna Geller
12/17/2021, 1:39 PM

Daniil Ponizov
12/17/2021, 1:45 PM
import os

from prefect.run_configs import LocalRun
from prefect.storage import Git

f.run_config = LocalRun(env={"VAR": os.environ["VAR"]})
f.storage = Git(
    repo="path/to/repository",
    repo_host="my.host.com",
    use_ssh=True,
    flow_path="main.py",
)
Anna Geller
12/17/2021, 1:52 PM

Daniil Ponizov
12/17/2021, 1:57 PM