Giang Hoang Le
12/18/2019, 2:54 AM
Jake Schmidt
12/18/2019, 4:33 PM
`dict` where the values are variable-length lists. I’d like to map a fanout on each key, then a fanout for each element in that key’s value, such that the operation on each value also receives the result of the “outer fanout’s” operation on its key. Would the best approach be to use a nested flow? I saw https://github.com/PrefectHQ/prefect/issues/1311 is similar...
Jake Schmidt
12/18/2019, 6:51 PM
George Coyne
12/18/2019, 10:23 PM
```python
import logging

logger = logging.getLogger(__name__)


def handler(event, context):
    # check_state, service_account_login, write_state (and the commented-out
    # create_message/send_message) are helpers not shown in the original message
    recipient_list = check_state()
    service = service_account_login()
    completed_samples = []
    for r in recipient_list:
        # message = create_message(recipient=r, email_from=EMAIL_FROM)
        # send_message(service, USER_ID, message)
        logger.info(f"Message sent to {r[3]} for sampleid {r[4]}")
        completed_samples.append(r[4])
    # Write to state table for completed samples
    write_state(completed_samples)
```
Jake Schmidt
12/18/2019, 10:51 PM
`result` of a given task in a flow? I see that `flow.run().result` is a `dict` keyed by `Task`s, but `flow.run().result[prefect.Task('task_name')]` gives a `KeyError`. I can’t find the answer in the documentation or the API reference for `State`.
Jake Schmidt
12/18/2019, 11:23 PM
itay livni
12/19/2019, 6:13 AM
Arsenii
12/23/2019, 5:00 AM
`prefect[all-extras]` and `dask[complete]`
```
Traceback (most recent call last):
  File "/usr/local/bin/prefect", line 5, in <module>
    from prefect.cli import cli
  File "/usr/local/lib/python3.7/site-packages/prefect/__init__.py", line 8, in <module>
    import prefect.triggers
  File "/usr/local/lib/python3.7/site-packages/prefect/triggers.py", line 48, in <module>
    from prefect.engine import signals, state
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/__init__.py", line 3, in <module>
    import prefect.engine.executors
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/__init__.py", line 30, in <module>
    from prefect.engine.executors.dask import DaskExecutor, LocalDaskExecutor
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 10, in <module>
    from distributed import Client, Future, fire_and_forget, worker_client
  File "/usr/local/lib/python3.7/site-packages/distributed/__init__.py", line 3, in <module>
    from .actor import Actor, ActorFuture
  File "/usr/local/lib/python3.7/site-packages/distributed/actor.py", line 7, in <module>
    from .client import Future, default_client
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 69, in <module>
    from .worker import dumps_task, get_client, get_worker, secede
  File "/usr/local/lib/python3.7/site-packages/distributed/worker.py", line 22, in <module>
    from dask.system import CPU_COUNT
ModuleNotFoundError: No module named 'dask.system'
```
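The last frame shows `distributed` importing `dask.system`, a module the installed `dask` does not provide. This suggests the installed `distributed` release is newer than the `dask` release alongside it. A quick way to confirm is to print the installed versions of the packages involved. The sketch below uses only the standard library (`importlib.metadata`, Python 3.8+; on 3.7 the `importlib_metadata` backport offers the same calls). `installed_versions` is a hypothetical helper:

```python
from importlib import metadata  # Python 3.8+
from typing import Dict, Optional


def installed_versions(*packages: str) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if absent."""
    versions: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None  # not installed in this environment
    return versions


if __name__ == "__main__":
    # dask and distributed are meant to be kept in step; a newer distributed
    # next to an older dask can fail exactly like the traceback above.
    print(installed_versions("prefect", "dask", "distributed"))
```

If the two versions disagree, upgrading them together (for example `pip install -U dask distributed`) is typically the fix.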
Kamil Okáč
12/23/2019, 12:38 PM
Andrew Schechtman-Rook
12/23/2019, 8:13 PM
`prefect.context.caches`. I've tried a bunch of things to do this, including:
• Turning on caching and specifying `LocalResultHandler` as the task/flow's `result_handler`. In this case caching works, but it doesn't use the `result_handler` to store/retrieve the task results.
• Turning on checkpointing and specifying `LocalResultHandler` as `result_handler`. This works to save the result to disk, but that appears to be a one-way operation: AFAICT there aren't any hooks in Prefect to pull the checkpointed data back in when restoring from cache.
• Using custom `state_handler`s to essentially intercept the results and do the file I/O. I'll admit I've spent the least time on this approach. It seems like it might work, but I don't have a good enough grasp of Prefect to know how to implement it, or even whether it's a good idea.
Has anyone had to implement something like this, or does anyone have any additional ideas?
Alex Goodman
12/23/2019, 9:27 PM
`LocalResultHandler` does store information without regard to the task in question. However, the state object resulting from each task run does keep track of the path written to. You can then persist the task-to-file mapping separately any way you’d like. You could persist all of the state returned by `flow.run()`, or persist state as it becomes available with a `state_handler`.
An example of caching this for the whole flow using the `LocalResultHandler` plus some state manipulation:
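```python
import os

import prefect
import slugify  # python-slugify
from prefect.engine.result_handlers import LocalResultHandler

# x, y and z are assumed to be @task-decorated functions (not shown here)
with prefect.Flow("example", result_handler=LocalResultHandler(dir=".prefect")) as flow:
    x_result = x()
    y_result = y(x_result)
    z(y_result)

flow_state = flow.run()
for task, task_state in flow_state.result.items():
    task_name = slugify.slugify(task.name)
    filepath = task_state._result.safe_value.value  # path the handler wrote to
    with open(os.path.join(".prefect", task_name), "w") as fh:  # text mode: filepath is a str
        fh.write(filepath)
```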
This way, the `.prefect` directory would contain a result file for each task as well as a “tracking” file for each task, named by the task name:
```
tree .prefect
.prefect
├── prefect-2g3sboyi
├── prefect-ixeqv82p
├── prefect-km5ufh9y
├── x
├── y
└── z
```
Note: I used slugify to encode the task name, but you have several options here. If you have odd task names, a good option is to set `task.slug` and key off of that.
If you want to do the same thing in a task `state_handler`:
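```python
import os
import slugify  # python-slugify

def handler(task, old, new):
    if new.is_successful():
        # record where the result handler wrote this task's result
        task_name = slugify.slugify(task.name)
        filepath = new._result.safe_value.value
        with open(os.path.join(".prefect", task_name), "w") as fh:
            fh.write(filepath)
    return new  # state handlers return the (possibly modified) new state
```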
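Neither snippet covers the restore direction Andrew asked about. Below is a minimal sketch of the read side. It assumes the layout in the `tree` output above: one tracking file per task, named by the slugified task name, holding the path of that task's result file. It also assumes the result files are pickles. As far as I know, Prefect 0.x's `LocalResultHandler` writes them with `cloudpickle`, and the standard `pickle` module can generally load that output when `cloudpickle` is installed. `load_cached_result` is a hypothetical helper, not part of Prefect:

```python
import os
import pickle
from typing import Any, Optional


def load_cached_result(cache_dir: str, task_name: str) -> Optional[Any]:
    """Return the stored result for `task_name`, or None if nothing is cached.

    Assumes `cache_dir/<task_name>` is a tracking file containing the path of
    a pickled result file (hypothetical layout, matching the handlers above).
    """
    tracking_file = os.path.join(cache_dir, task_name)
    if not os.path.exists(tracking_file):
        return None  # no tracking file: the task has never been recorded
    with open(tracking_file) as fh:
        result_path = fh.read().strip()
    if not os.path.exists(result_path):
        return None  # tracking file points at a result that no longer exists
    with open(result_path, "rb") as fh:
        return pickle.load(fh)
```

A task could call this first and return the cached value instead of recomputing it. One design caveat applies: returning `None` for "not cached" is ambiguous if a task can legitimately return `None`. A sentinel object or a `(found, value)` pair avoids that ambiguity.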
Ryan Connolly
12/24/2019, 6:05 PM
Daryll Strauss
12/26/2019, 8:16 PM
Daryll Strauss
12/26/2019, 8:18 PM
Jake Schmidt
12/27/2019, 5:42 PM
Adam
12/29/2019, 11:21 PM
`Target` for outputs, and if that `Target` exists (you get to define “exists”), the task won’t run and will return the already-completed result. Is there something similar for Prefect, or is this something I would have to implement manually? Caching seems like it would work, but does that cache persist across runs, or when different `Flow`s use the same `Task`?
As a concrete example: I’m writing a single row per day into a psql database. The day is defined by a `Parameter("date")`. If I run my Flow repeatedly, it keeps inserting rows for the same date. Instead, I’d like to check whether a row with that date already exists and return with the appropriate State (probably `Success`?).
John Ramirez
12/30/2019, 4:47 PM
John Ramirez
01/02/2020, 5:03 PM
`result_handler` with an `@task` with no success. Does anyone have a good example that demonstrates how to code it?
Juarez Rudsatz
01/02/2020, 8:31 PM
John Ramirez
01/03/2020, 7:17 PM
`flow.visualize()`
Alex Cano
01/03/2020, 7:49 PM
Javier Garcia
01/03/2020, 8:16 PM
`API Reference` links in https://docs.prefect.io/core/task_library/google.html#google-cloud-storage are returning 404 -> https://docs.prefect.io/api/unreleased/tasks/google.html#prefect-tasks-google-storage-gcsdownload
alexandre kempf
01/04/2020, 1:33 PM
```python
from prefect import task, Parameter, Flow
import numpy as np


@task
def augment_data(a, b=10):
    return np.sqrt(a) + np.sqrt(b)


with Flow("augment") as flowAug:
    a = Parameter("a")
    aug = augment_data(a, 10)


@task
def load_data(c):
    return c


@task
def train_model(model, d):
    print("Training, {} with {}!".format(model, d))
    return model


with Flow("training") as flowModel:
    init = Parameter("c")
    model = Parameter("model")
    data = load_data(init)
    # Note: flowAug.run() executes immediately, while this flow is still being
    # built, so `data` here is a Task object rather than the loaded value.
    state = flowAug.run(a=data)
    aug_data = state.result[aug].result
    result = train_model(model, aug_data)

state_model = flowModel.run(c=5, model="SVM")
```
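Jake's earlier `KeyError` and the `state.result[aug].result` line above both depend on how the result dict is keyed: by the Task objects themselves. As far as I can tell, Prefect Core 0.x tasks keep default object identity for hashing and equality. If so, `prefect.Task('task_name')` builds a brand-new task that is not a key in the dict. Two fixes follow from this. You can keep a reference to the task object, as the snippet above does with `aug`, or you can look the task up on the flow by name, for example with `flow.get_tasks(name='task_name')`, which returns a list. To show the mechanism without depending on Prefect, the sketch below uses a hypothetical `FakeTask` stand-in and a hypothetical `result_by_name` helper:

```python
from typing import Any, Dict


class FakeTask:
    """Stand-in for a task: no __eq__/__hash__ override, so keys use identity."""

    def __init__(self, name: str):
        self.name = name


def result_by_name(results: Dict[FakeTask, Any], name: str) -> Any:
    """Find a result by task name instead of by task object."""
    matches = [value for task, value in results.items() if task.name == name]
    if len(matches) != 1:
        raise LookupError(f"expected one task named {name!r}, found {len(matches)}")
    return matches[0]


load = FakeTask("load_data")
results = {load: 42}

print(results[load])                         # 42: same object, same key
print(FakeTask("load_data") in results)      # False: a new object is a new key
print(result_by_name(results, "load_data"))  # 42
```

Requiring exactly one match mirrors the fact that task names are not guaranteed to be unique within a flow.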
Braun Reyes
01/07/2020, 1:14 AM
Braun Reyes
01/07/2020, 1:16 AM
Braun Reyes
01/07/2020, 1:17 AM
Braun Reyes
01/07/2020, 1:18 AM
Braun Reyes
01/07/2020, 1:21 AM
Braun Reyes
01/07/2020, 1:22 AM
Braun Reyes
01/07/2020, 1:23 AM
Braun Reyes
01/07/2020, 1:23 AM