Danny Vilela
12/15/2021, 11:31 PM
…`HdfsResult` class (à la `luigi.contrib.hdfs.target.HdfsTarget`; docs), but I'm stuck on the patterns the prefect codebase uses for the `location` attribute. It seems like the `Result` base class implements an interface that accepts `location` both at initialization time and when calling `exists(...)` or `read(...)`. I guess my question is: why? Is it not enough to restrict the user to passing `location` only at initialization time, and to use that value throughout the `exists` and `read` methods?
Edit: follow-up question: why does the prefect codebase follow the pattern of creating a new `Result` instance during `Result.read(…)`, as opposed to updating the current instance's value (`self.value`)?
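Both questions touch the same two design points, so here is a minimal sketch of the pattern being asked about, assuming Prefect 1.x's `Result` interface (the in-memory `_STORE` "backend" is a hypothetical stand-in for HDFS):
```python
from prefect.engine.result import Result

_STORE = {}  # hypothetical in-memory backend standing in for HDFS

class MyResult(Result):
    def exists(self, location: str, **kwargs) -> bool:
        # `location` is accepted per call because Prefect templates it with
        # runtime context (e.g. "{task_name}-{today}"), so the final value
        # isn't known at initialization time.
        return location.format(**kwargs) in _STORE

    def read(self, location: str) -> "Result":
        # Prefect copies the Result rather than mutating it: one configured
        # Result instance is shared by every run of a task, so writing to
        # self.value in place could leak state across runs.
        new = self.copy()
        new.location = location
        new.value = _STORE[location]
        return new
```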
Gian Piero Izzo
12/16/2021, 12:06 AM
```python
from prefect import Flow, task, apply_map

@task
def task1(arg1):
    return arg1

@task
def task2(arg2):
    return sum(arg2)

def jobMap(arg):
    print("branch" + str(arg))
    arg2 = task1.map(range(2))
    task2(arg2)

with Flow("workflow") as flow:
    apply_map(jobMap, range(3))

flow.run()
```
but I receive the following error:
`ValueError: Cannot set mapped=True when running from inside a mapped context`
Is there any possibility to build a pipeline that maps over a mapping?
Thanks in advance,
Gian Piero
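The error says nested mapping isn't allowed; a hedged sketch of one common workaround, moving the inner map into a child flow and mapping over `create_flow_run` (it assumes "branch-flow" has been registered in a project named "examples"):
```python
from prefect import Flow, task, unmapped
from prefect.tasks.prefect import create_flow_run

@task
def task1(arg1):
    return arg1

@task
def task2(arg2):
    return sum(arg2)

# The inner map lives in its own, separately registered flow.
with Flow("branch-flow") as branch_flow:
    arg2 = task1.map(range(2))
    task2(arg2)

# The outer map becomes one child flow run per branch.
with Flow("workflow") as flow:
    create_flow_run.map(
        parameters=[{} for _ in range(3)],  # placeholder per-branch parameters
        flow_name=unmapped("branch-flow"),
        project_name=unmapped("examples"),
    )
```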
Adam Everington
12/16/2021, 10:29 AM
…`LocalDaskExecutor` it understandably fails. However, when I set upstream tasks so that they essentially execute one after the other, it still fails. If I use a `LocalExecutor` it works fine. Thoughts?
Brett Naul
12/16/2021, 1:12 PM
d6962d8d-f4b0-4e04-8217-82d41400a043
Ifeanyi Okwuchi
12/16/2021, 4:29 PM
…`repo_name/projects/npa_project/npa_project_module/npa_flow.py`. The module is `npa_project_module`. When I run `prefect run -m npa_project_module.npa_flow` in the CLI, I get an error from prefect saying: `No module named "npa_project_module"`. This happens both when I cd to the `npa_project` directory and to the `npa_project_module` directory before running the cmd. Any ideas what could be wrong?
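One way to see what the CLI sees (an assumption about the mechanism, but `prefect run -m` resolves the module through Python's regular import machinery): if this import fails from the shell's working directory, the CLI invocation will fail the same way.
```python
# Run from the same directory as the failing `prefect run -m` command; the
# parent of npa_project_module/ must be on sys.path for this to succeed.
import importlib

importlib.import_module("npa_project_module.npa_flow")
```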
Vadym Dytyniak
12/16/2021, 4:41 PM
```python
class SetupStorageTask(Task):
    def run(self, mode) -> Storage:
        return Storage(mode=mode)

setup_storage_task = SetupStorageTask()
storage = setup_storage_task(mode='rw')
```
and
```python
class SetupStorageTask(Task):
    def __init__(self, mode):
        self.mode = mode
        super().__init__()  # Task.__init__ must still run

    def run(self) -> Storage:
        return Storage(mode=self.mode)

setup_storage_task = SetupStorageTask(mode='rw')
storage = setup_storage_task()
```
or can it just be
```python
storage = SetupStorageTask(mode='rw')
```
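For comparison, a hedged sketch of the convention Prefect 1.x's own task library tends to follow, with an init-time default that a call-time argument can override (the `Storage` dataclass is a stand-in stub for the real class):
```python
from dataclasses import dataclass
from prefect import Task

@dataclass
class Storage:  # stand-in stub for the actual Storage class
    mode: str

class SetupStorageTask(Task):
    def __init__(self, mode: str = "rw", **kwargs):
        self.mode = mode
        super().__init__(**kwargs)  # forward task kwargs (name, tags, ...) to Task

    def run(self, mode: str = None) -> Storage:
        # a call-time argument overrides the init-time default
        return Storage(mode=mode or self.mode)

setup_storage_task = SetupStorageTask(mode="rw")
```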
Tom Klein
12/16/2021, 5:06 PM
…Dockerfile)
• check if the dedicated container for it exists; if there is one, remove it
• create a new container for it
• run the container with some command
• wait for the run to finish
• log the results
• remove the container
Ideally, I'd like to reach a point where I could just point Prefect at a local or remote code repository that contains a Dockerfile and have it basically just turn it into a single task (or, alternatively, point it at a pre-built image on ECR, and the rest would be the same).
Do you think it would make more sense to turn this into a custom task, or into a sort of parameterized generic flow that I can incorporate into other flows using `create_flow_run`?
My issue with making it a generic flow is that it can't easily be shared with the community, it requires registering such a flow, and it requires naming all the tasks dynamically to make it clear (from the outside) what it's doing (and even then, I can only have the tasks named dynamically, not the flow). E.g. if this image does some data preprocessing for some model, the flow name would still be something generic like `run docker`, which doesn't do much to indicate that it's actually running this specific docker…
The problem with creating a custom task is that I couldn't re-use the existing task-library Docker tasks…
Thoughts? 🤔
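A hedged sketch of the container lifecycle above wired together from the existing `prefect.tasks.docker` task library (image name and command are placeholders):
```python
from prefect import Flow
from prefect.tasks.docker import (
    CreateContainer,
    GetContainerLogs,
    RemoveContainer,
    StartContainer,
    WaitOnContainer,
)

create = CreateContainer(image_name="my-image:latest", command="python main.py")
start = StartContainer()
wait = WaitOnContainer()
logs = GetContainerLogs()
remove = RemoveContainer()

with Flow("run-docker-image") as flow:
    container_id = create()
    started = start(container_id=container_id)
    status = wait(container_id=container_id, upstream_tasks=[started])
    log_output = logs(container_id=container_id, upstream_tasks=[status])
    remove(container_id=container_id, upstream_tasks=[log_output])
```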
Tilak Maddy
12/16/2021, 9:03 PM
```graphql
query {
  flow(
    where: {name: {_eq: "YYY"}, project: {name: {_eq: "XXX"}}, flow_runs: {state: {_eq: "Success"}}}
  ) {
    flow_runs_aggregate {
      aggregate {
        count
      }
    }
  }
}
```
Can you help me query the total number of successful flow runs of a flow from the beginning of time?
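The query above counts all runs of any flow that has at least one successful run; a hedged sketch of a fix, assuming the Hasura-style `where` argument that Prefect's GraphQL API exposes on aggregates (filter the aggregate itself by state):
```python
from prefect.client import Client

client = Client()
result = client.graphql(
    """
    query {
      flow(where: {name: {_eq: "YYY"}, project: {name: {_eq: "XXX"}}}) {
        flow_runs_aggregate(where: {state: {_eq: "Success"}}) {
          aggregate {
            count
          }
        }
      }
    }
    """
)
```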
Danny Vilela
12/16/2021, 9:17 PM
…`HdfsSparkDataFrameResult` implementation, and I'm now looking to test it. I've been successful in writing unit tests (similar to those in the prefect test suite), but I'm now looking to test whether the result works in a Flow. Namely, I'd want to be sure that if some task is decorated with `@task(result=HdfsSparkDataFrameResult(…))`, the Flow doesn't re-run the task if the result already exists. Has anyone done this for their own custom `Result` (sub-)type?
I'm trying to follow the test suite examples under `TestOutputCaching`, but I'm not sure I'm testing the right thing. I'll add some more code in a thread here, but any advice/experience would be very welcome!
Danny Vilela
12/16/2021, 10:22 PM
…`EvaluateModelOnMetric(model: Model = …, metric: Metric = …)` that evaluates a single model against a single metric, and lists `models: List[Model] = [model_a, model_b, model_c]` and `metrics: List[Metric] = [precision, recall, f1_score]`.
It's pretty simple to design a task `EvaluateModelsOnMetrics(models: List[Model] = …, metrics: List[Metric] = …)` that uses `itertools.product` internally and delegates to an `EvaluateModelOnMetric`, but I'm wondering if prefect has something like this out of the box 🙂
Matt Alhonte
12/17/2021, 12:36 AM
…`ECSAgent` and I'm trying to get a basic "hello world" flow running on it. When I don't have a `providerStrategy` defined, I get this:
`No Container Instances were found in your cluster.`
But when I took out the `LaunchType` keyword, it still gives me this:
`An error occurred (InvalidParameterException) when calling the RunTask operation: You may choose a capacity provider or a launch type but not both.`
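The second error means the kwargs ECS receives still carry both settings; a hedged sketch of run-task kwargs that specify only a capacity provider (all values are placeholders):
```python
# Placeholder ECS run-task kwargs: pass a capacity provider strategy OR a
# launch type, never both.
run_task_kwargs = {
    "cluster": "my-cluster",
    "capacityProviderStrategy": [
        {"capacityProvider": "FARGATE", "weight": 1},
    ],
    # "launchType": "FARGATE",  # must stay omitted when a capacity provider is set
}
```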
Stefano Cascavilla
12/17/2021, 10:46 AM
…`prefect.tasks.airbyte` module. This is the error:
```
#14 1.604 Traceback (most recent call last):
#14 1.604   File "flow/etl.py", line 7, in <module>
#14 1.604     from prefect.tasks.airbyte import AirbyteConnectionTask
#14 1.604 ModuleNotFoundError: No module named 'prefect.tasks.airbyte'
```
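The `#14 1.604` prefix suggests the import fails during a Docker build, so one thing worth checking (an assumption, not a confirmed diagnosis) is the prefect version baked into the image, since `prefect.tasks.airbyte` only ships with more recent releases:
```python
import prefect

# If this prints an old version inside the image, the airbyte task module
# simply isn't part of that release yet.
print(prefect.__version__)
```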
Lucian Rosu
12/17/2021, 12:41 PM
```python
schedule = Schedule(
    clocks=[clocks.IntervalClock(timedelta(days=1))],
    # filters=[filters.at_time(time(10))],
    or_filters=[filters.between_times(pendulum.time(9), pendulum.time(9))],
)
```
Pedro Machado
12/17/2021, 3:30 PM
…`FilterTask` as explained here to allow the reduce task to save the data. It works OK with small results, but when I try it with the real API output, I am getting `State payload is too large.`
I tried setting `result=None` for this task, but that did not fix it. Any suggestions?
```
Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'State payload is too large.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 91, in call_runner_target_handlers
    state = self.client.set_task_run_state(
  File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 1917, in set_task_run_state
    result = self.graphql(
  File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 569, in graphql
    raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State payload is too large.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
```
I also tried setting the trigger on the reduce task to `trigger=any_successful` and it worked, but I wonder why the filter task is not working. Thanks!
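For context, a hedged sketch of the `FilterTask` idiom being referenced, which drops failed mapped children before the reduce step (the fetch/save tasks are hypothetical):
```python
from prefect import Flow, task
from prefect.tasks.control_flow import FilterTask

# Keep only plain values; failed mapped children arrive as exceptions.
filter_results = FilterTask(filter_func=lambda x: not isinstance(x, BaseException))

@task
def fetch(i):
    return i  # hypothetical API call

@task
def reduce_and_save(values):
    return sum(values)  # hypothetical save step

with Flow("filter-then-reduce") as flow:
    results = fetch.map(range(10))
    good = filter_results(results)
    reduce_and_save(good)
```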
Jadei
12/17/2021, 5:45 PM
```python
import pendulum

from prefect import Flow
from prefect.schedules import CronSchedule
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

weekday_schedule = CronSchedule(
    "30 9 * * 1-5", start_date=pendulum.now(tz="US/Eastern")
)

with Flow("parent-flow", schedule=weekday_schedule) as flow:
    # assumes you have registered the following flows in a project named "examples"
    flow_a = create_flow_run(flow_name="A", project_name="examples")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
    flow_b = create_flow_run(flow_name="B", project_name="examples")
    wait_for_flow_b = wait_for_flow_run(flow_b, raise_final_state=True)
    flow_c = create_flow_run(flow_name="C", project_name="examples")
    wait_for_flow_c = wait_for_flow_run(flow_c, raise_final_state=True)
    flow_d = create_flow_run(flow_name="D", project_name="examples")
    wait_for_flow_d = wait_for_flow_run(flow_d, raise_final_state=True)

    b = wait_for_flow_b(upstream_tasks=[wait_for_flow_a])
    c = wait_for_flow_c(upstream_tasks=[wait_for_flow_a])
    d = wait_for_flow_d(upstream_tasks=[b, c])

flow.run()
```
Constantino Schillebeeckx
12/17/2021, 5:52 PM
```python
import prefect
from prefect import Flow, Parameter, case, task

@task
def check_cond():
    return False

@task
def run_task(anchor_date, days_from):
    logger = prefect.context.logger
    logger.critical(f"{days_from=}")

with Flow('foo') as flow:
    anchor_date = Parameter(name="anchor_date", default=None)
    cond = check_cond()

    with case(cond, True):
        anchor_date = run_task(anchor_date, days_from=0)

    with case(cond, False):
        anchor_date = run_task(anchor_date, days_from=-1)
```
however the `False` condition never gets run. When `check_cond` returns `True`, the flow runs as expected. Am I missing something?
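One thing worth checking (a hedged sketch of the documented idiom, not a confirmed diagnosis): in Prefect 1.x, values produced inside `case` branches are usually combined downstream with `merge` rather than by reassigning the same Python variable:
```python
import prefect
from prefect import Flow, Parameter, case, task
from prefect.tasks.control_flow import merge

@task
def check_cond():
    return False

@task
def run_task(anchor_date, days_from):
    prefect.context.logger.critical(f"{days_from=}")
    return days_from

with Flow("foo") as flow:
    anchor_date = Parameter(name="anchor_date", default=None)
    cond = check_cond()

    with case(cond, True):
        true_branch = run_task(anchor_date, days_from=0)
    with case(cond, False):
        false_branch = run_task(anchor_date, days_from=-1)

    # merge resolves to whichever branch actually ran
    anchor_date = merge(true_branch, false_branch)
```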